bindctl_test.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import unittest
  16. import isc.cc.data
  17. import os
  18. import io
  19. import errno
  20. import sys
  21. import socket
  22. import ssl
  23. import http.client
  24. import pwd
  25. import getpass
  26. import re
  27. import json
  28. from optparse import OptionParser
  29. from isc.config.config_data import ConfigData, MultiConfigData
  30. from isc.config.module_spec import ModuleSpec
  31. from isc.testutils.parse_args import TestOptParser, OptsError
  32. from bindctl_main import set_bindctl_options
  33. from bindctl import cmdparse
  34. from bindctl import bindcmd
  35. from bindctl.moduleinfo import *
  36. from bindctl.exception import *
  37. try:
  38. from collections import OrderedDict
  39. except ImportError:
  40. from mycollections import OrderedDict
  41. class TestCmdLex(unittest.TestCase):
  42. def my_assert_raise(self, exception_type, cmd_line):
  43. self.assertRaises(exception_type, cmdparse.BindCmdParser, cmd_line)
  44. def testCommandWithoutParameter(self):
  45. cmd_parser = cmdparse.BindCmdParser("zone add")
  46. assert cmd_parser.module == "zone"
  47. assert cmd_parser.command == "add"
  48. self.assertEqual(len(cmd_parser.params), 0)
  49. def testCommandWithParameters(self):
  50. lines = {"zone add zone_name = cnnic.cn, file = cnnic.cn.file master=1.1.1.1",
  51. "zone add zone_name = \"cnnic.cn\", file ='cnnic.cn.file' master=1.1.1.1 ",
  52. "zone add zone_name = 'cnnic.cn\", file ='cnnic.cn.file' master=1.1.1.1, " }
  53. for cmd_line in lines:
  54. cmd_parser = cmdparse.BindCmdParser(cmd_line)
  55. assert cmd_parser.module == "zone"
  56. assert cmd_parser.command == "add"
  57. assert cmd_parser.params["zone_name"] == "cnnic.cn"
  58. assert cmd_parser.params["file"] == "cnnic.cn.file"
  59. assert cmd_parser.params["master"] == '1.1.1.1'
  60. def testCommandWithParamters_2(self):
  61. '''Test whether the parameters in key=value can be parsed properly.'''
  62. cmd_parser = cmdparse.BindCmdParser('zone cmd name = 1:34::2')
  63. self.assertEqual(cmd_parser.params['name'], '1:34::2')
  64. cmd_parser = cmdparse.BindCmdParser('zone cmd name = 1\"\'34**&2'
  65. ' value=44\"\'\"')
  66. self.assertEqual(cmd_parser.params['name'], '1\"\'34**&2')
  67. self.assertEqual(cmd_parser.params['value'], '44\"\'\"')
  68. cmd_parser = cmdparse.BindCmdParser('zone cmd name = 1\"\'34**&2'
  69. ',value= 44\"\'\"')
  70. self.assertEqual(cmd_parser.params['name'], '1\"\'34**&2')
  71. self.assertEqual(cmd_parser.params['value'], '44\"\'\"')
  72. cmd_parser = cmdparse.BindCmdParser('zone cmd name = 1\'34**&2'
  73. 'value=44\"\'\" value = '
  74. '\"==============\'')
  75. self.assertEqual(cmd_parser.params['name'], '1\'34**&2value=44\"\'\"')
  76. self.assertEqual(cmd_parser.params['value'], '==============')
  77. cmd_parser = cmdparse.BindCmdParser('zone cmd name = \"1234, '
  78. '567890 \" value ==&*/')
  79. self.assertEqual(cmd_parser.params['name'], '1234, 567890 ')
  80. self.assertEqual(cmd_parser.params['value'], '=&*/')
  81. def testCommandWithListParam(self):
  82. cmd_parser = cmdparse.BindCmdParser("zone set zone_name='cnnic.cn', "
  83. "master='1.1.1.1, 2.2.2.2'")
  84. assert cmd_parser.params["master"] == '1.1.1.1, 2.2.2.2'
  85. def testCommandWithHelpParam(self):
  86. cmd_parser = cmdparse.BindCmdParser("zone add help")
  87. assert cmd_parser.params["help"] == "help"
  88. cmd_parser = cmdparse.BindCmdParser("zone add help *&)&)*&&$#$^%")
  89. assert cmd_parser.params["help"] == "help"
  90. self.assertEqual(len(cmd_parser.params), 1)
  91. def testCmdModuleNameFormatError(self):
  92. self.my_assert_raise(CmdModuleNameFormatError, "zone=good")
  93. self.my_assert_raise(CmdModuleNameFormatError, "zo/ne")
  94. self.my_assert_raise(CmdModuleNameFormatError, "")
  95. self.my_assert_raise(CmdModuleNameFormatError, "=zone")
  96. self.my_assert_raise(CmdModuleNameFormatError, "zone,")
  97. def testCmdMissCommandNameFormatError(self):
  98. self.my_assert_raise(CmdMissCommandNameFormatError, "zone")
  99. self.my_assert_raise(CmdMissCommandNameFormatError, "zone ")
  100. self.my_assert_raise(CmdMissCommandNameFormatError, "help ")
  101. def testCmdCommandNameFormatError(self):
  102. self.my_assert_raise(CmdCommandNameFormatError, "zone =d")
  103. self.my_assert_raise(CmdCommandNameFormatError, "zone z=d")
  104. self.my_assert_raise(CmdCommandNameFormatError, "zone z-d ")
  105. self.my_assert_raise(CmdCommandNameFormatError, "zone zdd/")
  106. self.my_assert_raise(CmdCommandNameFormatError, "zone zdd/ \"")
  107. class TestCmdSyntax(unittest.TestCase):
  108. def _create_bindcmd(self):
  109. """Create one bindcmd"""
  110. tool = bindcmd.BindCmdInterpreter()
  111. string_spec = { 'item_type' : 'string',
  112. 'item_optional' : False,
  113. 'item_default' : ''}
  114. int_spec = { 'item_type' : 'integer',
  115. 'item_optional' : False,
  116. 'item_default' : 10}
  117. zone_file_param = ParamInfo(name = "zone_file",
  118. param_spec = string_spec)
  119. zone_name = ParamInfo(name = 'zone_name', param_spec = string_spec)
  120. load_cmd = CommandInfo(name = "load")
  121. load_cmd.add_param(zone_file_param)
  122. load_cmd.add_param(zone_name)
  123. param_master = ParamInfo(name = "master", optional = True,
  124. param_spec = string_spec)
  125. param_master = ParamInfo(name = "port", optional = True,
  126. param_spec = int_spec)
  127. param_allow_update = ParamInfo(name = "allow_update",
  128. optional = True,
  129. param_spec = string_spec)
  130. set_cmd = CommandInfo(name = "set")
  131. set_cmd.add_param(param_master)
  132. set_cmd.add_param(param_allow_update)
  133. set_cmd.add_param(zone_name)
  134. reload_all_cmd = CommandInfo(name = "reload_all")
  135. zone_module = ModuleInfo(name = "zone")
  136. zone_module.add_command(load_cmd)
  137. zone_module.add_command(set_cmd)
  138. zone_module.add_command(reload_all_cmd)
  139. tool.add_module_info(zone_module)
  140. return tool
  141. def setUp(self):
  142. self.bindcmd = self._create_bindcmd()
  143. def no_assert_raise(self, cmd_line):
  144. cmd_parser = cmdparse.BindCmdParser(cmd_line)
  145. self.bindcmd._validate_cmd(cmd_parser)
  146. def my_assert_raise(self, exception_type, cmd_line):
  147. cmd_parser = cmdparse.BindCmdParser(cmd_line)
  148. self.assertRaises(exception_type, self.bindcmd._validate_cmd,
  149. cmd_parser)
  150. def testValidateSuccess(self):
  151. self.no_assert_raise("zone load zone_file='cn' zone_name='cn'")
  152. self.no_assert_raise("zone load zone_file='cn', zone_name='cn', ")
  153. self.no_assert_raise("zone help ")
  154. self.no_assert_raise("zone load help ")
  155. self.no_assert_raise("zone help help='dd' ")
  156. self.no_assert_raise("zone set allow_update='1.1.1.1' zone_name='cn'")
  157. self.no_assert_raise("zone set zone_name='cn'")
  158. self.my_assert_raise(isc.cc.data.DataTypeError,
  159. "zone set zone_name ='cn', port='cn'")
  160. self.no_assert_raise("zone reload_all")
  161. def testCmdUnknownModuleSyntaxError(self):
  162. self.my_assert_raise(CmdUnknownModuleSyntaxError, "zoned d")
  163. self.my_assert_raise(CmdUnknownModuleSyntaxError, "dd dd ")
  164. def testCmdUnknownCmdSyntaxError(self):
  165. self.my_assert_raise(CmdUnknownCmdSyntaxError, "zone dd")
  166. def testCmdMissParamSyntaxError(self):
  167. self.my_assert_raise(CmdMissParamSyntaxError,
  168. "zone load zone_file='cn'")
  169. self.my_assert_raise(CmdMissParamSyntaxError,
  170. "zone load zone_name='cn'")
  171. self.my_assert_raise(CmdMissParamSyntaxError,
  172. "zone set allow_update='1.1.1.1'")
  173. self.my_assert_raise(CmdMissParamSyntaxError,
  174. "zone set ")
  175. def testCmdUnknownParamSyntaxError(self):
  176. self.my_assert_raise(CmdUnknownParamSyntaxError,
  177. "zone load zone_d='cn'")
  178. self.my_assert_raise(CmdUnknownParamSyntaxError,
  179. "zone reload_all zone_name = 'cn'")
  180. self.my_assert_raise(CmdUnknownParamSyntaxError,
  181. "zone help a b c")
  182. class TestModuleInfo(unittest.TestCase):
  183. def test_get_param_name_by_position(self):
  184. cmd = CommandInfo('command')
  185. cmd.add_param(ParamInfo('name'))
  186. cmd.add_param(ParamInfo('age'))
  187. cmd.add_param(ParamInfo('data', optional = True))
  188. cmd.add_param(ParamInfo('sex'))
  189. self.assertEqual('name', cmd.get_param_name_by_position(0, 2))
  190. self.assertEqual('age', cmd.get_param_name_by_position(1, 2))
  191. self.assertEqual('sex', cmd.get_param_name_by_position(2, 3))
  192. self.assertEqual('data', cmd.get_param_name_by_position(2, 4))
  193. self.assertEqual('data', cmd.get_param_name_by_position(2, 4))
  194. self.assertRaises(KeyError, cmd.get_param_name_by_position, 4, 4)
  195. class TestNameSequence(unittest.TestCase):
  196. """
  197. Test if the module/command/parameters is saved in the order creation
  198. """
  199. def _create_bindcmd(self):
  200. """Create one bindcmd"""
  201. self._cmd = CommandInfo(name = "load")
  202. self.module = ModuleInfo(name = "zone")
  203. self.tool = bindcmd.BindCmdInterpreter()
  204. for random_str in self.random_names:
  205. self._cmd.add_param(ParamInfo(name = random_str))
  206. self.module.add_command(CommandInfo(name = random_str))
  207. self.tool.add_module_info(ModuleInfo(name = random_str))
  208. def setUp(self):
  209. self.random_names = ['1erdfeDDWsd', '3fe', '2009erd',
  210. 'Fe231', 'tere142', 'rei8WD']
  211. self._create_bindcmd()
  212. def testSequence(self):
  213. param_names = self._cmd.get_param_names()
  214. cmd_names = self.module.get_command_names()
  215. module_names = self.tool.get_module_names()
  216. i = 0
  217. while i < len(self.random_names):
  218. assert self.random_names[i] == param_names[i+1]
  219. assert self.random_names[i] == cmd_names[i+1]
  220. assert self.random_names[i] == module_names[i+1]
  221. i = i + 1
  222. # tine class to fake a UIModuleCCSession, but only the config data
  223. # parts for the next set of tests
  224. class FakeCCSession(MultiConfigData):
  225. def __init__(self):
  226. self._local_changes = {}
  227. self._current_config = {}
  228. self._specifications = {}
  229. self.add_foo_spec()
  230. def add_foo_spec(self):
  231. spec = { "module_name": "foo",
  232. "config_data": [
  233. { "item_name": "an_int",
  234. "item_type": "integer",
  235. "item_optional": False,
  236. "item_default": 1
  237. },
  238. { "item_name": "a_list",
  239. "item_type": "list",
  240. "item_optional": False,
  241. "item_default": [],
  242. "list_item_spec":
  243. { "item_name": "a_string",
  244. "item_type": "string",
  245. "item_optional": False,
  246. "item_default": "bar"
  247. }
  248. }
  249. ]
  250. }
  251. self.set_specification(ModuleSpec(spec))
  252. # fake socket
  253. class FakeSocket():
  254. def __init__(self):
  255. self.run = True
  256. def connect(self, to):
  257. if not self.run:
  258. raise socket.error
  259. def close(self):
  260. self.run = False
  261. def send(self, data):
  262. if not self.run:
  263. raise socket.error
  264. return len(data)
  265. def makefile(self, type):
  266. return self
  267. def sendall(self, data):
  268. if not self.run:
  269. raise socket.error
  270. return len(data)
  271. class TestConfigCommands(unittest.TestCase):
  272. def setUp(self):
  273. self.tool = bindcmd.BindCmdInterpreter()
  274. mod_info = ModuleInfo(name = "foo")
  275. self.tool.add_module_info(mod_info)
  276. self.tool.config_data = FakeCCSession()
  277. self.stdout_backup = sys.stdout
  278. self.printed_messages = []
  279. self.tool._print = self.store_print
  280. def test_precmd(self):
  281. def update_all_modules_info():
  282. raise socket.error
  283. def precmd(line):
  284. self.tool.precmd(line)
  285. self.tool._update_all_modules_info = update_all_modules_info
  286. # If line is equals to 'EOF', _update_all_modules_info()
  287. # shouldn't be called
  288. precmd('EOF')
  289. self.assertRaises(socket.error, precmd, 'continue')
  290. def store_print(self, *args):
  291. '''Method to override _print in BindCmdInterpreter.
  292. Instead of printing the values, appends the argument tuple
  293. to the list in self.printed_messages'''
  294. self.printed_messages.append(" ".join(map(str, args)))
  295. def __check_printed_message(self, expected_message, printed_message):
  296. self.assertIsNotNone(re.match(expected_message, printed_message),
  297. "Printed message '" + printed_message +
  298. "' does not match '" + expected_message + "'")
  299. def __check_printed_messages(self, expected_messages):
  300. '''Helper test function to check the printed messages against a list
  301. of regexps'''
  302. self.assertEqual(len(expected_messages), len(self.printed_messages))
  303. for _ in map(self.__check_printed_message,
  304. expected_messages,
  305. self.printed_messages):
  306. pass
  307. def test_try_login(self):
  308. # Make sure __try_login raises the correct exception
  309. # upon failure of either send_POST or the read() on the
  310. # response
  311. orig_send_POST = self.tool.send_POST
  312. expected_printed_messages = []
  313. try:
  314. def send_POST_raiseImmediately(self, params):
  315. raise socket.error("test error")
  316. self.tool.send_POST = send_POST_raiseImmediately
  317. self.assertRaises(FailToLogin, self.tool._try_login, "foo", "bar")
  318. expected_printed_messages.append(
  319. 'Socket error while sending login information: test error')
  320. self.__check_printed_messages(expected_printed_messages)
  321. def create_send_POST_raiseOnRead(exception):
  322. '''Create a replacement send_POST() method that raises
  323. the given exception when read() is called on the value
  324. returned from send_POST()'''
  325. def send_POST_raiseOnRead(self, params):
  326. class MyResponse:
  327. def read(self):
  328. raise exception
  329. return MyResponse()
  330. return send_POST_raiseOnRead
  331. # basic socket error
  332. self.tool.send_POST =\
  333. create_send_POST_raiseOnRead(socket.error("read error"))
  334. self.assertRaises(FailToLogin, self.tool._try_login, "foo", "bar")
  335. expected_printed_messages.append(
  336. 'Socket error while sending login information: read error')
  337. self.__check_printed_messages(expected_printed_messages)
  338. # connection reset
  339. exc = socket.error("connection reset")
  340. exc.errno = errno.ECONNRESET
  341. self.tool.send_POST =\
  342. create_send_POST_raiseOnRead(exc)
  343. self.assertRaises(FailToLogin, self.tool._try_login, "foo", "bar")
  344. expected_printed_messages.append(
  345. 'Socket error while sending login information: '
  346. 'connection reset')
  347. expected_printed_messages.append(
  348. 'Please check the logs of b10-cmdctl, there may be a '
  349. 'problem accepting SSL connections, such as a permission '
  350. 'problem on the server certificate file.'
  351. )
  352. self.__check_printed_messages(expected_printed_messages)
  353. # 'normal' SSL error
  354. exc = ssl.SSLError()
  355. self.tool.send_POST =\
  356. create_send_POST_raiseOnRead(exc)
  357. self.assertRaises(FailToLogin, self.tool._try_login, "foo", "bar")
  358. expected_printed_messages.append(
  359. 'SSL error while sending login information: .*')
  360. self.__check_printed_messages(expected_printed_messages)
  361. # 'EOF' SSL error
  362. exc = ssl.SSLError()
  363. exc.errno = ssl.SSL_ERROR_EOF
  364. self.tool.send_POST =\
  365. create_send_POST_raiseOnRead(exc)
  366. self.assertRaises(FailToLogin, self.tool._try_login, "foo", "bar")
  367. expected_printed_messages.append(
  368. 'SSL error while sending login information: .*')
  369. expected_printed_messages.append(
  370. 'Please check the logs of b10-cmdctl, there may be a '
  371. 'problem accepting SSL connections, such as a permission '
  372. 'problem on the server certificate file.'
  373. )
  374. self.__check_printed_messages(expected_printed_messages)
  375. # any other exception should be passed through
  376. self.tool.send_POST =\
  377. create_send_POST_raiseOnRead(ImportError())
  378. self.assertRaises(ImportError, self.tool._try_login, "foo", "bar")
  379. self.__check_printed_messages(expected_printed_messages)
  380. finally:
  381. self.tool.send_POST = orig_send_POST
  382. def test_try_login_calls_cmdctl(self):
  383. # Make sure _try_login() makes the right API call to cmdctl.
  384. orig_conn = self.tool.conn
  385. try:
  386. class MyConn:
  387. def __init__(self):
  388. self.method = None
  389. self.url = None
  390. self.param = None
  391. self.headers = None
  392. def request(self, method, url, param, headers):
  393. self.method = method
  394. self.url = url
  395. self.param = param
  396. self.headers = headers
  397. def getresponse(self):
  398. class MyResponse:
  399. def __init__(self):
  400. self.status = http.client.OK
  401. def read(self):
  402. class MyData:
  403. def decode(self):
  404. return json.dumps(True)
  405. return MyData()
  406. return MyResponse()
  407. self.tool.conn = MyConn()
  408. self.assertTrue(self.tool._try_login('user32', 'pass64'))
  409. self.assertEqual(self.tool.conn.method, 'POST')
  410. self.assertEqual(self.tool.conn.url, '/login')
  411. self.assertEqual(json.loads(self.tool.conn.param),
  412. {"password": "pass64", "username": "user32"})
  413. self.assertIn('cookie', self.tool.conn.headers)
  414. finally:
  415. self.tool.conn = orig_conn
  416. def test_run(self):
  417. def login_to_cmdctl():
  418. return True
  419. def cmd_loop():
  420. self.tool._send_message("/module_spec", None)
  421. self.tool.login_to_cmdctl = login_to_cmdctl
  422. # rewrite cmdloop() to avoid interactive mode
  423. self.tool.cmdloop = cmd_loop
  424. self.tool.conn.sock = FakeSocket()
  425. self.tool.conn.sock.close()
  426. self.assertEqual(1, self.tool.run())
  427. # First few lines may be some kind of heading, or a warning that
  428. # Python readline is unavailable, so we do a sub-string check.
  429. self.assertIn("Failed to send request, the connection is closed",
  430. self.printed_messages)
  431. self.assertEqual(1, len(self.printed_messages))
  432. # validate log message for http.client.CannotSendRequest
  433. self.assertEqual(1, self.tool.run())
  434. # First few lines may be some kind of heading, or a warning that
  435. # Python readline is unavailable, so we do a sub-string check.
  436. self.assertIn("Can not send request, the connection is busy",
  437. self.printed_messages)
  438. self.assertEqual(2, len(self.printed_messages))
  439. def test_apply_cfg_command_int(self):
  440. self.tool.location = '/'
  441. self.assertEqual((1, MultiConfigData.DEFAULT),
  442. self.tool.config_data.get_value("/foo/an_int"))
  443. cmd_parser = cmdparse.BindCmdParser('config set identifier='
  444. '"foo/an_int" value="5"')
  445. self.tool.apply_config_cmd(cmd_parser)
  446. self.assertEqual((5, MultiConfigData.LOCAL),
  447. self.tool.config_data.get_value("/foo/an_int"))
  448. cmd_parser = cmdparse.BindCmdParser('config unset identifier='
  449. '"foo/an_int"')
  450. self.tool.apply_config_cmd(cmd_parser)
  451. self.assertEqual((1, MultiConfigData.DEFAULT),
  452. self.tool.config_data.get_value("/foo/an_int"))
  453. # this should raise a NotFoundError
  454. cmd_parser = cmdparse.BindCmdParser('config set identifier='
  455. '"foo/bar" value="[]"')
  456. self.assertRaises(isc.cc.data.DataNotFoundError,
  457. self.tool.apply_config_cmd, cmd_parser)
  458. cmd_parser = cmdparse.BindCmdParser('config unset identifier='
  459. '"foo/bar"')
  460. self.assertRaises(isc.cc.data.DataNotFoundError,
  461. self.tool.apply_config_cmd, cmd_parser)
  462. # this should raise a TypeError
  463. cmd_parser = cmdparse.BindCmdParser('config set identifier='
  464. '"foo/an_int" value="[]"')
  465. self.assertRaises(isc.cc.data.DataTypeError,
  466. self.tool.apply_config_cmd, cmd_parser)
  467. # this is a very specific one for use with a set of list tests
  468. # to try out the flexibility of the parser (only in the next test)
  469. def clt(self, full_cmd_string, item_value):
  470. cmd_parser = cmdparse.BindCmdParser(full_cmd_string)
  471. self.tool.apply_config_cmd(cmd_parser)
  472. self.assertEqual(([item_value], MultiConfigData.LOCAL),
  473. self.tool.config_data.get_value("/foo/a_list"))
  474. def test_apply_cfg_command_list(self):
  475. self.tool.location = '/'
  476. self.assertEqual(([], MultiConfigData.DEFAULT),
  477. self.tool.config_data.get_value("/foo/a_list"))
  478. self.clt("config set identifier=\"foo/a_list\" value=[\"a\"]", "a")
  479. self.clt("config set identifier=\"foo/a_list\" value =[\"b\"]", "b")
  480. self.clt("config set identifier=\"foo/a_list\" value= [\"c\"]", "c")
  481. self.clt("config set identifier=\"foo/a_list\" value = [\"d\"]", "d")
  482. self.clt("config set identifier =\"foo/a_list\" value=[\"e\"]", "e")
  483. self.clt("config set identifier= \"foo/a_list\" value=[\"f\"]", "f")
  484. self.clt("config set identifier = \"foo/a_list\" value=[\"g\"]", "g")
  485. self.clt("config set identifier = \"foo/a_list\" value = [\"h\"]", "h")
  486. self.clt("config set identifier = \"foo/a_list\" value=[\"i\" ]", "i")
  487. self.clt("config set identifier = \"foo/a_list\" value=[ \"j\"]", "j")
  488. self.clt("config set identifier = \"foo/a_list\" value=[ \"k\" ]", "k")
  489. # this should raise a TypeError
  490. cmd_parser = cmdparse.BindCmdParser('config set identifier='
  491. '"foo/a_list" value="a"')
  492. self.assertRaises(isc.cc.data.DataTypeError,
  493. self.tool.apply_config_cmd, cmd_parser)
  494. cmd_parser = cmdparse.BindCmdParser('config set identifier='
  495. '"foo/a_list" value=[1]')
  496. self.assertRaises(isc.cc.data.DataTypeError,
  497. self.tool.apply_config_cmd, cmd_parser)
  498. def tearDown(self):
  499. sys.stdout = self.stdout_backup
  500. def test_cmd_has_identifier_param(self):
  501. module = ModuleInfo(name="test_module")
  502. cmd = CommandInfo(name="command_with_identifier")
  503. param = ParamInfo(name=bindcmd.CFGITEM_IDENTIFIER_PARAM)
  504. cmd.add_param(param)
  505. module.add_command(cmd)
  506. cmd = CommandInfo(name="command_without_identifier")
  507. param = ParamInfo(name="some_argument")
  508. cmd.add_param(param)
  509. module.add_command(cmd)
  510. self.tool.add_module_info(module)
  511. cmd_parser = cmdparse.BindCmdParser('test_module '
  512. 'command_with_identifier')
  513. self.assertTrue(self.tool._cmd_has_identifier_param(cmd_parser))
  514. cmd_parser = cmdparse.BindCmdParser('test_module '
  515. 'command_without_identifier')
  516. self.assertFalse(self.tool._cmd_has_identifier_param(cmd_parser))
  517. cmd_parser = cmdparse.BindCmdParser('badmodule '
  518. 'command_without_identifier')
  519. self.assertFalse(self.tool._cmd_has_identifier_param(cmd_parser))
  520. def test_get_identifier_startswith(self):
  521. hints = self.tool._get_identifier_startswith("/")
  522. self.assertEqual(['foo/an_int', 'foo/a_list'], hints)
  523. hints = self.tool._get_identifier_startswith("/foo/an")
  524. self.assertEqual(['foo/an_int'], hints)
  525. hints = self.tool._get_identifier_startswith("/bar")
  526. self.assertEqual([], hints)
  527. class FakeBindCmdInterpreter(bindcmd.BindCmdInterpreter):
  528. def __init__(self):
  529. pass
  530. class TestBindCmdInterpreter(unittest.TestCase):
  531. def setUp(self):
  532. self.old_stdout = sys.stdout
  533. def tearDown(self):
  534. sys.stdout = self.old_stdout
  535. def _create_invalid_csv_file(self, csvfilename):
  536. import csv
  537. csvfile = open(csvfilename, 'w')
  538. writer = csv.writer(csvfile)
  539. writer.writerow(['name1'])
  540. writer.writerow(['name2'])
  541. csvfile.close()
  542. def test_csv_file_dir(self):
  543. # Checking default value
  544. home_dir = pwd.getpwnam(getpass.getuser()).pw_dir
  545. self.assertEqual(home_dir + os.sep + '.bind10' + os.sep,
  546. bindcmd.BindCmdInterpreter().csv_file_dir)
  547. new_csv_dir = '/something/different/'
  548. custom_cmd = bindcmd.BindCmdInterpreter(csv_file_dir=new_csv_dir)
  549. self.assertEqual(new_csv_dir, custom_cmd.csv_file_dir)
  550. def test_get_saved_user_info(self):
  551. with open(os.devnull, 'w') as f:
  552. sys.stdout = f
  553. cmd = bindcmd.BindCmdInterpreter()
  554. users = cmd._get_saved_user_info('/notexist', 'csv_file.csv')
  555. self.assertEqual([], users)
  556. csvfilename = 'csv_file.csv'
  557. self._create_invalid_csv_file(csvfilename)
  558. users = cmd._get_saved_user_info('./', csvfilename)
  559. self.assertEqual([], users)
  560. os.remove(csvfilename)
  561. class TestCommandLineOptions(unittest.TestCase):
  562. def setUp(self):
  563. self.parser = TestOptParser()
  564. set_bindctl_options(self.parser)
  565. def test_csv_file_dir(self):
  566. # by default the option is "undefined"
  567. (options, _) = self.parser.parse_args([])
  568. self.assertEqual(None, options.csv_file_dir)
  569. # specify the option, valid case.
  570. (options, _) = self.parser.parse_args(['--csv-file-dir', 'some_dir'])
  571. self.assertEqual('some_dir', options.csv_file_dir)
  572. # missing option arg; should trigger parser error.
  573. self.assertRaises(OptsError, self.parser.parse_args,
  574. ['--csv-file-dir'])
  575. if __name__== "__main__":
  576. unittest.main()