cmdctl_test.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import unittest
  16. import socket
  17. import tempfile
  18. import time
  19. import stat
  20. import sys
  21. from cmdctl import *
  22. import isc.log
  23. assert 'CMDCTL_SRC_PATH' in os.environ,\
  24. "Please run this test with 'make check'"
  25. SRC_FILE_PATH = os.environ['CMDCTL_SRC_PATH'] + os.sep
  26. assert 'CMDCTL_BUILD_PATH' in os.environ,\
  27. "Please run this test with 'make check'"
  28. BUILD_FILE_PATH = os.environ['CMDCTL_BUILD_PATH'] + os.sep
  29. # Rewrite the class for unittest.
  30. class MySecureHTTPRequestHandler(SecureHTTPRequestHandler):
  31. def __init__(self):
  32. self.session_id = None
  33. def send_response(self, rcode):
  34. self.rcode = rcode
  35. def end_headers(self):
  36. pass
  37. class FakeSecureHTTPServer(SecureHTTPServer):
  38. def __init__(self):
  39. self.user_sessions = {}
  40. self.cmdctl = FakeCommandControlForTestRequestHandler()
  41. self._verbose = True
  42. self._user_infos = {}
  43. self.idle_timeout = 1200
  44. self._lock = threading.Lock()
  45. class FakeCommandControlForTestRequestHandler(CommandControl):
  46. def __init__(self):
  47. self._config_data = {}
  48. self.modules_spec = {}
  49. self._lock = threading.Lock()
  50. def send_command(self, mod, cmd, param):
  51. return 0, {}
  52. # context to temporarily make a file unreadable
  53. class UnreadableFile:
  54. def __init__(self, file_name):
  55. self.file_name = file_name
  56. self.orig_mode = os.stat(file_name).st_mode
  57. def __enter__(self):
  58. os.chmod(self.file_name, self.orig_mode & ~stat.S_IRUSR)
  59. def __exit__(self, type, value, traceback):
  60. os.chmod(self.file_name, self.orig_mode)
  61. class TmpTextFile:
  62. """
  63. Context class for temporarily creating a text file with some
  64. lines of content.
  65. The file is automatically deleted if the context is left, so
  66. make sure to not use the path of an existing file!
  67. """
  68. def __init__(self, path, contents):
  69. self.__path = path
  70. self.__contents = contents
  71. def __enter__(self):
  72. with open(self.__path, 'w') as f:
  73. f.write("\n".join(self.__contents) + "\n")
  74. def __exit__(self, type, value, traceback):
  75. os.unlink(self.__path)
  76. class TestSecureHTTPRequestHandler(unittest.TestCase):
  77. def setUp(self):
  78. self.old_stdout = sys.stdout
  79. sys.stdout = open(os.devnull, 'w')
  80. self.handler = MySecureHTTPRequestHandler()
  81. self.handler.server = FakeSecureHTTPServer()
  82. self.handler.server.user_sessions = {}
  83. self.handler.server._user_infos = {}
  84. self.handler.headers = {}
  85. self.handler.rfile = open('input.tmp', 'w+b')
  86. self.handler.wfile = open('output.tmp', 'w+b')
  87. def tearDown(self):
  88. sys.stdout.close()
  89. sys.stdout = self.old_stdout
  90. self.handler.wfile.close()
  91. os.remove('output.tmp')
  92. self.handler.rfile.close()
  93. os.remove('input.tmp')
  94. def test_is_session_valid(self):
  95. self.assertIsNone(self.handler.session_id)
  96. self.assertFalse(self.handler._is_session_valid())
  97. self.handler.session_id = 4234
  98. self.assertTrue(self.handler._is_session_valid())
  99. def test_parse_request_path(self):
  100. self.handler.path = ''
  101. mod, cmd = self.handler._parse_request_path()
  102. self.assertTrue((mod == None) and (cmd == None))
  103. self.handler.path = '/abc'
  104. mod, cmd = self.handler._parse_request_path()
  105. self.assertTrue((mod == 'abc') and (cmd == None))
  106. self.handler.path = '/abc/edf'
  107. mod, cmd = self.handler._parse_request_path()
  108. self.assertTrue((mod == 'abc') and (cmd == 'edf'))
  109. self.handler.path = '/abc/edf/ghi'
  110. mod, cmd = self.handler._parse_request_path()
  111. self.assertTrue((mod == 'abc') and (cmd == 'edf'))
  112. def test_parse_request_path_1(self):
  113. self.handler.path = '/ab*c'
  114. mod, cmd = self.handler._parse_request_path()
  115. self.assertTrue((mod == 'ab') and cmd == None)
  116. self.handler.path = '/abc/ed*fdd/ddd'
  117. mod, cmd = self.handler._parse_request_path()
  118. self.assertTrue((mod == 'abc') and cmd == 'ed')
  119. self.handler.path = '/-*/edfdd/ddd'
  120. mod, cmd = self.handler._parse_request_path()
  121. self.assertTrue((mod == None) and (cmd == None))
  122. self.handler.path = '/-*/edfdd/ddd'
  123. mod, cmd = self.handler._parse_request_path()
  124. self.assertTrue((mod == None) and (cmd == None))
  125. def test_do_GET(self):
  126. self.handler.do_GET()
  127. self.assertEqual(self.handler.rcode, http.client.BAD_REQUEST)
  128. def test_do_GET_1(self):
  129. self.handler.headers['cookie'] = 12345
  130. self.handler.do_GET()
  131. self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
  132. def test_do_GET_2(self):
  133. self.handler.headers['cookie'] = 12345
  134. self.handler.server.user_sessions[12345] = time.time() + 1000000
  135. self.handler.path = '/how/are'
  136. self.handler.do_GET()
  137. self.assertEqual(self.handler.rcode, http.client.NO_CONTENT)
  138. def test_do_GET_3(self):
  139. self.handler.headers['cookie'] = 12346
  140. self.handler.server.user_sessions[12346] = time.time() + 1000000
  141. path_vec = ['config_data', 'module_spec']
  142. for path in path_vec:
  143. self.handler.path = '/' + path
  144. self.handler.do_GET()
  145. self.assertEqual(self.handler.rcode, http.client.OK)
  146. def test_is_user_logged_in(self):
  147. self.handler.server.user_sessions = {}
  148. self.handler.session_id = 12345
  149. self.assertTrue(self.handler._is_user_logged_in() == False)
  150. self.handler.server.user_sessions[12345] = time.time()
  151. self.assertTrue(self.handler._is_user_logged_in())
  152. self.handler.server.user_sessions[12345] = time.time() - 1500
  153. self.handler.idle_timeout = 1200
  154. self.assertTrue(self.handler._is_user_logged_in() == False)
  155. def test_check_user_name_and_pwd(self):
  156. self.handler.headers = {}
  157. ret, msg = self.handler._check_user_name_and_pwd()
  158. self.assertFalse(ret)
  159. self.assertEqual(msg, ['invalid username or password'])
  160. def test_check_user_name_and_pwd_1(self):
  161. user_info = {'username':'root', 'password':'abc123'}
  162. len = self.handler.rfile.write(json.dumps(user_info).encode())
  163. self.handler.headers['Content-Length'] = len
  164. self.handler.rfile.seek(0, 0)
  165. self.handler.server._user_infos['root'] = ['aa', 'aaa']
  166. ret, msg = self.handler._check_user_name_and_pwd()
  167. self.assertFalse(ret)
  168. self.assertEqual(msg, ['username or password error'])
  169. def test_check_user_name_and_pwd_2(self):
  170. user_info = {'username':'root', 'password':'abc123'}
  171. len = self.handler.rfile.write(json.dumps(user_info).encode())
  172. self.handler.headers['Content-Length'] = len - 1
  173. self.handler.rfile.seek(0, 0)
  174. ret, msg = self.handler._check_user_name_and_pwd()
  175. self.assertFalse(ret)
  176. self.assertEqual(msg, ['invalid username or password'])
  177. def test_check_user_name_and_pwd_3(self):
  178. user_info = {'usernae':'root', 'password':'abc123'}
  179. len = self.handler.rfile.write(json.dumps(user_info).encode())
  180. self.handler.headers['Content-Length'] = len
  181. self.handler.rfile.seek(0, 0)
  182. ret, msg = self.handler._check_user_name_and_pwd()
  183. self.assertFalse(ret)
  184. self.assertEqual(msg, ['need user name'])
  185. def test_check_user_name_and_pwd_4(self):
  186. user_info = {'username':'root', 'pssword':'abc123'}
  187. len = self.handler.rfile.write(json.dumps(user_info).encode())
  188. self.handler.headers['Content-Length'] = len
  189. self.handler.rfile.seek(0, 0)
  190. self.handler.server._user_infos['root'] = ['aa', 'aaa']
  191. ret, msg = self.handler._check_user_name_and_pwd()
  192. self.assertFalse(ret)
  193. self.assertEqual(msg, ['need password'])
  194. def test_check_user_name_and_pwd_5(self):
  195. user_info = {'username':'root', 'password':'abc123'}
  196. len = self.handler.rfile.write(json.dumps(user_info).encode())
  197. self.handler.headers['Content-Length'] = len
  198. self.handler.rfile.seek(0, 0)
  199. ret, msg = self.handler._check_user_name_and_pwd()
  200. self.assertFalse(ret)
  201. self.assertEqual(msg, ['username or password error'])
  202. def test_do_POST(self):
  203. self.handler.headers = {}
  204. self.handler.do_POST()
  205. self.assertEqual(self.handler.rcode, http.client.BAD_REQUEST)
  206. def test_do_POST_1(self):
  207. self.handler.headers = {}
  208. self.handler.headers['cookie'] = 12345
  209. self.handler.path = '/'
  210. self.handler.do_POST()
  211. self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
  212. def test_handle_post_request(self):
  213. self.handler.path = '/cfgmgr/revert'
  214. self.handler.headers = {}
  215. rcode, reply = self.handler._handle_post_request()
  216. self.assertEqual(http.client.BAD_REQUEST, rcode)
  217. def test_handle_post_request_1(self):
  218. self.handler.path = '/*d/revert'
  219. self.handler.headers = {}
  220. rcode, reply = self.handler._handle_post_request()
  221. self.assertEqual(http.client.BAD_REQUEST, rcode)
  222. def _gen_module_spec(self):
  223. spec = { 'commands': [
  224. { 'command_name' :'command',
  225. 'command_args': [ {
  226. 'item_name' : 'param1',
  227. 'item_type' : 'integer',
  228. 'item_optional' : False,
  229. 'item_default' : 0
  230. } ],
  231. 'command_description' : 'cmd description'
  232. }
  233. ]
  234. }
  235. return spec
  236. def test_handle_post_request_2(self):
  237. params = {'param1':123}
  238. len = self.handler.rfile.write(json.dumps(params).encode())
  239. self.handler.headers['Content-Length'] = len
  240. self.handler.rfile.seek(0, 0)
  241. self.handler.path = '/module/command'
  242. self.handler.server.cmdctl.modules_spec = {}
  243. self.handler.server.cmdctl.modules_spec['module'] = self._gen_module_spec()
  244. rcode, reply = self.handler._handle_post_request()
  245. self.assertEqual(http.client.OK, rcode)
  246. def test_handle_post_request_3(self):
  247. params = {'param1':'abc'}
  248. len = self.handler.rfile.write(json.dumps(params).encode())
  249. self.handler.headers['Content-Length'] = len
  250. self.handler.rfile.seek(0, 0)
  251. self.handler.path = '/module/command'
  252. self.handler.server.cmdctl.modules_spec = {}
  253. self.handler.server.cmdctl.modules_spec['module'] = self._gen_module_spec()
  254. rcode, reply = self.handler._handle_post_request()
  255. self.assertEqual(http.client.BAD_REQUEST, rcode)
  256. def test_handle_post_request_to_itself(self):
  257. len = self.handler.rfile.write(json.dumps({}).encode())
  258. self.handler.headers['Content-Length'] = len
  259. self.handler.rfile.seek(0, 0)
  260. self.handler.path = '/Cmdctl/shutdown'
  261. rcode, reply = self.handler._handle_post_request()
  262. self.assertEqual(http.client.BAD_REQUEST, rcode)
  263. def test_handle_login(self):
  264. orig_is_user_logged_in = self.handler._is_user_logged_in
  265. orig_check_user_name_and_pwd = self.handler._check_user_name_and_pwd
  266. try:
  267. def create_is_user_logged_in(status):
  268. '''Create a replacement _is_user_logged_in() method.'''
  269. def my_is_user_logged_in():
  270. return status
  271. return my_is_user_logged_in
  272. # Check case where _is_user_logged_in() returns True
  273. self.handler._is_user_logged_in = create_is_user_logged_in(True)
  274. self.handler.headers['cookie'] = 12345
  275. self.handler.path = '/login'
  276. self.handler.do_POST()
  277. self.assertEqual(self.handler.rcode, http.client.OK)
  278. self.handler.wfile.seek(0, 0)
  279. d = self.handler.wfile.read()
  280. self.assertEqual(json.loads(d.decode()),
  281. ['user has already login'])
  282. # Clear the output
  283. self.handler.wfile.seek(0, 0)
  284. self.handler.wfile.truncate()
  285. # Check case where _is_user_logged_in() returns False
  286. self.handler._is_user_logged_in = create_is_user_logged_in(False)
  287. def create_check_user_name_and_pwd(status, error_info=None):
  288. '''Create a replacement _check_user_name_and_pwd() method.'''
  289. def my_check_user_name_and_pwd():
  290. return status, error_info
  291. return my_check_user_name_and_pwd
  292. # (a) Check case where _check_user_name_and_pwd() returns
  293. # valid user status
  294. self.handler._check_user_name_and_pwd = \
  295. create_check_user_name_and_pwd(True)
  296. self.handler.do_POST()
  297. self.assertEqual(self.handler.rcode, http.client.OK)
  298. self.handler.wfile.seek(0, 0)
  299. d = self.handler.wfile.read()
  300. self.assertEqual(json.loads(d.decode()), ['login success'])
  301. # Clear the output
  302. self.handler.wfile.seek(0, 0)
  303. self.handler.wfile.truncate()
  304. # (b) Check case where _check_user_name_and_pwd() returns
  305. # invalid user status
  306. self.handler._check_user_name_and_pwd = \
  307. create_check_user_name_and_pwd(False, ['login failed'])
  308. self.handler.do_POST()
  309. self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
  310. self.handler.wfile.seek(0, 0)
  311. d = self.handler.wfile.read()
  312. self.assertEqual(json.loads(d.decode()), ['login failed'])
  313. finally:
  314. self.handler._is_user_logged_in = orig_is_user_logged_in
  315. self.handler._check_user_name_and_pwd = orig_check_user_name_and_pwd
  316. class MockSession:
  317. """Act like isc.cc.Session, stealing group_sendmsg/recvmsg().
  318. The initial simple version only records given parameters in
  319. group_sendmsg() for later inspection and raise a timeout exception
  320. from recvmsg(). As we see the need for more test cases these methods
  321. should be extended.
  322. """
  323. def __init__(self, sent_messages):
  324. self.__sent_messages = sent_messages
  325. def group_sendmsg(self, msg, module_name, want_answer):
  326. self.__sent_messages.append((msg, module_name))
  327. def group_recvmsg(self, nonblock, seq):
  328. raise isc.cc.session.SessionTimeout('dummy timeout')
  329. class MyCommandControl(CommandControl):
  330. def __init__(self, httpserver, verbose):
  331. super().__init__(httpserver, verbose)
  332. self.sent_messages = [] # for inspection; allow tests to see it
  333. self._cc = MockSession(self.sent_messages)
  334. def _get_modules_specification(self):
  335. return {}
  336. def _get_config_data_from_config_manager(self):
  337. return {}
  338. def _setup_session(self):
  339. spec_file = BUILD_FILE_PATH + 'cmdctl.spec'
  340. module_spec = isc.config.module_spec_from_file(spec_file)
  341. config = isc.config.config_data.ConfigData(module_spec)
  342. self._module_name = 'Cmdctl'
  343. self._cmdctl_config_data = config.get_full_config()
  344. def _handle_msg_from_msgq(self):
  345. pass
  346. def _start_msg_handle_thread(self): # just not bother to be threads
  347. pass
  348. def _get_current_thread(self):
  349. return None
  350. class TestCommandControl(unittest.TestCase):
  351. def setUp(self):
  352. self.old_stdout = sys.stdout
  353. sys.stdout = open(os.devnull, 'w')
  354. self.cmdctl = MyCommandControl(None, True)
  355. def tearDown(self):
  356. sys.stdout.close()
  357. sys.stdout = self.old_stdout
  358. def _check_config(self, cmdctl):
  359. key, cert, account = cmdctl.get_cmdctl_config_data()
  360. self.assertIsNotNone(key)
  361. self.assertIsNotNone(cert)
  362. self.assertIsNotNone(account)
  363. def test_get_cmdctl_config_data(self):
  364. old_env = os.environ
  365. if 'B10_FROM_SOURCE' in os.environ:
  366. del os.environ['B10_FROM_SOURCE']
  367. self.cmdctl.get_cmdctl_config_data()
  368. self._check_config(self.cmdctl)
  369. os.environ = old_env
  370. old_env = os.environ
  371. os.environ['B10_FROM_SOURCE'] = '../'
  372. self._check_config(self.cmdctl)
  373. os.environ = old_env
  374. def test_parse_command_result(self):
  375. self.assertEqual({}, self.cmdctl._parse_command_result(1, {'error' : 1}))
  376. self.assertEqual({'a': 1}, self.cmdctl._parse_command_result(0, {'a' : 1}))
  377. def _check_answer(self, answer, rcode_, msg_):
  378. rcode, msg = ccsession.parse_answer(answer)
  379. self.assertEqual(rcode, rcode_)
  380. self.assertEqual(msg, msg_)
  381. def test_command_handler(self):
  382. answer = self.cmdctl.command_handler('unknown-command', None)
  383. self._check_answer(answer, 1, 'unknown command: unknown-command')
  384. answer = self.cmdctl.command_handler('print_settings', None)
  385. rcode, msg = ccsession.parse_answer(answer)
  386. self.assertEqual(rcode, 0)
  387. self.assertTrue(msg != None)
  388. def test_command_handler_spec_update(self):
  389. # Should not be present
  390. self.assertFalse("foo" in self.cmdctl.modules_spec)
  391. answer = self.cmdctl.command_handler(
  392. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", {} ])
  393. rcode, msg = ccsession.parse_answer(answer)
  394. self.assertEqual(rcode, 0)
  395. self.assertEqual(msg, None)
  396. # Should now be present
  397. self.assertTrue("foo" in self.cmdctl.modules_spec)
  398. # When sending specification 'None', it should be removed
  399. answer = self.cmdctl.command_handler(
  400. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", None ])
  401. rcode, msg = ccsession.parse_answer(answer)
  402. self.assertEqual(rcode, 0)
  403. self.assertEqual(msg, None)
  404. # Should no longer be present
  405. self.assertFalse("foo" in self.cmdctl.modules_spec)
  406. # Don't store 'None' if it wasn't there in the first place!
  407. answer = self.cmdctl.command_handler(
  408. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", None ])
  409. rcode, msg = ccsession.parse_answer(answer)
  410. self.assertEqual(rcode, 1)
  411. self.assertEqual(msg, "No such module: foo")
  412. # Should still not present
  413. self.assertFalse("foo" in self.cmdctl.modules_spec)
  414. def test_check_config_handler(self):
  415. answer = self.cmdctl.config_handler({'non-exist': 123})
  416. self._check_answer(answer, 1, 'unknown config item: non-exist')
  417. old_env = os.environ
  418. os.environ['B10_FROM_SOURCE'] = '../'
  419. self._check_config(self.cmdctl)
  420. os.environ = old_env
  421. answer = self.cmdctl.config_handler({'key_file': '/user/non-exist_folder'})
  422. self._check_answer(answer, 1, "'/user/non-exist_folder' does not exist")
  423. answer = self.cmdctl.config_handler({'cert_file': '/user/non-exist_folder'})
  424. self._check_answer(answer, 1, "'/user/non-exist_folder' does not exist")
  425. answer = self.cmdctl.config_handler({'accounts_file': '/user/non-exist_folder'})
  426. self._check_answer(answer, 1,
  427. "Invalid accounts file: [Errno 2] No such file or directory: '/user/non-exist_folder'")
  428. # Test with invalid accounts file
  429. file_name = 'tmp.account.file'
  430. temp_file = open(file_name, 'w')
  431. writer = csv.writer(temp_file)
  432. writer.writerow(['a', 'b'])
  433. temp_file.close()
  434. answer = self.cmdctl.config_handler({'accounts_file': file_name})
  435. self._check_answer(answer, 1, "Invalid accounts file: list index out of range")
  436. os.remove(file_name)
  437. def test_send_command(self):
  438. # Send a command to other module. We check an expected message
  439. # is sent via the session (cmdct._cc). Due to the behavior of
  440. # our mock session object the anser will be "fail", but it's not
  441. # the subject of this test, and so it's okay.
  442. # TODO: more detailed cases should be tested.
  443. rcode, value = self.cmdctl.send_command('Init', 'shutdown', None)
  444. self.assertEqual(1, len(self.cmdctl.sent_messages))
  445. self.assertEqual(({'command': ['shutdown']}, 'Init'),
  446. self.cmdctl.sent_messages[-1])
  447. self.assertEqual(1, rcode)
  448. # Send a command to cmdctl itself. Should be the same effect.
  449. rcode, value = self.cmdctl.send_command('Cmdctl', 'print_settings',
  450. None)
  451. self.assertEqual(2, len(self.cmdctl.sent_messages))
  452. self.assertEqual(({'command': ['print_settings']}, 'Cmdctl'),
  453. self.cmdctl.sent_messages[-1])
  454. self.assertEqual(1, rcode)
  455. class MySecureHTTPServer(SecureHTTPServer):
  456. def server_bind(self):
  457. pass
  458. class TestSecureHTTPServer(unittest.TestCase):
  459. def setUp(self):
  460. self.old_stdout = sys.stdout
  461. self.old_stderr = sys.stderr
  462. sys.stdout = open(os.devnull, 'w')
  463. sys.stderr = sys.stdout
  464. self.server = MySecureHTTPServer(('localhost', 8080),
  465. MySecureHTTPRequestHandler,
  466. MyCommandControl, verbose=True)
  467. def tearDown(self):
  468. # both sys.stdout and sys.stderr are the same, so closing one is
  469. # sufficient
  470. sys.stdout.close()
  471. sys.stdout = self.old_stdout
  472. sys.stderr = self.old_stderr
  473. def test_addr_in_use(self):
  474. server_one = None
  475. try:
  476. server_one = SecureHTTPServer(('localhost', 53531),
  477. MySecureHTTPRequestHandler,
  478. MyCommandControl)
  479. except CmdctlException:
  480. pass
  481. else:
  482. self.assertRaises(CmdctlException, SecureHTTPServer,
  483. ('localhost', 53531),
  484. MySecureHTTPRequestHandler, MyCommandControl)
  485. if server_one:
  486. server_one.server_close()
  487. def test_create_user_info(self):
  488. self.server._create_user_info('/local/not-exist')
  489. self.assertEqual(0, len(self.server._user_infos))
  490. self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
  491. self.assertEqual(1, len(self.server._user_infos))
  492. self.assertTrue('root' in self.server._user_infos)
  493. def test_get_user_info(self):
  494. self.assertIsNone(self.server.get_user_info('root'))
  495. self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
  496. self.assertIn('6f0c73bd33101a5ec0294b3ca39fec90ef4717fe',
  497. self.server.get_user_info('root'))
  498. # When the file is not changed calling _create_user_info() again
  499. # should have no effect. In order to test this, we overwrite the
  500. # user-infos that were just set and make sure it isn't touched by
  501. # the call (so make sure it isn't set to some empty value)
  502. fake_users_val = { 'notinfile': [] }
  503. self.server._user_infos = fake_users_val
  504. self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
  505. self.assertEqual(fake_users_val, self.server._user_infos)
  506. def test_create_user_info_changing_file_time(self):
  507. self.assertEqual(0, len(self.server._user_infos))
  508. # Create a file
  509. accounts_file = BUILD_FILE_PATH + 'new_file.csv'
  510. with TmpTextFile(accounts_file, ['root,foo,bar']):
  511. self.server._create_user_info(accounts_file)
  512. self.assertEqual(1, len(self.server._user_infos))
  513. self.assertTrue('root' in self.server._user_infos)
  514. # Make sure re-reading is a noop if file was not modified
  515. fake_users_val = { 'notinfile': [] }
  516. self.server._user_infos = fake_users_val
  517. self.server._create_user_info(accounts_file)
  518. self.assertEqual(fake_users_val, self.server._user_infos)
  519. # create the file again, this time read should not be a noop
  520. with TmpTextFile(accounts_file, ['otherroot,foo,bar']):
  521. # Set mtime in future
  522. stat = os.stat(accounts_file)
  523. os.utime(accounts_file, (stat.st_atime, stat.st_mtime + 10))
  524. self.server._create_user_info(accounts_file)
  525. self.assertEqual(1, len(self.server._user_infos))
  526. self.assertTrue('otherroot' in self.server._user_infos)
  527. def test_create_user_info_changing_file_name(self):
  528. """
  529. Check that the accounts file is re-read if the file name is different
  530. """
  531. self.assertEqual(0, len(self.server._user_infos))
  532. # Create two files
  533. accounts_file1 = BUILD_FILE_PATH + 'new_file.csv'
  534. accounts_file2 = BUILD_FILE_PATH + 'new_file2.csv'
  535. with TmpTextFile(accounts_file2, ['otherroot,foo,bar']):
  536. with TmpTextFile(accounts_file1, ['root,foo,bar']):
  537. self.server._create_user_info(accounts_file1)
  538. self.assertEqual(1, len(self.server._user_infos))
  539. self.assertTrue('root' in self.server._user_infos)
  540. # Make sure re-reading is a noop if file was not modified
  541. fake_users_val = { 'notinfile': [] }
  542. self.server._user_infos = fake_users_val
  543. self.server._create_user_info(accounts_file1)
  544. self.assertEqual(fake_users_val, self.server._user_infos)
  545. # But a different file should be read
  546. self.server._create_user_info(accounts_file2)
  547. self.assertEqual(1, len(self.server._user_infos))
  548. self.assertTrue('otherroot' in self.server._user_infos)
  549. def test_create_user_info_nonexistent_file(self):
  550. # Even if there was data initially, if set to a nonexistent
  551. # file it should result in no users
  552. accounts_file = BUILD_FILE_PATH + 'new_file.csv'
  553. self.assertFalse(os.path.exists(accounts_file))
  554. fake_users_val = { 'notinfile': [] }
  555. self.server._user_infos = fake_users_val
  556. self.server._create_user_info(accounts_file)
  557. self.assertEqual({}, self.server._user_infos)
  558. # Should it now be created it should be read
  559. with TmpTextFile(accounts_file, ['root,foo,bar']):
  560. self.server._create_user_info(accounts_file)
  561. self.assertEqual(1, len(self.server._user_infos))
  562. self.assertTrue('root' in self.server._user_infos)
  563. def test_check_file(self):
  564. # Just some file that we know exists
  565. file_name = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
  566. check_file(file_name)
  567. with UnreadableFile(file_name):
  568. self.assertRaises(CmdctlException, check_file, file_name)
  569. self.assertRaises(CmdctlException, check_file, '/local/not-exist')
  570. self.assertRaises(CmdctlException, check_file, '/')
  571. def test_check_key_and_cert(self):
  572. keyfile = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
  573. certfile = BUILD_FILE_PATH + 'cmdctl-certfile.pem'
  574. # no exists
  575. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  576. keyfile, '/local/not-exist')
  577. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  578. '/local/not-exist', certfile)
  579. # not a file
  580. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  581. keyfile, '/')
  582. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  583. '/', certfile)
  584. # no read permission
  585. with UnreadableFile(certfile):
  586. self.assertRaises(CmdctlException,
  587. self.server._check_key_and_cert,
  588. keyfile, certfile)
  589. with UnreadableFile(keyfile):
  590. self.assertRaises(CmdctlException,
  591. self.server._check_key_and_cert,
  592. keyfile, certfile)
  593. # All OK (also happens to check the context code above works)
  594. self.server._check_key_and_cert(keyfile, certfile)
  595. def test_wrap_sock_in_ssl_context(self):
  596. sock = socket.socket()
  597. # Bad files should result in a socket.error raised by our own
  598. # code in the basic file checks
  599. self.assertRaises(socket.error,
  600. self.server._wrap_socket_in_ssl_context,
  601. sock,
  602. 'no_such_file', 'no_such_file')
  603. # Using a non-certificate file would cause an SSLError, which
  604. # is caught by our code which then raises a basic socket.error
  605. self.assertRaises(socket.error,
  606. self.server._wrap_socket_in_ssl_context,
  607. sock,
  608. BUILD_FILE_PATH + 'cmdctl.py',
  609. BUILD_FILE_PATH + 'cmdctl-certfile.pem')
  610. # Should succeed
  611. sock1 = socket.socket()
  612. ssl_sock = self.server._wrap_socket_in_ssl_context(sock1,
  613. BUILD_FILE_PATH + 'cmdctl-keyfile.pem',
  614. BUILD_FILE_PATH + 'cmdctl-certfile.pem')
  615. self.assertTrue(isinstance(ssl_sock, ssl.SSLSocket))
  616. # wrap_socket can also raise IOError, which should be caught and
  617. # handled like the other errors.
  618. # Force this by temporarily disabling our own file checks
  619. orig_check_func = self.server._check_key_and_cert
  620. try:
  621. self.server._check_key_and_cert = lambda x,y: None
  622. self.assertRaises(socket.error,
  623. self.server._wrap_socket_in_ssl_context,
  624. sock,
  625. 'no_such_file', 'no_such_file')
  626. finally:
  627. self.server._check_key_and_cert = orig_check_func
  628. class TestFuncNotInClass(unittest.TestCase):
  629. def test_check_port(self):
  630. self.assertRaises(OptionValueError, check_port, None, 'port', -1, None)
  631. self.assertRaises(OptionValueError, check_port, None, 'port', 65536, None)
  632. self.assertRaises(OptionValueError, check_addr, None, 'ipstr', 'a.b.d', None)
  633. self.assertRaises(OptionValueError, check_addr, None, 'ipstr', '1::0:a.b', None)
  634. if __name__== "__main__":
  635. isc.log.resetUnitTestRootLogger()
  636. unittest.main()