cmdctl_test.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import unittest
  16. import ssl, socket
  17. import tempfile
  18. import time
  19. import stat
  20. import sys
  21. from cmdctl import *
  22. import isc.log
  23. assert 'CMDCTL_SRC_PATH' in os.environ,\
  24. "Please run this test with 'make check'"
  25. SRC_FILE_PATH = os.environ['CMDCTL_SRC_PATH'] + os.sep
  26. assert 'CMDCTL_BUILD_PATH' in os.environ,\
  27. "Please run this test with 'make check'"
  28. BUILD_FILE_PATH = os.environ['CMDCTL_BUILD_PATH'] + os.sep
  29. # Rewrite the class for unittest.
  30. class MySecureHTTPRequestHandler(SecureHTTPRequestHandler):
  31. def __init__(self):
  32. self.session_id = None
  33. def send_response(self, rcode):
  34. self.rcode = rcode
  35. def end_headers(self):
  36. pass
  37. class FakeSecureHTTPServer(SecureHTTPServer):
  38. def __init__(self):
  39. self.user_sessions = {}
  40. self.cmdctl = FakeCommandControlForTestRequestHandler()
  41. self._verbose = True
  42. self._user_infos = {}
  43. self.idle_timeout = 1200
  44. self._lock = threading.Lock()
  45. class FakeCommandControlForTestRequestHandler(CommandControl):
  46. def __init__(self):
  47. self._config_data = {}
  48. self.modules_spec = {}
  49. self._lock = threading.Lock()
  50. def send_command(self, mod, cmd, param):
  51. return 0, {}
  52. # context to temporarily make a file unreadable
  53. class UnreadableFile:
  54. def __init__(self, file_name):
  55. self.file_name = file_name
  56. self.orig_mode = os.stat(file_name).st_mode
  57. def __enter__(self):
  58. os.chmod(self.file_name, self.orig_mode & ~stat.S_IRUSR)
  59. def __exit__(self, type, value, traceback):
  60. os.chmod(self.file_name, self.orig_mode)
  61. class TmpTextFile:
  62. """
  63. Context class for temporarily creating a text file with some
  64. lines of content.
  65. The file is automatically deleted if the context is left, so
  66. make sure to not use the path of an existing file!
  67. """
  68. def __init__(self, path, contents):
  69. self.__path = path
  70. self.__contents = contents
  71. def __enter__(self):
  72. with open(self.__path, 'w') as f:
  73. f.write("\n".join(self.__contents) + "\n")
  74. def __exit__(self, type, value, traceback):
  75. os.unlink(self.__path)
  76. class TestSecureHTTPRequestHandler(unittest.TestCase):
  77. def setUp(self):
  78. self.old_stdout = sys.stdout
  79. sys.stdout = open(os.devnull, 'w')
  80. self.handler = MySecureHTTPRequestHandler()
  81. self.handler.server = FakeSecureHTTPServer()
  82. self.handler.server.user_sessions = {}
  83. self.handler.server._user_infos = {}
  84. self.handler.headers = {}
  85. self.handler.rfile = open('input.tmp', 'w+b')
  86. self.handler.wfile = open('output.tmp', 'w+b')
  87. def tearDown(self):
  88. sys.stdout.close()
  89. sys.stdout = self.old_stdout
  90. self.handler.wfile.close()
  91. os.remove('output.tmp')
  92. self.handler.rfile.close()
  93. os.remove('input.tmp')
  94. def test_is_session_valid(self):
  95. self.assertIsNone(self.handler.session_id)
  96. self.assertFalse(self.handler._is_session_valid())
  97. self.handler.session_id = 4234
  98. self.assertTrue(self.handler._is_session_valid())
  99. def test_parse_request_path(self):
  100. self.handler.path = ''
  101. mod, cmd = self.handler._parse_request_path()
  102. self.assertTrue((mod == None) and (cmd == None))
  103. self.handler.path = '/abc'
  104. mod, cmd = self.handler._parse_request_path()
  105. self.assertTrue((mod == 'abc') and (cmd == None))
  106. self.handler.path = '/abc/edf'
  107. mod, cmd = self.handler._parse_request_path()
  108. self.assertTrue((mod == 'abc') and (cmd == 'edf'))
  109. self.handler.path = '/abc/edf/ghi'
  110. mod, cmd = self.handler._parse_request_path()
  111. self.assertTrue((mod == 'abc') and (cmd == 'edf'))
  112. def test_parse_request_path_1(self):
  113. self.handler.path = '/ab*c'
  114. mod, cmd = self.handler._parse_request_path()
  115. self.assertTrue((mod == 'ab') and cmd == None)
  116. self.handler.path = '/abc/ed*fdd/ddd'
  117. mod, cmd = self.handler._parse_request_path()
  118. self.assertTrue((mod == 'abc') and cmd == 'ed')
  119. self.handler.path = '/-*/edfdd/ddd'
  120. mod, cmd = self.handler._parse_request_path()
  121. self.assertTrue((mod == None) and (cmd == None))
  122. self.handler.path = '/-*/edfdd/ddd'
  123. mod, cmd = self.handler._parse_request_path()
  124. self.assertTrue((mod == None) and (cmd == None))
  125. def test_do_GET(self):
  126. self.handler.do_GET()
  127. self.assertEqual(self.handler.rcode, http.client.BAD_REQUEST)
  128. def test_do_GET_1(self):
  129. self.handler.headers['cookie'] = 12345
  130. self.handler.do_GET()
  131. self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
  132. def test_do_GET_2(self):
  133. self.handler.headers['cookie'] = 12345
  134. self.handler.server.user_sessions[12345] = time.time() + 1000000
  135. self.handler.path = '/how/are'
  136. self.handler.do_GET()
  137. self.assertEqual(self.handler.rcode, http.client.NO_CONTENT)
  138. def test_do_GET_3(self):
  139. self.handler.headers['cookie'] = 12346
  140. self.handler.server.user_sessions[12346] = time.time() + 1000000
  141. path_vec = ['config_data', 'module_spec']
  142. for path in path_vec:
  143. self.handler.path = '/' + path
  144. self.handler.do_GET()
  145. self.assertEqual(self.handler.rcode, http.client.OK)
  146. def test_is_user_logged_in(self):
  147. self.handler.server.user_sessions = {}
  148. self.handler.session_id = 12345
  149. self.assertTrue(self.handler._is_user_logged_in() == False)
  150. self.handler.server.user_sessions[12345] = time.time()
  151. self.assertTrue(self.handler._is_user_logged_in())
  152. self.handler.server.user_sessions[12345] = time.time() - 1500
  153. self.handler.idle_timeout = 1200
  154. self.assertTrue(self.handler._is_user_logged_in() == False)
  155. def test_check_user_name_and_pwd(self):
  156. self.handler.headers = {}
  157. ret, msg = self.handler._check_user_name_and_pwd()
  158. self.assertFalse(ret)
  159. self.assertEqual(msg, ['invalid username or password'])
  160. def test_check_user_name_and_pwd_1(self):
  161. user_info = {'username':'root', 'password':'abc123'}
  162. len = self.handler.rfile.write(json.dumps(user_info).encode())
  163. self.handler.headers['Content-Length'] = len
  164. self.handler.rfile.seek(0, 0)
  165. self.handler.server._user_infos['root'] = ['aa', 'aaa']
  166. ret, msg = self.handler._check_user_name_and_pwd()
  167. self.assertFalse(ret)
  168. self.assertEqual(msg, ['username or password error'])
  169. def test_check_user_name_and_pwd_2(self):
  170. user_info = {'username':'root', 'password':'abc123'}
  171. len = self.handler.rfile.write(json.dumps(user_info).encode())
  172. self.handler.headers['Content-Length'] = len - 1
  173. self.handler.rfile.seek(0, 0)
  174. ret, msg = self.handler._check_user_name_and_pwd()
  175. self.assertFalse(ret)
  176. self.assertEqual(msg, ['invalid username or password'])
  177. def test_check_user_name_and_pwd_3(self):
  178. user_info = {'usernae':'root', 'password':'abc123'}
  179. len = self.handler.rfile.write(json.dumps(user_info).encode())
  180. self.handler.headers['Content-Length'] = len
  181. self.handler.rfile.seek(0, 0)
  182. ret, msg = self.handler._check_user_name_and_pwd()
  183. self.assertFalse(ret)
  184. self.assertEqual(msg, ['need user name'])
  185. def test_check_user_name_and_pwd_4(self):
  186. user_info = {'username':'root', 'pssword':'abc123'}
  187. len = self.handler.rfile.write(json.dumps(user_info).encode())
  188. self.handler.headers['Content-Length'] = len
  189. self.handler.rfile.seek(0, 0)
  190. self.handler.server._user_infos['root'] = ['aa', 'aaa']
  191. ret, msg = self.handler._check_user_name_and_pwd()
  192. self.assertFalse(ret)
  193. self.assertEqual(msg, ['need password'])
  194. def test_check_user_name_and_pwd_5(self):
  195. user_info = {'username':'root', 'password':'abc123'}
  196. len = self.handler.rfile.write(json.dumps(user_info).encode())
  197. self.handler.headers['Content-Length'] = len
  198. self.handler.rfile.seek(0, 0)
  199. ret, msg = self.handler._check_user_name_and_pwd()
  200. self.assertFalse(ret)
  201. self.assertEqual(msg, ['username or password error'])
  202. def test_do_POST(self):
  203. self.handler.headers = {}
  204. self.handler.do_POST()
  205. self.assertEqual(self.handler.rcode, http.client.BAD_REQUEST)
  206. def test_do_POST_1(self):
  207. self.handler.headers = {}
  208. self.handler.headers['cookie'] = 12345
  209. self.handler.path = '/'
  210. self.handler.do_POST()
  211. self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
  212. def test_handle_post_request(self):
  213. self.handler.path = '/cfgmgr/revert'
  214. self.handler.headers = {}
  215. rcode, reply = self.handler._handle_post_request()
  216. self.assertEqual(http.client.BAD_REQUEST, rcode)
  217. def test_handle_post_request_1(self):
  218. self.handler.path = '/*d/revert'
  219. self.handler.headers = {}
  220. rcode, reply = self.handler._handle_post_request()
  221. self.assertEqual(http.client.BAD_REQUEST, rcode)
  222. def _gen_module_spec(self):
  223. spec = { 'commands': [
  224. { 'command_name' :'command',
  225. 'command_args': [ {
  226. 'item_name' : 'param1',
  227. 'item_type' : 'integer',
  228. 'item_optional' : False,
  229. 'item_default' : 0
  230. } ],
  231. 'command_description' : 'cmd description'
  232. }
  233. ]
  234. }
  235. return spec
  236. def test_handle_post_request_2(self):
  237. params = {'param1':123}
  238. len = self.handler.rfile.write(json.dumps(params).encode())
  239. self.handler.headers['Content-Length'] = len
  240. self.handler.rfile.seek(0, 0)
  241. self.handler.path = '/module/command'
  242. self.handler.server.cmdctl.modules_spec = {}
  243. self.handler.server.cmdctl.modules_spec['module'] = self._gen_module_spec()
  244. rcode, reply = self.handler._handle_post_request()
  245. self.assertEqual(http.client.OK, rcode)
  246. def test_handle_post_request_3(self):
  247. params = {'param1':'abc'}
  248. len = self.handler.rfile.write(json.dumps(params).encode())
  249. self.handler.headers['Content-Length'] = len
  250. self.handler.rfile.seek(0, 0)
  251. self.handler.path = '/module/command'
  252. self.handler.server.cmdctl.modules_spec = {}
  253. self.handler.server.cmdctl.modules_spec['module'] = self._gen_module_spec()
  254. rcode, reply = self.handler._handle_post_request()
  255. self.assertEqual(http.client.BAD_REQUEST, rcode)
  256. def test_handle_login(self):
  257. orig_is_user_logged_in = self.handler._is_user_logged_in
  258. orig_check_user_name_and_pwd = self.handler._check_user_name_and_pwd
  259. try:
  260. def create_is_user_logged_in(status):
  261. '''Create a replacement _is_user_logged_in() method.'''
  262. def my_is_user_logged_in():
  263. return status
  264. return my_is_user_logged_in
  265. # Check case where _is_user_logged_in() returns True
  266. self.handler._is_user_logged_in = create_is_user_logged_in(True)
  267. self.handler.headers['cookie'] = 12345
  268. self.handler.path = '/login'
  269. self.handler.do_POST()
  270. self.assertEqual(self.handler.rcode, http.client.OK)
  271. self.handler.wfile.seek(0, 0)
  272. d = self.handler.wfile.read()
  273. self.assertEqual(json.loads(d.decode()),
  274. ['user has already login'])
  275. # Clear the output
  276. self.handler.wfile.seek(0, 0)
  277. self.handler.wfile.truncate()
  278. # Check case where _is_user_logged_in() returns False
  279. self.handler._is_user_logged_in = create_is_user_logged_in(False)
  280. def create_check_user_name_and_pwd(status, error_info=None):
  281. '''Create a replacement _check_user_name_and_pwd() method.'''
  282. def my_check_user_name_and_pwd():
  283. return status, error_info
  284. return my_check_user_name_and_pwd
  285. # (a) Check case where _check_user_name_and_pwd() returns
  286. # valid user status
  287. self.handler._check_user_name_and_pwd = \
  288. create_check_user_name_and_pwd(True)
  289. self.handler.do_POST()
  290. self.assertEqual(self.handler.rcode, http.client.OK)
  291. self.handler.wfile.seek(0, 0)
  292. d = self.handler.wfile.read()
  293. self.assertEqual(json.loads(d.decode()), ['login success'])
  294. # Clear the output
  295. self.handler.wfile.seek(0, 0)
  296. self.handler.wfile.truncate()
  297. # (b) Check case where _check_user_name_and_pwd() returns
  298. # invalid user status
  299. self.handler._check_user_name_and_pwd = \
  300. create_check_user_name_and_pwd(False, ['login failed'])
  301. self.handler.do_POST()
  302. self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
  303. self.handler.wfile.seek(0, 0)
  304. d = self.handler.wfile.read()
  305. self.assertEqual(json.loads(d.decode()), ['login failed'])
  306. finally:
  307. self.handler._is_user_logged_in = orig_is_user_logged_in
  308. self.handler._check_user_name_and_pwd = orig_check_user_name_and_pwd
  309. class MockSession:
  310. """Act like isc.cc.Session, stealing group_sendmsg/recvmsg().
  311. The initial simple version only records given parameters in
  312. group_sendmsg() for later inspection and raise a timeout exception
  313. from recvmsg(). As we see the need for more test cases these methods
  314. should be extended.
  315. """
  316. def __init__(self, sent_messages):
  317. self.__sent_messages = sent_messages
  318. def group_sendmsg(self, msg, module_name, want_answer):
  319. self.__sent_messages.append((msg, module_name))
  320. def group_recvmsg(self, nonblock, seq):
  321. raise isc.cc.session.SessionTimeout('dummy timeout')
  322. class MyCommandControl(CommandControl):
  323. def __init__(self, httpserver, verbose):
  324. super().__init__(httpserver, verbose)
  325. self.sent_messages = [] # for inspection; allow tests to see it
  326. self._cc = MockSession(self.sent_messages)
  327. def _get_modules_specification(self):
  328. return {}
  329. def _get_config_data_from_config_manager(self):
  330. return {}
  331. def _setup_session(self):
  332. spec_file = BUILD_FILE_PATH + 'cmdctl.spec'
  333. module_spec = isc.config.module_spec_from_file(spec_file)
  334. config = isc.config.config_data.ConfigData(module_spec)
  335. self._module_name = 'Cmdctl'
  336. self._cmdctl_config_data = config.get_full_config()
  337. def _handle_msg_from_msgq(self):
  338. pass
  339. def _start_msg_handle_thread(self): # just not bother to be threads
  340. pass
  341. def _get_current_thread(self):
  342. return None
  343. class TestCommandControl(unittest.TestCase):
  344. def setUp(self):
  345. self.old_stdout = sys.stdout
  346. sys.stdout = open(os.devnull, 'w')
  347. self.cmdctl = MyCommandControl(None, True)
  348. def tearDown(self):
  349. sys.stdout.close()
  350. sys.stdout = self.old_stdout
  351. def _check_config(self, cmdctl):
  352. key, cert, account = cmdctl.get_cmdctl_config_data()
  353. self.assertIsNotNone(key)
  354. self.assertIsNotNone(cert)
  355. self.assertIsNotNone(account)
  356. def test_get_cmdctl_config_data(self):
  357. old_env = os.environ
  358. if 'B10_FROM_SOURCE' in os.environ:
  359. del os.environ['B10_FROM_SOURCE']
  360. self.cmdctl.get_cmdctl_config_data()
  361. self._check_config(self.cmdctl)
  362. os.environ = old_env
  363. old_env = os.environ
  364. os.environ['B10_FROM_SOURCE'] = '../'
  365. self._check_config(self.cmdctl)
  366. os.environ = old_env
  367. def test_parse_command_result(self):
  368. self.assertEqual({}, self.cmdctl._parse_command_result(1, {'error' : 1}))
  369. self.assertEqual({'a': 1}, self.cmdctl._parse_command_result(0, {'a' : 1}))
  370. def _check_answer(self, answer, rcode_, msg_):
  371. rcode, msg = ccsession.parse_answer(answer)
  372. self.assertEqual(rcode, rcode_)
  373. self.assertEqual(msg, msg_)
  374. def test_command_handler(self):
  375. answer = self.cmdctl.command_handler('unknown-command', None)
  376. self._check_answer(answer, 1, 'unknown command: unknown-command')
  377. # Send a real command. Mock stuff so the shutdown command doesn't
  378. # cause an exception.
  379. class ModuleCC:
  380. def send_stopping():
  381. pass
  382. self.cmdctl._module_cc = ModuleCC
  383. called = []
  384. class Server:
  385. def shutdown():
  386. called.append('shutdown')
  387. self.cmdctl._httpserver = Server
  388. answer = self.cmdctl.command_handler('shutdown', None)
  389. rcode, msg = ccsession.parse_answer(answer)
  390. self.assertEqual(rcode, 0)
  391. self.assertIsNone(msg)
  392. self.assertEqual(['shutdown'], called)
  393. def test_command_handler_spec_update(self):
  394. # Should not be present
  395. self.assertFalse("foo" in self.cmdctl.modules_spec)
  396. answer = self.cmdctl.command_handler(
  397. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", {} ])
  398. rcode, msg = ccsession.parse_answer(answer)
  399. self.assertEqual(rcode, 0)
  400. self.assertEqual(msg, None)
  401. # Should now be present
  402. self.assertTrue("foo" in self.cmdctl.modules_spec)
  403. # When sending specification 'None', it should be removed
  404. answer = self.cmdctl.command_handler(
  405. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", None ])
  406. rcode, msg = ccsession.parse_answer(answer)
  407. self.assertEqual(rcode, 0)
  408. self.assertEqual(msg, None)
  409. # Should no longer be present
  410. self.assertFalse("foo" in self.cmdctl.modules_spec)
  411. # Don't store 'None' if it wasn't there in the first place!
  412. answer = self.cmdctl.command_handler(
  413. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", None ])
  414. rcode, msg = ccsession.parse_answer(answer)
  415. self.assertEqual(rcode, 1)
  416. self.assertEqual(msg, "No such module: foo")
  417. # Should still not present
  418. self.assertFalse("foo" in self.cmdctl.modules_spec)
  419. def test_check_config_handler(self):
  420. answer = self.cmdctl.config_handler({'non-exist': 123})
  421. self._check_answer(answer, 1, 'unknown config item: non-exist')
  422. old_env = os.environ
  423. os.environ['B10_FROM_SOURCE'] = '../'
  424. self._check_config(self.cmdctl)
  425. os.environ = old_env
  426. answer = self.cmdctl.config_handler({'key_file': '/user/non-exist_folder'})
  427. self._check_answer(answer, 1, "'/user/non-exist_folder' does not exist")
  428. answer = self.cmdctl.config_handler({'cert_file': '/user/non-exist_folder'})
  429. self._check_answer(answer, 1, "'/user/non-exist_folder' does not exist")
  430. answer = self.cmdctl.config_handler({'accounts_file': '/user/non-exist_folder'})
  431. self._check_answer(answer, 1,
  432. "Invalid accounts file: [Errno 2] No such file or directory: '/user/non-exist_folder'")
  433. # Test with invalid accounts file
  434. file_name = 'tmp.account.file'
  435. temp_file = open(file_name, 'w')
  436. writer = csv.writer(temp_file)
  437. writer.writerow(['a', 'b'])
  438. temp_file.close()
  439. answer = self.cmdctl.config_handler({'accounts_file': file_name})
  440. self._check_answer(answer, 1, "Invalid accounts file: list index out of range")
  441. os.remove(file_name)
  442. def test_send_command(self):
  443. # Send a command to other module. We check an expected message
  444. # is sent via the session (cmdct._cc). Due to the behavior of
  445. # our mock session object the anser will be "fail", but it's not
  446. # the subject of this test, and so it's okay.
  447. # TODO: more detailed cases should be tested.
  448. rcode, value = self.cmdctl.send_command('Init', 'shutdown', None)
  449. self.assertEqual(1, len(self.cmdctl.sent_messages))
  450. self.assertEqual(({'command': ['shutdown']}, 'Init'),
  451. self.cmdctl.sent_messages[-1])
  452. self.assertEqual(1, rcode)
  453. # Send a command to cmdctl itself. Should be the same effect.
  454. rcode, value = self.cmdctl.send_command('Cmdctl', 'shutdown',
  455. None)
  456. self.assertEqual(2, len(self.cmdctl.sent_messages))
  457. self.assertEqual(({'command': ['shutdown']}, 'Cmdctl'),
  458. self.cmdctl.sent_messages[-1])
  459. self.assertEqual(1, rcode)
  460. class MySecureHTTPServer(SecureHTTPServer):
  461. def server_bind(self):
  462. pass
  463. class TestSecureHTTPServer(unittest.TestCase):
  464. def setUp(self):
  465. self.old_stdout = sys.stdout
  466. self.old_stderr = sys.stderr
  467. sys.stdout = open(os.devnull, 'w')
  468. sys.stderr = sys.stdout
  469. self.server = MySecureHTTPServer(('localhost', 8080),
  470. MySecureHTTPRequestHandler,
  471. MyCommandControl, verbose=True)
  472. def tearDown(self):
  473. # both sys.stdout and sys.stderr are the same, so closing one is
  474. # sufficient
  475. sys.stdout.close()
  476. sys.stdout = self.old_stdout
  477. sys.stderr = self.old_stderr
  478. def test_addr_in_use(self):
  479. server_one = None
  480. try:
  481. server_one = SecureHTTPServer(('localhost', 53531),
  482. MySecureHTTPRequestHandler,
  483. MyCommandControl)
  484. except CmdctlException:
  485. pass
  486. else:
  487. self.assertRaises(CmdctlException, SecureHTTPServer,
  488. ('localhost', 53531),
  489. MySecureHTTPRequestHandler, MyCommandControl)
  490. if server_one:
  491. server_one.server_close()
  492. def test_create_user_info(self):
  493. self.server._create_user_info('/local/not-exist')
  494. self.assertEqual(0, len(self.server._user_infos))
  495. self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
  496. self.assertEqual(1, len(self.server._user_infos))
  497. self.assertTrue('root' in self.server._user_infos)
  498. def test_get_user_info(self):
  499. self.assertIsNone(self.server.get_user_info('root'))
  500. self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
  501. self.assertIn('6f0c73bd33101a5ec0294b3ca39fec90ef4717fe',
  502. self.server.get_user_info('root'))
  503. # When the file is not changed calling _create_user_info() again
  504. # should have no effect. In order to test this, we overwrite the
  505. # user-infos that were just set and make sure it isn't touched by
  506. # the call (so make sure it isn't set to some empty value)
  507. fake_users_val = { 'notinfile': [] }
  508. self.server._user_infos = fake_users_val
  509. self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
  510. self.assertEqual(fake_users_val, self.server._user_infos)
  511. def test_create_user_info_changing_file_time(self):
  512. self.assertEqual(0, len(self.server._user_infos))
  513. # Create a file
  514. accounts_file = BUILD_FILE_PATH + 'new_file.csv'
  515. with TmpTextFile(accounts_file, ['root,foo,bar']):
  516. self.server._create_user_info(accounts_file)
  517. self.assertEqual(1, len(self.server._user_infos))
  518. self.assertTrue('root' in self.server._user_infos)
  519. # Make sure re-reading is a noop if file was not modified
  520. fake_users_val = { 'notinfile': [] }
  521. self.server._user_infos = fake_users_val
  522. self.server._create_user_info(accounts_file)
  523. self.assertEqual(fake_users_val, self.server._user_infos)
  524. # create the file again, this time read should not be a noop
  525. with TmpTextFile(accounts_file, ['otherroot,foo,bar']):
  526. # Set mtime in future
  527. stat = os.stat(accounts_file)
  528. os.utime(accounts_file, (stat.st_atime, stat.st_mtime + 10))
  529. self.server._create_user_info(accounts_file)
  530. self.assertEqual(1, len(self.server._user_infos))
  531. self.assertTrue('otherroot' in self.server._user_infos)
  532. def test_create_user_info_changing_file_name(self):
  533. """
  534. Check that the accounts file is re-read if the file name is different
  535. """
  536. self.assertEqual(0, len(self.server._user_infos))
  537. # Create two files
  538. accounts_file1 = BUILD_FILE_PATH + 'new_file.csv'
  539. accounts_file2 = BUILD_FILE_PATH + 'new_file2.csv'
  540. with TmpTextFile(accounts_file2, ['otherroot,foo,bar']):
  541. with TmpTextFile(accounts_file1, ['root,foo,bar']):
  542. self.server._create_user_info(accounts_file1)
  543. self.assertEqual(1, len(self.server._user_infos))
  544. self.assertTrue('root' in self.server._user_infos)
  545. # Make sure re-reading is a noop if file was not modified
  546. fake_users_val = { 'notinfile': [] }
  547. self.server._user_infos = fake_users_val
  548. self.server._create_user_info(accounts_file1)
  549. self.assertEqual(fake_users_val, self.server._user_infos)
  550. # But a different file should be read
  551. self.server._create_user_info(accounts_file2)
  552. self.assertEqual(1, len(self.server._user_infos))
  553. self.assertTrue('otherroot' in self.server._user_infos)
  554. def test_create_user_info_nonexistent_file(self):
  555. # Even if there was data initially, if set to a nonexistent
  556. # file it should result in no users
  557. accounts_file = BUILD_FILE_PATH + 'new_file.csv'
  558. self.assertFalse(os.path.exists(accounts_file))
  559. fake_users_val = { 'notinfile': [] }
  560. self.server._user_infos = fake_users_val
  561. self.server._create_user_info(accounts_file)
  562. self.assertEqual({}, self.server._user_infos)
  563. # Should it now be created it should be read
  564. with TmpTextFile(accounts_file, ['root,foo,bar']):
  565. self.server._create_user_info(accounts_file)
  566. self.assertEqual(1, len(self.server._user_infos))
  567. self.assertTrue('root' in self.server._user_infos)
  568. def test_check_file(self):
  569. # Just some file that we know exists
  570. file_name = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
  571. check_file(file_name)
  572. self.assertRaises(CmdctlException, check_file, '/local/not-exist')
  573. self.assertRaises(CmdctlException, check_file, '/')
  574. @unittest.skipIf(os.getuid() == 0,
  575. 'test cannot be run as root user')
  576. def test_check_file_for_unreadable(self):
  577. file_name = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
  578. with UnreadableFile(file_name):
  579. self.assertRaises(CmdctlException, check_file, file_name)
  580. def test_check_key_and_cert(self):
  581. keyfile = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
  582. certfile = BUILD_FILE_PATH + 'cmdctl-certfile.pem'
  583. # no exists
  584. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  585. keyfile, '/local/not-exist')
  586. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  587. '/local/not-exist', certfile)
  588. # not a file
  589. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  590. keyfile, '/')
  591. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  592. '/', certfile)
  593. # All OK (also happens to check the context code above works)
  594. self.server._check_key_and_cert(keyfile, certfile)
  595. @unittest.skipIf(os.getuid() == 0,
  596. 'test cannot be run as root user')
  597. def test_check_key_and_cert_for_unreadable(self):
  598. keyfile = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
  599. certfile = BUILD_FILE_PATH + 'cmdctl-certfile.pem'
  600. # no read permission
  601. with UnreadableFile(certfile):
  602. self.assertRaises(CmdctlException,
  603. self.server._check_key_and_cert,
  604. keyfile, certfile)
  605. with UnreadableFile(keyfile):
  606. self.assertRaises(CmdctlException,
  607. self.server._check_key_and_cert,
  608. keyfile, certfile)
  609. def test_wrap_sock_in_ssl_context(self):
  610. sock = socket.socket()
  611. # Bad files should result in a CmdctlException in the basic file
  612. # checks
  613. self.assertRaises(CmdctlException,
  614. self.server._wrap_socket_in_ssl_context,
  615. sock,
  616. 'no_such_file', 'no_such_file')
  617. # Using a non-certificate file would cause an SSLError
  618. self.assertRaises(socket.error,
  619. self.server._wrap_socket_in_ssl_context,
  620. sock,
  621. BUILD_FILE_PATH + 'cmdctl.py',
  622. BUILD_FILE_PATH + 'cmdctl-certfile.pem')
  623. # Should succeed
  624. sock1 = socket.socket()
  625. ssl_sock = self.server._wrap_socket_in_ssl_context(sock1,
  626. BUILD_FILE_PATH + 'cmdctl-keyfile.pem',
  627. BUILD_FILE_PATH + 'cmdctl-certfile.pem')
  628. self.assertTrue(isinstance(ssl_sock, ssl.SSLSocket))
  629. # wrap_socket can also raise IOError, which should be caught and
  630. # handled like the other errors.
  631. # Force this by temporarily disabling our own file checks
  632. orig_check_func = self.server._check_key_and_cert
  633. try:
  634. self.server._check_key_and_cert = lambda x,y: None
  635. self.assertRaises(IOError,
  636. self.server._wrap_socket_in_ssl_context,
  637. sock,
  638. 'no_such_file', 'no_such_file')
  639. finally:
  640. self.server._check_key_and_cert = orig_check_func
  641. class TestFuncNotInClass(unittest.TestCase):
  642. def test_check_port(self):
  643. self.assertRaises(OptionValueError, check_port, None, 'port', -1, None)
  644. self.assertRaises(OptionValueError, check_port, None, 'port', 65536, None)
  645. self.assertRaises(OptionValueError, check_addr, None, 'ipstr', 'a.b.d', None)
  646. self.assertRaises(OptionValueError, check_addr, None, 'ipstr', '1::0:a.b', None)
  647. if __name__== "__main__":
  648. isc.log.resetUnitTestRootLogger()
  649. unittest.main()