|
@@ -33,7 +33,7 @@ BUILD_FILE_PATH = os.environ['CMDCTL_BUILD_PATH'] + os.sep
|
|
|
# Rewrite the class for unittest.
|
|
|
class MySecureHTTPRequestHandler(SecureHTTPRequestHandler):
|
|
|
def __init__(self):
|
|
|
- pass
|
|
|
+ self.session_id = None
|
|
|
|
|
|
def send_response(self, rcode):
|
|
|
self.rcode = rcode
|
|
@@ -41,19 +41,6 @@ class MySecureHTTPRequestHandler(SecureHTTPRequestHandler):
|
|
|
def end_headers(self):
|
|
|
pass
|
|
|
|
|
|
- def do_GET(self):
|
|
|
- self.wfile = open('tmp.file', 'wb')
|
|
|
- super().do_GET()
|
|
|
- self.wfile.close()
|
|
|
- os.remove('tmp.file')
|
|
|
-
|
|
|
- def do_POST(self):
|
|
|
- self.wfile = open("tmp.file", 'wb')
|
|
|
- super().do_POST()
|
|
|
- self.wfile.close()
|
|
|
- os.remove('tmp.file')
|
|
|
-
|
|
|
-
|
|
|
class FakeSecureHTTPServer(SecureHTTPServer):
|
|
|
def __init__(self):
|
|
|
self.user_sessions = {}
|
|
@@ -93,13 +80,22 @@ class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
self.handler.server.user_sessions = {}
|
|
|
self.handler.server._user_infos = {}
|
|
|
self.handler.headers = {}
|
|
|
- self.handler.rfile = open("check.tmp", 'w+b')
|
|
|
+ self.handler.rfile = open('input.tmp', 'w+b')
|
|
|
+ self.handler.wfile = open('output.tmp', 'w+b')
|
|
|
|
|
|
def tearDown(self):
|
|
|
sys.stdout.close()
|
|
|
sys.stdout = self.old_stdout
|
|
|
+ self.handler.wfile.close()
|
|
|
+ os.remove('output.tmp')
|
|
|
self.handler.rfile.close()
|
|
|
- os.remove('check.tmp')
|
|
|
+ os.remove('input.tmp')
|
|
|
+
|
|
|
+ def test_is_session_valid(self):
|
|
|
+ self.assertIsNone(self.handler.session_id)
|
|
|
+ self.assertFalse(self.handler._is_session_valid())
|
|
|
+ self.handler.session_id = 4234
|
|
|
+ self.assertTrue(self.handler._is_session_valid())
|
|
|
|
|
|
def test_parse_request_path(self):
|
|
|
self.handler.path = ''
|
|
@@ -160,7 +156,7 @@ class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
self.handler.do_GET()
|
|
|
self.assertEqual(self.handler.rcode, http.client.OK)
|
|
|
|
|
|
- def test_user_logged_in(self):
|
|
|
+ def test_is_user_logged_in(self):
|
|
|
self.handler.server.user_sessions = {}
|
|
|
self.handler.session_id = 12345
|
|
|
self.assertTrue(self.handler._is_user_logged_in() == False)
|
|
@@ -294,6 +290,68 @@ class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
rcode, reply = self.handler._handle_post_request()
|
|
|
self.assertEqual(http.client.BAD_REQUEST, rcode)
|
|
|
|
|
|
+ def test_handle_login(self):
|
|
|
+ orig_is_user_logged_in = self.handler._is_user_logged_in
|
|
|
+ orig_check_user_name_and_pwd = self.handler._check_user_name_and_pwd
|
|
|
+ try:
|
|
|
+ def create_is_user_logged_in(status):
|
|
|
+ '''Create a replacement _is_user_logged_in() method.'''
|
|
|
+ def my_is_user_logged_in():
|
|
|
+ return status
|
|
|
+ return my_is_user_logged_in
|
|
|
+
|
|
|
+ # Check case where _is_user_logged_in() returns True
|
|
|
+ self.handler._is_user_logged_in = create_is_user_logged_in(True)
|
|
|
+ self.handler.headers['cookie'] = 12345
|
|
|
+ self.handler.path = '/login'
|
|
|
+ self.handler.do_POST()
|
|
|
+ self.assertEqual(self.handler.rcode, http.client.OK)
|
|
|
+ self.handler.wfile.seek(0, 0)
|
|
|
+ d = self.handler.wfile.read()
|
|
|
+ self.assertEqual(json.loads(d.decode()),
|
|
|
+ ['user has already login'])
|
|
|
+
|
|
|
+ # Clear the output
|
|
|
+ self.handler.wfile.seek(0, 0)
|
|
|
+ self.handler.wfile.truncate()
|
|
|
+
|
|
|
+ # Check case where _is_user_logged_in() returns False
|
|
|
+ self.handler._is_user_logged_in = create_is_user_logged_in(False)
|
|
|
+
|
|
|
+ def create_check_user_name_and_pwd(status, error_info=None):
|
|
|
+ '''Create a replacement _check_user_name_and_pwd() method.'''
|
|
|
+ def my_check_user_name_and_pwd():
|
|
|
+ return status, error_info
|
|
|
+ return my_check_user_name_and_pwd
|
|
|
+
|
|
|
+ # (a) Check case where _check_user_name_and_pwd() returns
|
|
|
+ # valid user status
|
|
|
+ self.handler._check_user_name_and_pwd = \
|
|
|
+ create_check_user_name_and_pwd(True)
|
|
|
+ self.handler.do_POST()
|
|
|
+ self.assertEqual(self.handler.rcode, http.client.OK)
|
|
|
+ self.handler.wfile.seek(0, 0)
|
|
|
+ d = self.handler.wfile.read()
|
|
|
+ self.assertEqual(json.loads(d.decode()), ['login success'])
|
|
|
+
|
|
|
+ # Clear the output
|
|
|
+ self.handler.wfile.seek(0, 0)
|
|
|
+ self.handler.wfile.truncate()
|
|
|
+
|
|
|
+ # (b) Check case where _check_user_name_and_pwd() returns
|
|
|
+ # invalid user status
|
|
|
+ self.handler._check_user_name_and_pwd = \
|
|
|
+ create_check_user_name_and_pwd(False, ['login failed'])
|
|
|
+ self.handler.do_POST()
|
|
|
+ self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
|
|
|
+ self.handler.wfile.seek(0, 0)
|
|
|
+ d = self.handler.wfile.read()
|
|
|
+ self.assertEqual(json.loads(d.decode()), ['login failed'])
|
|
|
+
|
|
|
+ finally:
|
|
|
+ self.handler._is_user_logged_in = orig_is_user_logged_in
|
|
|
+ self.handler._check_user_name_and_pwd = orig_check_user_name_and_pwd
|
|
|
+
|
|
|
class MyCommandControl(CommandControl):
|
|
|
def _get_modules_specification(self):
|
|
|
return {}
|
|
@@ -470,6 +528,12 @@ class TestSecureHTTPServer(unittest.TestCase):
|
|
|
self.assertEqual(1, len(self.server._user_infos))
|
|
|
self.assertTrue('root' in self.server._user_infos)
|
|
|
|
|
|
+ def test_get_user_info(self):
|
|
|
+ self.assertIsNone(self.server.get_user_info('root'))
|
|
|
+ self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
|
|
|
+ self.assertIn('6f0c73bd33101a5ec0294b3ca39fec90ef4717fe',
|
|
|
+ self.server.get_user_info('root'))
|
|
|
+
|
|
|
def test_check_file(self):
|
|
|
# Just some file that we know exists
|
|
|
file_name = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
|