cmdctl_test.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import unittest
  16. import socket
  17. import tempfile
  18. import time
  19. import stat
  20. import sys
  21. from cmdctl import *
  22. import isc.log
  23. assert 'CMDCTL_SRC_PATH' in os.environ,\
  24. "Please run this test with 'make check'"
  25. SRC_FILE_PATH = os.environ['CMDCTL_SRC_PATH'] + os.sep
  26. assert 'CMDCTL_BUILD_PATH' in os.environ,\
  27. "Please run this test with 'make check'"
  28. BUILD_FILE_PATH = os.environ['CMDCTL_BUILD_PATH'] + os.sep
  29. # Rewrite the class for unittest.
  30. class MySecureHTTPRequestHandler(SecureHTTPRequestHandler):
  31. def __init__(self):
  32. pass
  33. def send_response(self, rcode):
  34. self.rcode = rcode
  35. def end_headers(self):
  36. pass
  37. def do_GET(self):
  38. self.wfile = open('tmp.file', 'wb')
  39. super().do_GET()
  40. self.wfile.close()
  41. os.remove('tmp.file')
  42. def do_POST(self):
  43. self.wfile = open("tmp.file", 'wb')
  44. super().do_POST()
  45. self.wfile.close()
  46. os.remove('tmp.file')
  47. class FakeSecureHTTPServer(SecureHTTPServer):
  48. def __init__(self):
  49. self.user_sessions = {}
  50. self.cmdctl = FakeCommandControlForTestRequestHandler()
  51. self._verbose = True
  52. self._user_infos = {}
  53. self.idle_timeout = 1200
  54. self._lock = threading.Lock()
  55. class FakeCommandControlForTestRequestHandler(CommandControl):
  56. def __init__(self):
  57. self._config_data = {}
  58. self.modules_spec = {}
  59. self._lock = threading.Lock()
  60. def send_command(self, mod, cmd, param):
  61. return 0, {}
  62. # context to temporarily make a file unreadable
  63. class UnreadableFile:
  64. def __init__(self, file_name):
  65. self.file_name = file_name
  66. self.orig_mode = os.stat(file_name).st_mode
  67. def __enter__(self):
  68. os.chmod(self.file_name, self.orig_mode & ~stat.S_IRUSR)
  69. def __exit__(self, type, value, traceback):
  70. os.chmod(self.file_name, self.orig_mode)
  71. class TmpTextFile:
  72. """
  73. Context class for temporarily creating a text file with some
  74. lines of content.
  75. The file is automatically deleted if the context is left, so
  76. make sure to not use the path of an existing file!
  77. """
  78. def __init__(self, path, contents):
  79. self.__path = path
  80. self.__contents = contents
  81. def __enter__(self):
  82. with open(self.__path, 'w') as f:
  83. f.write("\n".join(self.__contents) + "\n")
  84. def __exit__(self, type, value, traceback):
  85. os.unlink(self.__path)
  86. class TestSecureHTTPRequestHandler(unittest.TestCase):
  87. def setUp(self):
  88. self.old_stdout = sys.stdout
  89. sys.stdout = open(os.devnull, 'w')
  90. self.handler = MySecureHTTPRequestHandler()
  91. self.handler.server = FakeSecureHTTPServer()
  92. self.handler.server.user_sessions = {}
  93. self.handler.server._user_infos = {}
  94. self.handler.headers = {}
  95. self.handler.rfile = open("check.tmp", 'w+b')
  96. def tearDown(self):
  97. sys.stdout.close()
  98. sys.stdout = self.old_stdout
  99. self.handler.rfile.close()
  100. os.remove('check.tmp')
  101. def test_parse_request_path(self):
  102. self.handler.path = ''
  103. mod, cmd = self.handler._parse_request_path()
  104. self.assertTrue((mod == None) and (cmd == None))
  105. self.handler.path = '/abc'
  106. mod, cmd = self.handler._parse_request_path()
  107. self.assertTrue((mod == 'abc') and (cmd == None))
  108. self.handler.path = '/abc/edf'
  109. mod, cmd = self.handler._parse_request_path()
  110. self.assertTrue((mod == 'abc') and (cmd == 'edf'))
  111. self.handler.path = '/abc/edf/ghi'
  112. mod, cmd = self.handler._parse_request_path()
  113. self.assertTrue((mod == 'abc') and (cmd == 'edf'))
  114. def test_parse_request_path_1(self):
  115. self.handler.path = '/ab*c'
  116. mod, cmd = self.handler._parse_request_path()
  117. self.assertTrue((mod == 'ab') and cmd == None)
  118. self.handler.path = '/abc/ed*fdd/ddd'
  119. mod, cmd = self.handler._parse_request_path()
  120. self.assertTrue((mod == 'abc') and cmd == 'ed')
  121. self.handler.path = '/-*/edfdd/ddd'
  122. mod, cmd = self.handler._parse_request_path()
  123. self.assertTrue((mod == None) and (cmd == None))
  124. self.handler.path = '/-*/edfdd/ddd'
  125. mod, cmd = self.handler._parse_request_path()
  126. self.assertTrue((mod == None) and (cmd == None))
  127. def test_do_GET(self):
  128. self.handler.do_GET()
  129. self.assertEqual(self.handler.rcode, http.client.BAD_REQUEST)
  130. def test_do_GET_1(self):
  131. self.handler.headers['cookie'] = 12345
  132. self.handler.do_GET()
  133. self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
  134. def test_do_GET_2(self):
  135. self.handler.headers['cookie'] = 12345
  136. self.handler.server.user_sessions[12345] = time.time() + 1000000
  137. self.handler.path = '/how/are'
  138. self.handler.do_GET()
  139. self.assertEqual(self.handler.rcode, http.client.NO_CONTENT)
  140. def test_do_GET_3(self):
  141. self.handler.headers['cookie'] = 12346
  142. self.handler.server.user_sessions[12346] = time.time() + 1000000
  143. path_vec = ['config_data', 'module_spec']
  144. for path in path_vec:
  145. self.handler.path = '/' + path
  146. self.handler.do_GET()
  147. self.assertEqual(self.handler.rcode, http.client.OK)
  148. def test_user_logged_in(self):
  149. self.handler.server.user_sessions = {}
  150. self.handler.session_id = 12345
  151. self.assertTrue(self.handler._is_user_logged_in() == False)
  152. self.handler.server.user_sessions[12345] = time.time()
  153. self.assertTrue(self.handler._is_user_logged_in())
  154. self.handler.server.user_sessions[12345] = time.time() - 1500
  155. self.handler.idle_timeout = 1200
  156. self.assertTrue(self.handler._is_user_logged_in() == False)
  157. def test_check_user_name_and_pwd(self):
  158. self.handler.headers = {}
  159. ret, msg = self.handler._check_user_name_and_pwd()
  160. self.assertFalse(ret)
  161. self.assertEqual(msg, ['invalid username or password'])
  162. def test_check_user_name_and_pwd_1(self):
  163. user_info = {'username':'root', 'password':'abc123'}
  164. len = self.handler.rfile.write(json.dumps(user_info).encode())
  165. self.handler.headers['Content-Length'] = len
  166. self.handler.rfile.seek(0, 0)
  167. self.handler.server._user_infos['root'] = ['aa', 'aaa']
  168. ret, msg = self.handler._check_user_name_and_pwd()
  169. self.assertFalse(ret)
  170. self.assertEqual(msg, ['username or password error'])
  171. def test_check_user_name_and_pwd_2(self):
  172. user_info = {'username':'root', 'password':'abc123'}
  173. len = self.handler.rfile.write(json.dumps(user_info).encode())
  174. self.handler.headers['Content-Length'] = len - 1
  175. self.handler.rfile.seek(0, 0)
  176. ret, msg = self.handler._check_user_name_and_pwd()
  177. self.assertFalse(ret)
  178. self.assertEqual(msg, ['invalid username or password'])
  179. def test_check_user_name_and_pwd_3(self):
  180. user_info = {'usernae':'root', 'password':'abc123'}
  181. len = self.handler.rfile.write(json.dumps(user_info).encode())
  182. self.handler.headers['Content-Length'] = len
  183. self.handler.rfile.seek(0, 0)
  184. ret, msg = self.handler._check_user_name_and_pwd()
  185. self.assertFalse(ret)
  186. self.assertEqual(msg, ['need user name'])
  187. def test_check_user_name_and_pwd_4(self):
  188. user_info = {'username':'root', 'pssword':'abc123'}
  189. len = self.handler.rfile.write(json.dumps(user_info).encode())
  190. self.handler.headers['Content-Length'] = len
  191. self.handler.rfile.seek(0, 0)
  192. self.handler.server._user_infos['root'] = ['aa', 'aaa']
  193. ret, msg = self.handler._check_user_name_and_pwd()
  194. self.assertFalse(ret)
  195. self.assertEqual(msg, ['need password'])
  196. def test_check_user_name_and_pwd_5(self):
  197. user_info = {'username':'root', 'password':'abc123'}
  198. len = self.handler.rfile.write(json.dumps(user_info).encode())
  199. self.handler.headers['Content-Length'] = len
  200. self.handler.rfile.seek(0, 0)
  201. ret, msg = self.handler._check_user_name_and_pwd()
  202. self.assertFalse(ret)
  203. self.assertEqual(msg, ['username or password error'])
  204. def test_do_POST(self):
  205. self.handler.headers = {}
  206. self.handler.do_POST()
  207. self.assertEqual(self.handler.rcode, http.client.BAD_REQUEST)
  208. def test_do_POST_1(self):
  209. self.handler.headers = {}
  210. self.handler.headers['cookie'] = 12345
  211. self.handler.path = '/'
  212. self.handler.do_POST()
  213. self.assertEqual(self.handler.rcode, http.client.UNAUTHORIZED)
  214. def test_handle_post_request(self):
  215. self.handler.path = '/cfgmgr/revert'
  216. self.handler.headers = {}
  217. rcode, reply = self.handler._handle_post_request()
  218. self.assertEqual(http.client.BAD_REQUEST, rcode)
  219. def test_handle_post_request_1(self):
  220. self.handler.path = '/*d/revert'
  221. self.handler.headers = {}
  222. rcode, reply = self.handler._handle_post_request()
  223. self.assertEqual(http.client.BAD_REQUEST, rcode)
  224. def _gen_module_spec(self):
  225. spec = { 'commands': [
  226. { 'command_name' :'command',
  227. 'command_args': [ {
  228. 'item_name' : 'param1',
  229. 'item_type' : 'integer',
  230. 'item_optional' : False,
  231. 'item_default' : 0
  232. } ],
  233. 'command_description' : 'cmd description'
  234. }
  235. ]
  236. }
  237. return spec
  238. def test_handle_post_request_2(self):
  239. params = {'param1':123}
  240. len = self.handler.rfile.write(json.dumps(params).encode())
  241. self.handler.headers['Content-Length'] = len
  242. self.handler.rfile.seek(0, 0)
  243. self.handler.path = '/module/command'
  244. self.handler.server.cmdctl.modules_spec = {}
  245. self.handler.server.cmdctl.modules_spec['module'] = self._gen_module_spec()
  246. rcode, reply = self.handler._handle_post_request()
  247. self.assertEqual(http.client.OK, rcode)
  248. def test_handle_post_request_3(self):
  249. params = {'param1':'abc'}
  250. len = self.handler.rfile.write(json.dumps(params).encode())
  251. self.handler.headers['Content-Length'] = len
  252. self.handler.rfile.seek(0, 0)
  253. self.handler.path = '/module/command'
  254. self.handler.server.cmdctl.modules_spec = {}
  255. self.handler.server.cmdctl.modules_spec['module'] = self._gen_module_spec()
  256. rcode, reply = self.handler._handle_post_request()
  257. self.assertEqual(http.client.BAD_REQUEST, rcode)
  258. class MyCommandControl(CommandControl):
  259. def _get_modules_specification(self):
  260. return {}
  261. def _get_config_data_from_config_manager(self):
  262. return {}
  263. def _setup_session(self):
  264. spec_file = BUILD_FILE_PATH + 'cmdctl.spec'
  265. module_spec = isc.config.module_spec_from_file(spec_file)
  266. config = isc.config.config_data.ConfigData(module_spec)
  267. self._module_name = 'Cmdctl'
  268. self._cmdctl_config_data = config.get_full_config()
  269. def _handle_msg_from_msgq(self):
  270. pass
  271. class TestCommandControl(unittest.TestCase):
  272. def setUp(self):
  273. self.old_stdout = sys.stdout
  274. sys.stdout = open(os.devnull, 'w')
  275. self.cmdctl = MyCommandControl(None, True)
  276. def tearDown(self):
  277. sys.stdout.close()
  278. sys.stdout = self.old_stdout
  279. def _check_config(self, cmdctl):
  280. key, cert, account = cmdctl.get_cmdctl_config_data()
  281. self.assertIsNotNone(key)
  282. self.assertIsNotNone(cert)
  283. self.assertIsNotNone(account)
  284. def test_get_cmdctl_config_data(self):
  285. old_env = os.environ
  286. if 'B10_FROM_SOURCE' in os.environ:
  287. del os.environ['B10_FROM_SOURCE']
  288. self.cmdctl.get_cmdctl_config_data()
  289. self._check_config(self.cmdctl)
  290. os.environ = old_env
  291. old_env = os.environ
  292. os.environ['B10_FROM_SOURCE'] = '../'
  293. self._check_config(self.cmdctl)
  294. os.environ = old_env
  295. def test_parse_command_result(self):
  296. self.assertEqual({}, self.cmdctl._parse_command_result(1, {'error' : 1}))
  297. self.assertEqual({'a': 1}, self.cmdctl._parse_command_result(0, {'a' : 1}))
  298. def _check_answer(self, answer, rcode_, msg_):
  299. rcode, msg = ccsession.parse_answer(answer)
  300. self.assertEqual(rcode, rcode_)
  301. self.assertEqual(msg, msg_)
  302. def test_command_handler(self):
  303. answer = self.cmdctl.command_handler('unknown-command', None)
  304. self._check_answer(answer, 1, 'unknown command: unknown-command')
  305. answer = self.cmdctl.command_handler('print_settings', None)
  306. rcode, msg = ccsession.parse_answer(answer)
  307. self.assertEqual(rcode, 0)
  308. self.assertTrue(msg != None)
  309. def test_command_handler_spec_update(self):
  310. # Should not be present
  311. self.assertFalse("foo" in self.cmdctl.modules_spec)
  312. answer = self.cmdctl.command_handler(
  313. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", {} ])
  314. rcode, msg = ccsession.parse_answer(answer)
  315. self.assertEqual(rcode, 0)
  316. self.assertEqual(msg, None)
  317. # Should now be present
  318. self.assertTrue("foo" in self.cmdctl.modules_spec)
  319. # When sending specification 'None', it should be removed
  320. answer = self.cmdctl.command_handler(
  321. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", None ])
  322. rcode, msg = ccsession.parse_answer(answer)
  323. self.assertEqual(rcode, 0)
  324. self.assertEqual(msg, None)
  325. # Should no longer be present
  326. self.assertFalse("foo" in self.cmdctl.modules_spec)
  327. # Don't store 'None' if it wasn't there in the first place!
  328. answer = self.cmdctl.command_handler(
  329. ccsession.COMMAND_MODULE_SPECIFICATION_UPDATE, [ "foo", None ])
  330. rcode, msg = ccsession.parse_answer(answer)
  331. self.assertEqual(rcode, 1)
  332. self.assertEqual(msg, "No such module: foo")
  333. # Should still not present
  334. self.assertFalse("foo" in self.cmdctl.modules_spec)
  335. def test_check_config_handler(self):
  336. answer = self.cmdctl.config_handler({'non-exist': 123})
  337. self._check_answer(answer, 1, 'unknown config item: non-exist')
  338. old_env = os.environ
  339. os.environ['B10_FROM_SOURCE'] = '../'
  340. self._check_config(self.cmdctl)
  341. os.environ = old_env
  342. answer = self.cmdctl.config_handler({'key_file': '/user/non-exist_folder'})
  343. self._check_answer(answer, 1, "'/user/non-exist_folder' does not exist")
  344. answer = self.cmdctl.config_handler({'cert_file': '/user/non-exist_folder'})
  345. self._check_answer(answer, 1, "'/user/non-exist_folder' does not exist")
  346. answer = self.cmdctl.config_handler({'accounts_file': '/user/non-exist_folder'})
  347. self._check_answer(answer, 1,
  348. "Invalid accounts file: [Errno 2] No such file or directory: '/user/non-exist_folder'")
  349. # Test with invalid accounts file
  350. file_name = 'tmp.account.file'
  351. temp_file = open(file_name, 'w')
  352. writer = csv.writer(temp_file)
  353. writer.writerow(['a', 'b'])
  354. temp_file.close()
  355. answer = self.cmdctl.config_handler({'accounts_file': file_name})
  356. self._check_answer(answer, 1, "Invalid accounts file: list index out of range")
  357. os.remove(file_name)
  358. def test_send_command(self):
  359. rcode, value = self.cmdctl.send_command('Cmdctl', 'print_settings', None)
  360. self.assertEqual(rcode, 0)
  361. class MySecureHTTPServer(SecureHTTPServer):
  362. def server_bind(self):
  363. pass
  364. class TestSecureHTTPServer(unittest.TestCase):
  365. def setUp(self):
  366. self.old_stdout = sys.stdout
  367. self.old_stderr = sys.stderr
  368. sys.stdout = open(os.devnull, 'w')
  369. sys.stderr = sys.stdout
  370. self.server = MySecureHTTPServer(('localhost', 8080),
  371. MySecureHTTPRequestHandler,
  372. MyCommandControl, verbose=True)
  373. def tearDown(self):
  374. # both sys.stdout and sys.stderr are the same, so closing one is
  375. # sufficient
  376. sys.stdout.close()
  377. sys.stdout = self.old_stdout
  378. sys.stderr = self.old_stderr
  379. def test_addr_in_use(self):
  380. server_one = None
  381. try:
  382. server_one = SecureHTTPServer(('localhost', 53531),
  383. MySecureHTTPRequestHandler,
  384. MyCommandControl)
  385. except CmdctlException:
  386. pass
  387. else:
  388. self.assertRaises(CmdctlException, SecureHTTPServer,
  389. ('localhost', 53531),
  390. MySecureHTTPRequestHandler, MyCommandControl)
  391. if server_one:
  392. server_one.server_close()
  393. def test_create_user_info(self):
  394. self.server._create_user_info('/local/not-exist')
  395. self.assertEqual(0, len(self.server._user_infos))
  396. self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
  397. self.assertEqual(1, len(self.server._user_infos))
  398. self.assertTrue('root' in self.server._user_infos)
  399. # When the file is not changed calling _create_user_info() again
  400. # should have no effect. In order to test this, we overwrite the
  401. # user-infos that were just set and make sure it isn't touched by
  402. # the call (so make sure it isn't set to some empty value)
  403. fake_users_val = { 'notinfile': [] }
  404. self.server._user_infos = fake_users_val
  405. self.server._create_user_info(SRC_FILE_PATH + 'cmdctl-accounts.csv')
  406. self.assertEqual(fake_users_val, self.server._user_infos)
  407. def test_create_user_info_changing_file_time(self):
  408. self.assertEqual(0, len(self.server._user_infos))
  409. # Create a file
  410. accounts_file = BUILD_FILE_PATH + 'new_file.csv'
  411. with TmpTextFile(accounts_file, ['root,foo,bar']):
  412. self.server._create_user_info(accounts_file)
  413. self.assertEqual(1, len(self.server._user_infos))
  414. self.assertTrue('root' in self.server._user_infos)
  415. # Make sure re-reading is a noop if file was not modified
  416. fake_users_val = { 'notinfile': [] }
  417. self.server._user_infos = fake_users_val
  418. self.server._create_user_info(accounts_file)
  419. self.assertEqual(fake_users_val, self.server._user_infos)
  420. # Yes sleep sucks, but in this case we need it to check for
  421. # a changed mtime, not for some thread to do its work
  422. # (do we run these tests on systems with 1+ secs mtimes?)
  423. time.sleep(0.1)
  424. # create the file again, this time read should not be a noop
  425. with TmpTextFile(accounts_file, ['otherroot,foo,bar']):
  426. self.server._create_user_info(accounts_file)
  427. self.assertEqual(1, len(self.server._user_infos))
  428. self.assertTrue('otherroot' in self.server._user_infos)
  429. def test_create_user_info_changing_file_name(self):
  430. """
  431. Check that the accounts file is re-read if the file name is different
  432. """
  433. self.assertEqual(0, len(self.server._user_infos))
  434. # Create two files
  435. accounts_file1 = BUILD_FILE_PATH + 'new_file.csv'
  436. accounts_file2 = BUILD_FILE_PATH + 'new_file2.csv'
  437. with TmpTextFile(accounts_file2, ['otherroot,foo,bar']):
  438. with TmpTextFile(accounts_file1, ['root,foo,bar']):
  439. self.server._create_user_info(accounts_file1)
  440. self.assertEqual(1, len(self.server._user_infos))
  441. self.assertTrue('root' in self.server._user_infos)
  442. # Make sure re-reading is a noop if file was not modified
  443. fake_users_val = { 'notinfile': [] }
  444. self.server._user_infos = fake_users_val
  445. self.server._create_user_info(accounts_file1)
  446. self.assertEqual(fake_users_val, self.server._user_infos)
  447. # But a different file should be read
  448. self.server._create_user_info(accounts_file2)
  449. self.assertEqual(1, len(self.server._user_infos))
  450. self.assertTrue('otherroot' in self.server._user_infos)
  451. def test_create_user_info_nonexistent_file(self):
  452. # Even if there was data initially, if set to a nonexistent
  453. # file it should result in no users
  454. accounts_file = BUILD_FILE_PATH + 'new_file.csv'
  455. self.assertFalse(os.path.exists(accounts_file))
  456. fake_users_val = { 'notinfile': [] }
  457. self.server._user_infos = fake_users_val
  458. self.server._create_user_info(accounts_file)
  459. self.assertEqual({}, self.server._user_infos)
  460. # Should it now be created it should be read
  461. with TmpTextFile(accounts_file, ['root,foo,bar']):
  462. self.server._create_user_info(accounts_file)
  463. self.assertEqual(1, len(self.server._user_infos))
  464. self.assertTrue('root' in self.server._user_infos)
  465. def test_check_file(self):
  466. # Just some file that we know exists
  467. file_name = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
  468. check_file(file_name)
  469. with UnreadableFile(file_name):
  470. self.assertRaises(CmdctlException, check_file, file_name)
  471. self.assertRaises(CmdctlException, check_file, '/local/not-exist')
  472. self.assertRaises(CmdctlException, check_file, '/')
  473. def test_check_key_and_cert(self):
  474. keyfile = BUILD_FILE_PATH + 'cmdctl-keyfile.pem'
  475. certfile = BUILD_FILE_PATH + 'cmdctl-certfile.pem'
  476. # no exists
  477. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  478. keyfile, '/local/not-exist')
  479. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  480. '/local/not-exist', certfile)
  481. # not a file
  482. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  483. keyfile, '/')
  484. self.assertRaises(CmdctlException, self.server._check_key_and_cert,
  485. '/', certfile)
  486. # no read permission
  487. with UnreadableFile(certfile):
  488. self.assertRaises(CmdctlException,
  489. self.server._check_key_and_cert,
  490. keyfile, certfile)
  491. with UnreadableFile(keyfile):
  492. self.assertRaises(CmdctlException,
  493. self.server._check_key_and_cert,
  494. keyfile, certfile)
  495. # All OK (also happens to check the context code above works)
  496. self.server._check_key_and_cert(keyfile, certfile)
  497. def test_wrap_sock_in_ssl_context(self):
  498. sock = socket.socket()
  499. # Bad files should result in a socket.error raised by our own
  500. # code in the basic file checks
  501. self.assertRaises(socket.error,
  502. self.server._wrap_socket_in_ssl_context,
  503. sock,
  504. 'no_such_file', 'no_such_file')
  505. # Using a non-certificate file would cause an SSLError, which
  506. # is caught by our code which then raises a basic socket.error
  507. self.assertRaises(socket.error,
  508. self.server._wrap_socket_in_ssl_context,
  509. sock,
  510. BUILD_FILE_PATH + 'cmdctl.py',
  511. BUILD_FILE_PATH + 'cmdctl-certfile.pem')
  512. # Should succeed
  513. sock1 = socket.socket()
  514. ssl_sock = self.server._wrap_socket_in_ssl_context(sock1,
  515. BUILD_FILE_PATH + 'cmdctl-keyfile.pem',
  516. BUILD_FILE_PATH + 'cmdctl-certfile.pem')
  517. self.assertTrue(isinstance(ssl_sock, ssl.SSLSocket))
  518. # wrap_socket can also raise IOError, which should be caught and
  519. # handled like the other errors.
  520. # Force this by temporarily disabling our own file checks
  521. orig_check_func = self.server._check_key_and_cert
  522. try:
  523. self.server._check_key_and_cert = lambda x,y: None
  524. self.assertRaises(socket.error,
  525. self.server._wrap_socket_in_ssl_context,
  526. sock,
  527. 'no_such_file', 'no_such_file')
  528. finally:
  529. self.server._check_key_and_cert = orig_check_func
  530. class TestFuncNotInClass(unittest.TestCase):
  531. def test_check_port(self):
  532. self.assertRaises(OptionValueError, check_port, None, 'port', -1, None)
  533. self.assertRaises(OptionValueError, check_port, None, 'port', 65536, None)
  534. self.assertRaises(OptionValueError, check_addr, None, 'ipstr', 'a.b.d', None)
  535. self.assertRaises(OptionValueError, check_addr, None, 'ipstr', '1::0:a.b', None)
  536. if __name__== "__main__":
  537. isc.log.resetUnitTestRootLogger()
  538. unittest.main()