|
@@ -17,6 +17,7 @@
|
|
|
import unittest
|
|
|
import socket
|
|
|
import tempfile
|
|
|
+import stat
|
|
|
import sys
|
|
|
from cmdctl import *
|
|
|
import isc.log
|
|
@@ -36,7 +37,7 @@ class MySecureHTTPRequestHandler(SecureHTTPRequestHandler):
|
|
|
|
|
|
def send_response(self, rcode):
|
|
|
self.rcode = rcode
|
|
|
-
|
|
|
+
|
|
|
def end_headers(self):
|
|
|
pass
|
|
|
|
|
@@ -51,13 +52,13 @@ class MySecureHTTPRequestHandler(SecureHTTPRequestHandler):
|
|
|
super().do_POST()
|
|
|
self.wfile.close()
|
|
|
os.remove('tmp.file')
|
|
|
-
|
|
|
+
|
|
|
|
|
|
class FakeSecureHTTPServer(SecureHTTPServer):
|
|
|
def __init__(self):
|
|
|
self.user_sessions = {}
|
|
|
self.cmdctl = FakeCommandControlForTestRequestHandler()
|
|
|
- self._verbose = True
|
|
|
+ self._verbose = True
|
|
|
self._user_infos = {}
|
|
|
self.idle_timeout = 1200
|
|
|
self._lock = threading.Lock()
|
|
@@ -71,6 +72,17 @@ class FakeCommandControlForTestRequestHandler(CommandControl):
|
|
|
def send_command(self, mod, cmd, param):
|
|
|
return 0, {}
|
|
|
|
|
|
+# context to temporarily make a file unreadable
|
|
|
+class UnreadableFile:
|
|
|
+ def __init__(self, file_name):
|
|
|
+ self.file_name = file_name
|
|
|
+ self.orig_mode = os.stat(file_name).st_mode
|
|
|
+
|
|
|
+ def __enter__(self):
|
|
|
+ os.chmod(self.file_name, self.orig_mode & ~stat.S_IRUSR)
|
|
|
+
|
|
|
+ def __exit__(self, type, value, traceback):
|
|
|
+ os.chmod(self.file_name, self.orig_mode)
|
|
|
|
|
|
class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
def setUp(self):
|
|
@@ -97,7 +109,7 @@ class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
self.handler.path = '/abc'
|
|
|
mod, cmd = self.handler._parse_request_path()
|
|
|
self.assertTrue((mod == 'abc') and (cmd == None))
|
|
|
-
|
|
|
+
|
|
|
self.handler.path = '/abc/edf'
|
|
|
mod, cmd = self.handler._parse_request_path()
|
|
|
self.assertTrue((mod == 'abc') and (cmd == 'edf'))
|
|
@@ -125,20 +137,20 @@ class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
|
|
|
def test_do_GET(self):
|
|
|
self.handler.do_GET()
|
|
|
- self.assertEqual(self.handler.rcode, http.client.BAD_REQUEST)
|
|
|
-
|
|
|
+ self.assertEqual(self.handler.rcode, http.client.BAD_REQUEST)
|
|
|
+
|
|
|
def test_do_GET_1(self):
|
|
|
self.handler.headers['cookie'] = 12345
|
|
|
self.handler.do_GET()
|
|
|
- self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
|
|
|
+ self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
|
|
|
|
|
|
def test_do_GET_2(self):
|
|
|
self.handler.headers['cookie'] = 12345
|
|
|
self.handler.server.user_sessions[12345] = time.time() + 1000000
|
|
|
self.handler.path = '/how/are'
|
|
|
self.handler.do_GET()
|
|
|
- self.assertEqual(self.handler.rcode, http.client.NO_CONTENT)
|
|
|
-
|
|
|
+ self.assertEqual(self.handler.rcode, http.client.NO_CONTENT)
|
|
|
+
|
|
|
def test_do_GET_3(self):
|
|
|
self.handler.headers['cookie'] = 12346
|
|
|
self.handler.server.user_sessions[12346] = time.time() + 1000000
|
|
@@ -146,8 +158,8 @@ class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
for path in path_vec:
|
|
|
self.handler.path = '/' + path
|
|
|
self.handler.do_GET()
|
|
|
- self.assertEqual(self.handler.rcode, http.client.OK)
|
|
|
-
|
|
|
+ self.assertEqual(self.handler.rcode, http.client.OK)
|
|
|
+
|
|
|
def test_user_logged_in(self):
|
|
|
self.handler.server.user_sessions = {}
|
|
|
self.handler.session_id = 12345
|
|
@@ -243,8 +255,8 @@ class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
self.assertEqual(http.client.BAD_REQUEST, rcode)
|
|
|
|
|
|
def _gen_module_spec(self):
|
|
|
- spec = { 'commands': [
|
|
|
- { 'command_name' :'command',
|
|
|
+ spec = { 'commands': [
|
|
|
+ { 'command_name' :'command',
|
|
|
'command_args': [ {
|
|
|
'item_name' : 'param1',
|
|
|
'item_type' : 'integer',
|
|
@@ -253,9 +265,9 @@ class TestSecureHTTPRequestHandler(unittest.TestCase):
|
|
|
} ],
|
|
|
'command_description' : 'cmd description'
|
|
|
}
|
|
|
- ]
|
|
|
+ ]
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
return spec
|
|
|
|
|
|
def test_handle_post_request_2(self):
|
|
@@ -296,7 +308,7 @@ class MyCommandControl(CommandControl):
|
|
|
self._module_name = 'Cmdctl'
|
|
|
self._cmdctl_config_data = config.get_full_config()
|
|
|
|
|
|
- def _handle_msg_from_msgq(self):
|
|
|
+ def _handle_msg_from_msgq(self):
|
|
|
pass
|
|
|
|
|
|
class TestCommandControl(unittest.TestCase):
|
|
@@ -305,7 +317,7 @@ class TestCommandControl(unittest.TestCase):
|
|
|
self.old_stdout = sys.stdout
|
|
|
sys.stdout = open(os.devnull, 'w')
|
|
|
self.cmdctl = MyCommandControl(None, True)
|
|
|
-
|
|
|
+
|
|
|
def tearDown(self):
|
|
|
sys.stdout.close()
|
|
|
sys.stdout = self.old_stdout
|
|
@@ -320,7 +332,7 @@ class TestCommandControl(unittest.TestCase):
|
|
|
old_env = os.environ
|
|
|
if 'B10_FROM_SOURCE' in os.environ:
|
|
|
del os.environ['B10_FROM_SOURCE']
|
|
|
- self.cmdctl.get_cmdctl_config_data()
|
|
|
+ self.cmdctl.get_cmdctl_config_data()
|
|
|
self._check_config(self.cmdctl)
|
|
|
os.environ = old_env
|
|
|
|
|
@@ -328,7 +340,7 @@ class TestCommandControl(unittest.TestCase):
|
|
|
os.environ['B10_FROM_SOURCE'] = '../'
|
|
|
self._check_config(self.cmdctl)
|
|
|
os.environ = old_env
|
|
|
-
|
|
|
+
|
|
|
def test_parse_command_result(self):
|
|
|
self.assertEqual({}, self.cmdctl._parse_command_result(1, {'error' : 1}))
|
|
|
self.assertEqual({'a': 1}, self.cmdctl._parse_command_result(0, {'a' : 1}))
|
|
@@ -391,13 +403,13 @@ class TestCommandControl(unittest.TestCase):
|
|
|
os.environ = old_env
|
|
|
|
|
|
answer = self.cmdctl.config_handler({'key_file': '/user/non-exist_folder'})
|
|
|
- self._check_answer(answer, 1, "the file doesn't exist: /user/non-exist_folder")
|
|
|
+ self._check_answer(answer, 1, "'/user/non-exist_folder' does not exist")
|
|
|
|
|
|
answer = self.cmdctl.config_handler({'cert_file': '/user/non-exist_folder'})
|
|
|
- self._check_answer(answer, 1, "the file doesn't exist: /user/non-exist_folder")
|
|
|
+ self._check_answer(answer, 1, "'/user/non-exist_folder' does not exist")
|
|
|
|
|
|
answer = self.cmdctl.config_handler({'accounts_file': '/user/non-exist_folder'})
|
|
|
- self._check_answer(answer, 1,
|
|
|
+ self._check_answer(answer, 1,
|
|
|
"Invalid accounts file: [Errno 2] No such file or directory: '/user/non-exist_folder'")
|
|
|
|
|
|
# Test with invalid accounts file
|
|
@@ -409,7 +421,7 @@ class TestCommandControl(unittest.TestCase):
|
|
|
answer = self.cmdctl.config_handler({'accounts_file': file_name})
|
|
|
self._check_answer(answer, 1, "Invalid accounts file: list index out of range")
|
|
|
os.remove(file_name)
|
|
|
-
|
|
|
+
|
|
|
def test_send_command(self):
|
|
|
rcode, value = self.cmdctl.send_command('Cmdctl', 'print_settings', None)
|
|
|
self.assertEqual(rcode, 0)
|
|
@@ -424,7 +436,7 @@ class TestSecureHTTPServer(unittest.TestCase):
|
|
|
self.old_stderr = sys.stderr
|
|
|
sys.stdout = open(os.devnull, 'w')
|
|
|
sys.stderr = sys.stdout
|
|
|
- self.server = MySecureHTTPServer(('localhost', 8080),
|
|
|
+ self.server = MySecureHTTPServer(('localhost', 8080),
|
|
|
MySecureHTTPRequestHandler,
|
|
|
MyCommandControl, verbose=True)
|
|
|
|
|
@@ -458,32 +470,67 @@ class TestSecureHTTPServer(unittest.TestCase):
|
|
|
self.assertEqual(1, len(self.server._user_infos))
|
|
|
self.assertTrue('root' in self.server._user_infos)
|
|
|
|
|
|
+ def test_check_file(self):
|
|
|
+ # Just some file that we know exists
|
|
|
+ file_name = SRC_FILE_PATH + 'cmdctl-keyfile.pem'
|
|
|
+ check_file(file_name)
|
|
|
+ with UnreadableFile(file_name):
|
|
|
+ self.assertRaises(CmdctlException, check_file, file_name)
|
|
|
+ self.assertRaises(CmdctlException, check_file, '/local/not-exist')
|
|
|
+ self.assertRaises(CmdctlException, check_file, '/')
|
|
|
+
|
|
|
+
|
|
|
def test_check_key_and_cert(self):
|
|
|
+ keyfile = SRC_FILE_PATH + 'cmdctl-keyfile.pem'
|
|
|
+ certfile = SRC_FILE_PATH + 'cmdctl-certfile.pem'
|
|
|
+
|
|
|
+ # no exists
|
|
|
+ self.assertRaises(CmdctlException, self.server._check_key_and_cert,
|
|
|
+ keyfile, '/local/not-exist')
|
|
|
self.assertRaises(CmdctlException, self.server._check_key_and_cert,
|
|
|
- '/local/not-exist', 'cmdctl-keyfile.pem')
|
|
|
+ '/local/not-exist', certfile)
|
|
|
|
|
|
- self.server._check_key_and_cert(SRC_FILE_PATH + 'cmdctl-keyfile.pem',
|
|
|
- SRC_FILE_PATH + 'cmdctl-certfile.pem')
|
|
|
+ # not a file
|
|
|
+ self.assertRaises(CmdctlException, self.server._check_key_and_cert,
|
|
|
+ keyfile, '/')
|
|
|
+ self.assertRaises(CmdctlException, self.server._check_key_and_cert,
|
|
|
+ '/', certfile)
|
|
|
+
|
|
|
+ # no read permission
|
|
|
+ with UnreadableFile(certfile):
|
|
|
+ self.assertRaises(CmdctlException,
|
|
|
+ self.server._check_key_and_cert,
|
|
|
+ keyfile, certfile)
|
|
|
+
|
|
|
+ with UnreadableFile(keyfile):
|
|
|
+ self.assertRaises(CmdctlException,
|
|
|
+ self.server._check_key_and_cert,
|
|
|
+ keyfile, certfile)
|
|
|
+
|
|
|
+ # All OK (also happens to check the context code above works)
|
|
|
+ self.server._check_key_and_cert(keyfile, certfile)
|
|
|
|
|
|
def test_wrap_sock_in_ssl_context(self):
|
|
|
sock = socket.socket()
|
|
|
- self.assertRaises(socket.error,
|
|
|
+
|
|
|
+ # Test exception is Fatal here (all specific cases are tested
|
|
|
+ # in test_check_key_and_cert())
|
|
|
+ self.assertRaises(CmdctlFatalException,
|
|
|
self.server._wrap_socket_in_ssl_context,
|
|
|
- sock,
|
|
|
- '../cmdctl-keyfile',
|
|
|
- '../cmdctl-certfile')
|
|
|
+ sock,
|
|
|
+ 'no_such_file', 'no_such_file')
|
|
|
|
|
|
sock1 = socket.socket()
|
|
|
- self.server._wrap_socket_in_ssl_context(sock1,
|
|
|
+ self.server._wrap_socket_in_ssl_context(sock1,
|
|
|
SRC_FILE_PATH + 'cmdctl-keyfile.pem',
|
|
|
SRC_FILE_PATH + 'cmdctl-certfile.pem')
|
|
|
|
|
|
class TestFuncNotInClass(unittest.TestCase):
|
|
|
def test_check_port(self):
|
|
|
- self.assertRaises(OptionValueError, check_port, None, 'port', -1, None)
|
|
|
- self.assertRaises(OptionValueError, check_port, None, 'port', 65536, None)
|
|
|
- self.assertRaises(OptionValueError, check_addr, None, 'ipstr', 'a.b.d', None)
|
|
|
- self.assertRaises(OptionValueError, check_addr, None, 'ipstr', '1::0:a.b', None)
|
|
|
+ self.assertRaises(OptionValueError, check_port, None, 'port', -1, None)
|
|
|
+ self.assertRaises(OptionValueError, check_port, None, 'port', 65536, None)
|
|
|
+ self.assertRaises(OptionValueError, check_addr, None, 'ipstr', 'a.b.d', None)
|
|
|
+ self.assertRaises(OptionValueError, check_addr, None, 'ipstr', '1::0:a.b', None)
|
|
|
|
|
|
|
|
|
if __name__== "__main__":
|