bindctl_test.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import unittest
  16. import isc.cc.data
  17. import os
  18. import io
  19. import sys
  20. import socket
  21. import http.client
  22. from isc.config.config_data import ConfigData, MultiConfigData
  23. from isc.config.module_spec import ModuleSpec
  24. from bindctl import cmdparse
  25. from bindctl import bindcmd
  26. from bindctl.moduleinfo import *
  27. from bindctl.exception import *
  28. try:
  29. from collections import OrderedDict
  30. except ImportError:
  31. from mycollections import OrderedDict
  32. class TestCmdLex(unittest.TestCase):
  33. def my_assert_raise(self, exception_type, cmd_line):
  34. self.assertRaises(exception_type, cmdparse.BindCmdParse, cmd_line)
  35. def testCommandWithoutParameter(self):
  36. cmd = cmdparse.BindCmdParse("zone add")
  37. assert cmd.module == "zone"
  38. assert cmd.command == "add"
  39. self.assertEqual(len(cmd.params), 0)
  40. def testCommandWithParameters(self):
  41. lines = {"zone add zone_name = cnnic.cn, file = cnnic.cn.file master=1.1.1.1",
  42. "zone add zone_name = \"cnnic.cn\", file ='cnnic.cn.file' master=1.1.1.1 ",
  43. "zone add zone_name = 'cnnic.cn\", file ='cnnic.cn.file' master=1.1.1.1, " }
  44. for cmd_line in lines:
  45. cmd = cmdparse.BindCmdParse(cmd_line)
  46. assert cmd.module == "zone"
  47. assert cmd.command == "add"
  48. assert cmd.params["zone_name"] == "cnnic.cn"
  49. assert cmd.params["file"] == "cnnic.cn.file"
  50. assert cmd.params["master"] == '1.1.1.1'
  51. def testCommandWithParamters_2(self):
  52. '''Test whether the parameters in key=value can be parsed properly.'''
  53. cmd = cmdparse.BindCmdParse('zone cmd name = 1:34::2')
  54. self.assertEqual(cmd.params['name'], '1:34::2')
  55. cmd = cmdparse.BindCmdParse('zone cmd name = 1\"\'34**&2 value=44\"\'\"')
  56. self.assertEqual(cmd.params['name'], '1\"\'34**&2')
  57. self.assertEqual(cmd.params['value'], '44\"\'\"')
  58. cmd = cmdparse.BindCmdParse('zone cmd name = 1\"\'34**&2 ,value= 44\"\'\"')
  59. self.assertEqual(cmd.params['name'], '1\"\'34**&2')
  60. self.assertEqual(cmd.params['value'], '44\"\'\"')
  61. cmd = cmdparse.BindCmdParse('zone cmd name = 1\'34**&2value=44\"\'\" value = \"==============\'')
  62. self.assertEqual(cmd.params['name'], '1\'34**&2value=44\"\'\"')
  63. self.assertEqual(cmd.params['value'], '==============')
  64. cmd = cmdparse.BindCmdParse('zone cmd name = \"1234, 567890 \" value ==&*/')
  65. self.assertEqual(cmd.params['name'], '1234, 567890 ')
  66. self.assertEqual(cmd.params['value'], '=&*/')
  67. def testCommandWithListParam(self):
  68. cmd = cmdparse.BindCmdParse("zone set zone_name='cnnic.cn', master='1.1.1.1, 2.2.2.2'")
  69. assert cmd.params["master"] == '1.1.1.1, 2.2.2.2'
  70. def testCommandWithHelpParam(self):
  71. cmd = cmdparse.BindCmdParse("zone add help")
  72. assert cmd.params["help"] == "help"
  73. cmd = cmdparse.BindCmdParse("zone add help *&)&)*&&$#$^%")
  74. assert cmd.params["help"] == "help"
  75. self.assertEqual(len(cmd.params), 1)
  76. def testCmdModuleNameFormatError(self):
  77. self.my_assert_raise(CmdModuleNameFormatError, "zone=good")
  78. self.my_assert_raise(CmdModuleNameFormatError, "zo/ne")
  79. self.my_assert_raise(CmdModuleNameFormatError, "")
  80. self.my_assert_raise(CmdModuleNameFormatError, "=zone")
  81. self.my_assert_raise(CmdModuleNameFormatError, "zone,")
  82. def testCmdMissCommandNameFormatError(self):
  83. self.my_assert_raise(CmdMissCommandNameFormatError, "zone")
  84. self.my_assert_raise(CmdMissCommandNameFormatError, "zone ")
  85. self.my_assert_raise(CmdMissCommandNameFormatError, "help ")
  86. def testCmdCommandNameFormatError(self):
  87. self.my_assert_raise(CmdCommandNameFormatError, "zone =d")
  88. self.my_assert_raise(CmdCommandNameFormatError, "zone z=d")
  89. self.my_assert_raise(CmdCommandNameFormatError, "zone z-d ")
  90. self.my_assert_raise(CmdCommandNameFormatError, "zone zdd/")
  91. self.my_assert_raise(CmdCommandNameFormatError, "zone zdd/ \"")
  92. class TestCmdSyntax(unittest.TestCase):
  93. def _create_bindcmd(self):
  94. """Create one bindcmd"""
  95. tool = bindcmd.BindCmdInterpreter()
  96. string_spec = { 'item_type' : 'string',
  97. 'item_optional' : False,
  98. 'item_default' : ''}
  99. int_spec = { 'item_type' : 'integer',
  100. 'item_optional' : False,
  101. 'item_default' : 10}
  102. zone_file_param = ParamInfo(name = "zone_file", param_spec = string_spec)
  103. zone_name = ParamInfo(name = 'zone_name', param_spec = string_spec)
  104. load_cmd = CommandInfo(name = "load")
  105. load_cmd.add_param(zone_file_param)
  106. load_cmd.add_param(zone_name)
  107. param_master = ParamInfo(name = "master", optional = True, param_spec = string_spec)
  108. param_master = ParamInfo(name = "port", optional = True, param_spec = int_spec)
  109. param_allow_update = ParamInfo(name = "allow_update", optional = True, param_spec = string_spec)
  110. set_cmd = CommandInfo(name = "set")
  111. set_cmd.add_param(param_master)
  112. set_cmd.add_param(param_allow_update)
  113. set_cmd.add_param(zone_name)
  114. reload_all_cmd = CommandInfo(name = "reload_all")
  115. zone_module = ModuleInfo(name = "zone")
  116. zone_module.add_command(load_cmd)
  117. zone_module.add_command(set_cmd)
  118. zone_module.add_command(reload_all_cmd)
  119. tool.add_module_info(zone_module)
  120. return tool
  121. def setUp(self):
  122. self.bindcmd = self._create_bindcmd()
  123. def no_assert_raise(self, cmd_line):
  124. cmd = cmdparse.BindCmdParse(cmd_line)
  125. self.bindcmd._validate_cmd(cmd)
  126. def my_assert_raise(self, exception_type, cmd_line):
  127. cmd = cmdparse.BindCmdParse(cmd_line)
  128. self.assertRaises(exception_type, self.bindcmd._validate_cmd, cmd)
  129. def testValidateSuccess(self):
  130. self.no_assert_raise("zone load zone_file='cn' zone_name='cn'")
  131. self.no_assert_raise("zone load zone_file='cn', zone_name='cn', ")
  132. self.no_assert_raise("zone help ")
  133. self.no_assert_raise("zone load help ")
  134. self.no_assert_raise("zone help help='dd' ")
  135. self.no_assert_raise("zone set allow_update='1.1.1.1' zone_name='cn'")
  136. self.no_assert_raise("zone set zone_name='cn'")
  137. self.my_assert_raise(isc.cc.data.DataTypeError, "zone set zone_name ='cn', port='cn'")
  138. self.no_assert_raise("zone reload_all")
  139. def testCmdUnknownModuleSyntaxError(self):
  140. self.my_assert_raise(CmdUnknownModuleSyntaxError, "zoned d")
  141. self.my_assert_raise(CmdUnknownModuleSyntaxError, "dd dd ")
  142. def testCmdUnknownCmdSyntaxError(self):
  143. self.my_assert_raise(CmdUnknownCmdSyntaxError, "zone dd")
  144. def testCmdMissParamSyntaxError(self):
  145. self.my_assert_raise(CmdMissParamSyntaxError, "zone load zone_file='cn'")
  146. self.my_assert_raise(CmdMissParamSyntaxError, "zone load zone_name='cn'")
  147. self.my_assert_raise(CmdMissParamSyntaxError, "zone set allow_update='1.1.1.1'")
  148. self.my_assert_raise(CmdMissParamSyntaxError, "zone set ")
  149. def testCmdUnknownParamSyntaxError(self):
  150. self.my_assert_raise(CmdUnknownParamSyntaxError, "zone load zone_d='cn'")
  151. self.my_assert_raise(CmdUnknownParamSyntaxError, "zone reload_all zone_name = 'cn'")
  152. class TestModuleInfo(unittest.TestCase):
  153. def test_get_param_name_by_position(self):
  154. cmd = CommandInfo('command')
  155. cmd.add_param(ParamInfo('name'))
  156. cmd.add_param(ParamInfo('age'))
  157. cmd.add_param(ParamInfo('data', optional = True))
  158. cmd.add_param(ParamInfo('sex'))
  159. self.assertEqual('name', cmd.get_param_name_by_position(0, 2))
  160. self.assertEqual('age', cmd.get_param_name_by_position(1, 2))
  161. self.assertEqual('sex', cmd.get_param_name_by_position(2, 3))
  162. self.assertEqual('data', cmd.get_param_name_by_position(2, 4))
  163. self.assertEqual('data', cmd.get_param_name_by_position(2, 4))
  164. self.assertRaises(KeyError, cmd.get_param_name_by_position, 4, 4)
  165. class TestNameSequence(unittest.TestCase):
  166. """
  167. Test if the module/command/parameters is saved in the order creation
  168. """
  169. def _create_bindcmd(self):
  170. """Create one bindcmd"""
  171. self._cmd = CommandInfo(name = "load")
  172. self.module = ModuleInfo(name = "zone")
  173. self.tool = bindcmd.BindCmdInterpreter()
  174. for random_str in self.random_names:
  175. self._cmd.add_param(ParamInfo(name = random_str))
  176. self.module.add_command(CommandInfo(name = random_str))
  177. self.tool.add_module_info(ModuleInfo(name = random_str))
  178. def setUp(self):
  179. self.random_names = ['1erdfeDDWsd', '3fe', '2009erd', 'Fe231', 'tere142', 'rei8WD']
  180. self._create_bindcmd()
  181. def testSequence(self):
  182. param_names = self._cmd.get_param_names()
  183. cmd_names = self.module.get_command_names()
  184. module_names = self.tool.get_module_names()
  185. i = 0
  186. while i < len(self.random_names):
  187. assert self.random_names[i] == param_names[i+1]
  188. assert self.random_names[i] == cmd_names[i+1]
  189. assert self.random_names[i] == module_names[i+1]
  190. i = i + 1
  191. # tine class to fake a UIModuleCCSession, but only the config data
  192. # parts for the next set of tests
  193. class FakeCCSession(MultiConfigData):
  194. def __init__(self):
  195. self._local_changes = {}
  196. self._current_config = {}
  197. self._specifications = {}
  198. self.add_foo_spec()
  199. def add_foo_spec(self):
  200. spec = { "module_name": "foo",
  201. "config_data": [
  202. { "item_name": "an_int",
  203. "item_type": "integer",
  204. "item_optional": False,
  205. "item_default": 1
  206. },
  207. { "item_name": "a_list",
  208. "item_type": "list",
  209. "item_optional": False,
  210. "item_default": [],
  211. "list_item_spec":
  212. { "item_name": "a_string",
  213. "item_type": "string",
  214. "item_optional": False,
  215. "item_default": "bar"
  216. }
  217. }
  218. ]
  219. }
  220. self.set_specification(ModuleSpec(spec))
  221. # fake socket
  222. class FakeSocket():
  223. def __init__(self):
  224. self.run = True
  225. def connect(self, to):
  226. if not self.run:
  227. raise socket.error
  228. def close(self):
  229. self.run = False
  230. def send(self, data):
  231. if not self.run:
  232. raise socket.error
  233. return len(data)
  234. def makefile(self, type):
  235. return self
  236. def sendall(self, data):
  237. if not self.run:
  238. raise socket.error
  239. return len(data)
  240. class TestConfigCommands(unittest.TestCase):
  241. def setUp(self):
  242. self.tool = bindcmd.BindCmdInterpreter()
  243. mod_info = ModuleInfo(name = "foo")
  244. self.tool.add_module_info(mod_info)
  245. self.tool.config_data = FakeCCSession()
  246. self.stdout_backup = sys.stdout
  247. def test_run(self):
  248. def login_to_cmdctl():
  249. return True
  250. def cmd_loop():
  251. self.tool._send_message("/module_spec", None)
  252. self.tool.login_to_cmdctl = login_to_cmdctl
  253. # rewrite cmdloop() to avoid interactive mode
  254. self.tool.cmdloop = cmd_loop
  255. self.tool.conn.sock = FakeSocket()
  256. self.tool.conn.close()
  257. # validate log message for socket.err
  258. socket_err_output = io.StringIO()
  259. sys.stdout = socket_err_output
  260. self.assertRaises(None, self.tool.run())
  261. self.assertEqual("Fail to send request, the connection is closed\n",
  262. socket_err_output.getvalue())
  263. socket_err_output.close()
  264. # validate log messae for http.client.CannotSendRequest
  265. cannot_send_output = io.StringIO()
  266. sys.stdout = cannot_send_output
  267. self.assertRaises(None, self.tool.run())
  268. self.assertEqual("Can not send request, the connection is busy\n",
  269. cannot_send_output.getvalue())
  270. cannot_send_output.close()
  271. def test_apply_cfg_command_int(self):
  272. self.tool.location = '/'
  273. self.assertEqual((1, MultiConfigData.DEFAULT),
  274. self.tool.config_data.get_value("/foo/an_int"))
  275. cmd = cmdparse.BindCmdParse("config set identifier=\"foo/an_int\" value=\"5\"")
  276. self.tool.apply_config_cmd(cmd)
  277. self.assertEqual((5, MultiConfigData.LOCAL),
  278. self.tool.config_data.get_value("/foo/an_int"))
  279. # this should raise a NotFoundError
  280. cmd = cmdparse.BindCmdParse("config set identifier=\"foo/bar\" value=\"[]\"")
  281. self.assertRaises(isc.cc.data.DataNotFoundError, self.tool.apply_config_cmd, cmd)
  282. # this should raise a TypeError
  283. cmd = cmdparse.BindCmdParse("config set identifier=\"foo/an_int\" value=\"[]\"")
  284. self.assertRaises(isc.cc.data.DataTypeError, self.tool.apply_config_cmd, cmd)
  285. # this is a very specific one for use with a set of list tests
  286. # to try out the flexibility of the parser (only in the next test)
  287. def clt(self, full_cmd_string, item_value):
  288. cmd = cmdparse.BindCmdParse(full_cmd_string)
  289. self.tool.apply_config_cmd(cmd)
  290. self.assertEqual(([item_value], MultiConfigData.LOCAL),
  291. self.tool.config_data.get_value("/foo/a_list"))
  292. def test_apply_cfg_command_list(self):
  293. self.tool.location = '/'
  294. self.assertEqual(([], MultiConfigData.DEFAULT),
  295. self.tool.config_data.get_value("/foo/a_list"))
  296. self.clt("config set identifier=\"foo/a_list\" value=[\"a\"]", "a")
  297. self.clt("config set identifier=\"foo/a_list\" value =[\"b\"]", "b")
  298. self.clt("config set identifier=\"foo/a_list\" value= [\"c\"]", "c")
  299. self.clt("config set identifier=\"foo/a_list\" value = [\"d\"]", "d")
  300. self.clt("config set identifier =\"foo/a_list\" value=[\"e\"]", "e")
  301. self.clt("config set identifier= \"foo/a_list\" value=[\"f\"]", "f")
  302. self.clt("config set identifier = \"foo/a_list\" value=[\"g\"]", "g")
  303. self.clt("config set identifier = \"foo/a_list\" value = [\"h\"]", "h")
  304. self.clt("config set identifier = \"foo/a_list\" value=[\"i\" ]", "i")
  305. self.clt("config set identifier = \"foo/a_list\" value=[ \"j\"]", "j")
  306. self.clt("config set identifier = \"foo/a_list\" value=[ \"k\" ]", "k")
  307. # this should raise a TypeError
  308. cmd = cmdparse.BindCmdParse("config set identifier=\"foo/a_list\" value=\"a\"")
  309. self.assertRaises(isc.cc.data.DataTypeError, self.tool.apply_config_cmd, cmd)
  310. cmd = cmdparse.BindCmdParse("config set identifier=\"foo/a_list\" value=[1]")
  311. self.assertRaises(isc.cc.data.DataTypeError, self.tool.apply_config_cmd, cmd)
  312. def tearDown(self):
  313. sys.stdout = self.stdout_backup
  314. class FakeBindCmdInterpreter(bindcmd.BindCmdInterpreter):
  315. def __init__(self):
  316. pass
  317. class TestBindCmdInterpreter(unittest.TestCase):
  318. def _create_invalid_csv_file(self, csvfilename):
  319. import csv
  320. csvfile = open(csvfilename, 'w')
  321. writer = csv.writer(csvfile)
  322. writer.writerow(['name1'])
  323. writer.writerow(['name2'])
  324. csvfile.close()
  325. def test_get_saved_user_info(self):
  326. old_stdout = sys.stdout
  327. sys.stdout = open(os.devnull, 'w')
  328. cmd = FakeBindCmdInterpreter()
  329. users = cmd._get_saved_user_info('/notexist', 'cvs_file.cvs')
  330. self.assertEqual([], users)
  331. csvfilename = 'csv_file.csv'
  332. self._create_invalid_csv_file(csvfilename)
  333. users = cmd._get_saved_user_info('./', csvfilename)
  334. self.assertEqual([], users)
  335. os.remove(csvfilename)
  336. sys.stdout = old_stdout
  337. if __name__== "__main__":
  338. unittest.main()