cfgmgr_test.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. # Copyright (C) 2010 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. #
  16. # Tests for the configuration manager module
  17. #
  18. import unittest
  19. import os
  20. from isc.config.cfgmgr import *
  21. from isc.config import config_data
  22. from unittest_fakesession import FakeModuleCCSession
  23. class TestConfigManagerData(unittest.TestCase):
  24. def setUp(self):
  25. self.data_path = os.environ['CONFIG_TESTDATA_PATH']
  26. self.writable_data_path = os.environ['CONFIG_WR_TESTDATA_PATH']
  27. self.config_manager_data = ConfigManagerData(self.writable_data_path)
  28. self.assert_(self.config_manager_data)
  29. def test_abs_file(self):
  30. """
  31. Test what happens if we give the config manager an absolute path.
  32. It shouldn't append the data path to it.
  33. """
  34. abs_path = self.data_path + os.sep + "b10-config-imaginary.db"
  35. data = ConfigManagerData(os.getcwd(), abs_path)
  36. self.assertEqual(abs_path, data.db_filename)
  37. self.assertEqual(self.data_path, data.data_path)
  38. def test_init(self):
  39. self.assertEqual(self.config_manager_data.data['version'],
  40. config_data.BIND10_CONFIG_DATA_VERSION)
  41. self.assertEqual(self.config_manager_data.data_path,
  42. self.writable_data_path)
  43. self.assertEqual(self.config_manager_data.db_filename,
  44. self.writable_data_path + os.sep + "b10-config.db")
  45. def test_read_from_file(self):
  46. ConfigManagerData.read_from_file(self.writable_data_path)
  47. self.assertRaises(ConfigManagerDataEmpty,
  48. ConfigManagerData.read_from_file,
  49. "doesnotexist")
  50. self.assertRaises(ConfigManagerDataReadError,
  51. ConfigManagerData.read_from_file,
  52. self.data_path, "b10-config-bad1.db")
  53. self.assertRaises(ConfigManagerDataReadError,
  54. ConfigManagerData.read_from_file,
  55. self.data_path, "b10-config-bad2.db")
  56. self.assertRaises(ConfigManagerDataReadError,
  57. ConfigManagerData.read_from_file,
  58. self.data_path, "b10-config-bad3.db")
  59. self.assertRaises(ConfigManagerDataReadError,
  60. ConfigManagerData.read_from_file,
  61. self.data_path, "b10-config-bad4.db")
  62. def test_write_to_file(self):
  63. output_file_name = "b10-config-write-test"
  64. self.config_manager_data.write_to_file(output_file_name)
  65. new_config = ConfigManagerData(self.data_path, output_file_name)
  66. self.assertEqual(self.config_manager_data, new_config)
  67. os.remove(output_file_name)
  68. def test_equality(self):
  69. # tests the __eq__ function. Equality is only defined
  70. # by equality of the .data element. If data_path or db_filename
  71. # are different, but the contents are the same, it's still
  72. # considered equal
  73. cfd1 = ConfigManagerData(self.data_path)
  74. cfd2 = ConfigManagerData(self.data_path)
  75. self.assertEqual(cfd1, cfd2)
  76. cfd2.data_path = "some/unknown/path"
  77. self.assertEqual(cfd1, cfd2)
  78. cfd2.db_filename = "bad_file.name"
  79. self.assertEqual(cfd1, cfd2)
  80. cfd2.data['test'] = { 'a': [ 1, 2, 3]}
  81. self.assertNotEqual(cfd1, cfd2)
  82. class TestConfigManager(unittest.TestCase):
  83. def setUp(self):
  84. self.data_path = os.environ['CONFIG_TESTDATA_PATH']
  85. self.writable_data_path = os.environ['CONFIG_WR_TESTDATA_PATH']
  86. self.fake_session = FakeModuleCCSession()
  87. self.cm = ConfigManager(self.writable_data_path,
  88. session=self.fake_session)
  89. self.name = "TestModule"
  90. self.spec = isc.config.module_spec_from_file(self.data_path + os.sep + "/spec2.spec")
  91. def test_paths(self):
  92. """
  93. Test data_path and database filename is passed trough to
  94. underlying ConfigManagerData.
  95. """
  96. cm = ConfigManager("/data/path", "filename", self.fake_session)
  97. self.assertEqual("/data/path/filename", cm.config.db_filename)
  98. # It should preserve it while reading
  99. cm.read_config()
  100. self.assertEqual("/data/path/filename", cm.config.db_filename)
  101. def test_init(self):
  102. self.assert_(self.cm.module_specs == {})
  103. self.assert_(self.cm.data_path == self.writable_data_path)
  104. self.assert_(self.cm.config != None)
  105. self.assert_(self.fake_session.has_subscription("ConfigManager"))
  106. self.assert_(self.fake_session.has_subscription("Boss", "ConfigManager"))
  107. self.assertFalse(self.cm.running)
  108. def test_notify_boss(self):
  109. self.cm.notify_boss()
  110. msg = self.fake_session.get_message("Boss", None)
  111. self.assert_(msg)
  112. # this one is actually wrong, but 'current status quo'
  113. self.assertEqual(msg, {"running": "configmanager"})
  114. def test_set_module_spec(self):
  115. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  116. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  117. self.cm.set_module_spec(module_spec)
  118. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  119. def test_remove_module_spec(self):
  120. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  121. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  122. self.cm.set_module_spec(module_spec)
  123. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  124. self.cm.remove_module_spec(module_spec.get_module_name())
  125. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  126. def test_get_module_spec(self):
  127. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  128. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  129. self.cm.set_module_spec(module_spec)
  130. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  131. module_spec2 = self.cm.get_module_spec(module_spec.get_module_name())
  132. self.assertEqual(module_spec, module_spec2)
  133. def test_get_config_spec(self):
  134. config_spec = self.cm.get_config_spec()
  135. self.assertEqual(config_spec, {})
  136. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  137. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  138. self.cm.set_module_spec(module_spec)
  139. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  140. config_spec = self.cm.get_config_spec()
  141. self.assertEqual(config_spec, { 'Spec1': None })
  142. self.cm.remove_module_spec('Spec1')
  143. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec2.spec")
  144. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  145. self.cm.set_module_spec(module_spec)
  146. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  147. config_spec = self.cm.get_config_spec()
  148. self.assertEqual(config_spec['Spec2'], module_spec.get_config_spec())
  149. config_spec = self.cm.get_config_spec('Spec2')
  150. self.assertEqual(config_spec['Spec2'], module_spec.get_config_spec())
  151. def test_get_commands_spec(self):
  152. commands_spec = self.cm.get_commands_spec()
  153. self.assertEqual(commands_spec, {})
  154. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  155. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  156. self.cm.set_module_spec(module_spec)
  157. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  158. commands_spec = self.cm.get_commands_spec()
  159. self.assertEqual(commands_spec, { 'Spec1': None })
  160. self.cm.remove_module_spec('Spec1')
  161. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec2.spec")
  162. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  163. self.cm.set_module_spec(module_spec)
  164. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  165. commands_spec = self.cm.get_commands_spec()
  166. self.assertEqual(commands_spec['Spec2'], module_spec.get_commands_spec())
  167. commands_spec = self.cm.get_commands_spec('Spec2')
  168. self.assertEqual(commands_spec['Spec2'], module_spec.get_commands_spec())
  169. def test_read_config(self):
  170. self.assertEqual(self.cm.config.data, {'version': config_data.BIND10_CONFIG_DATA_VERSION})
  171. self.cm.read_config()
  172. # due to what get written, the value here is what the last set_config command in test_handle_msg does
  173. self.assertEqual(self.cm.config.data, {'TestModule': {'test': 125}, 'version': config_data.BIND10_CONFIG_DATA_VERSION})
  174. self.cm.data_path = "/no_such_path"
  175. self.cm.read_config()
  176. self.assertEqual(self.cm.config.data, {'version': config_data.BIND10_CONFIG_DATA_VERSION})
  177. def test_write_config(self):
  178. # tested in ConfigManagerData tests
  179. pass
  180. def _handle_msg_helper(self, msg, expected_answer):
  181. answer = self.cm.handle_msg(msg)
  182. self.assertEqual(expected_answer, answer)
  183. def test_handle_msg(self):
  184. self._handle_msg_helper({}, { 'result': [ 1, 'Unknown message format: {}']})
  185. self._handle_msg_helper("", { 'result': [ 1, 'Unknown message format: ']})
  186. self._handle_msg_helper({ "command": [ "badcommand" ] }, { 'result': [ 1, "Unknown command: badcommand"]})
  187. self._handle_msg_helper({ "command": [ "get_commands_spec" ] }, { 'result': [ 0, {} ]})
  188. self._handle_msg_helper({ "command": [ "get_module_spec" ] }, { 'result': [ 0, {} ]})
  189. self._handle_msg_helper({ "command": [ "get_module_spec", { "module_name": "Spec2" } ] }, { 'result': [ 0, {} ]})
  190. #self._handle_msg_helper({ "command": [ "get_module_spec", { "module_name": "nosuchmodule" } ] },
  191. # {'result': [1, 'No specification for module nosuchmodule']})
  192. self._handle_msg_helper({ "command": [ "get_module_spec", 1 ] },
  193. {'result': [1, 'Bad get_module_spec command, argument not a dict']})
  194. self._handle_msg_helper({ "command": [ "get_module_spec", { } ] },
  195. {'result': [1, 'Bad module_name in get_module_spec command']})
  196. self._handle_msg_helper({ "command": [ "get_config" ] }, { 'result': [ 0, { 'version': config_data.BIND10_CONFIG_DATA_VERSION } ]})
  197. self._handle_msg_helper({ "command": [ "get_config", { "module_name": "nosuchmodule" } ] },
  198. {'result': [0, { 'version': config_data.BIND10_CONFIG_DATA_VERSION }]})
  199. self._handle_msg_helper({ "command": [ "get_config", 1 ] },
  200. {'result': [1, 'Bad get_config command, argument not a dict']})
  201. self._handle_msg_helper({ "command": [ "get_config", { } ] },
  202. {'result': [1, 'Bad module_name in get_config command']})
  203. self._handle_msg_helper({ "command": [ "set_config" ] },
  204. {'result': [1, 'Wrong number of arguments']})
  205. self._handle_msg_helper({ "command": [ "set_config", [{}]] },
  206. {'result': [0]})
  207. self.assertEqual(len(self.fake_session.message_queue), 0)
  208. # the targets of some of these tests expect specific answers, put
  209. # those in our fake msgq first.
  210. my_ok_answer = { 'result': [ 0 ] }
  211. # Send the 'ok' that cfgmgr expects back to the fake queue first
  212. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  213. # then send the command
  214. self._handle_msg_helper({ "command": [ "set_config", [self.name, { "test": 123 }] ] },
  215. my_ok_answer)
  216. # The cfgmgr should have eaten the ok message, and sent out an update again
  217. self.assertEqual(len(self.fake_session.message_queue), 1)
  218. self.assertEqual({'command': [ 'config_update', {'test': 123}]},
  219. self.fake_session.get_message(self.name, None))
  220. # and the queue should now be empty again
  221. self.assertEqual(len(self.fake_session.message_queue), 0)
  222. # below are variations of the theme above
  223. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  224. self._handle_msg_helper({ "command": [ "set_config", [self.name, { "test": 124 }] ] },
  225. my_ok_answer)
  226. self.assertEqual(len(self.fake_session.message_queue), 1)
  227. self.assertEqual({'command': [ 'config_update', {'test': 124}]},
  228. self.fake_session.get_message(self.name, None))
  229. self.assertEqual(len(self.fake_session.message_queue), 0)
  230. # This is the last 'succes' one, the value set here is what test_read_config expects
  231. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  232. self._handle_msg_helper({ "command": [ "set_config", [ { self.name: { "test": 125 } }] ] },
  233. my_ok_answer )
  234. self.assertEqual(len(self.fake_session.message_queue), 1)
  235. self.assertEqual({'command': [ 'config_update', {'test': 125}]},
  236. self.fake_session.get_message(self.name, None))
  237. self.assertEqual(len(self.fake_session.message_queue), 0)
  238. my_bad_answer = { 'result': [1, "bad_answer"] }
  239. self.fake_session.group_sendmsg(my_bad_answer, "ConfigManager")
  240. self._handle_msg_helper({ "command": [ "set_config", [ self.name, { "test": 125 }] ] },
  241. my_bad_answer )
  242. self.assertEqual(len(self.fake_session.message_queue), 1)
  243. self.assertEqual({'command': [ 'config_update', {'test': 125}]},
  244. self.fake_session.get_message(self.name, None))
  245. self.assertEqual(len(self.fake_session.message_queue), 0)
  246. self.fake_session.group_sendmsg(None, 'ConfigManager')
  247. self._handle_msg_helper({ "command": [ "set_config", [ ] ] },
  248. {'result': [1, 'Wrong number of arguments']} )
  249. self._handle_msg_helper({ "command": [ "set_config", [ self.name, { "test": 125 }] ] },
  250. { 'result': [1, 'No answer message from TestModule']} )
  251. #self.assertEqual(len(self.fake_session.message_queue), 1)
  252. #self.assertEqual({'config_update': {'test': 124}},
  253. # self.fake_session.get_message(self.name, None))
  254. #self.assertEqual({'version': 1, 'TestModule': {'test': 124}}, self.cm.config.data)
  255. #
  256. self._handle_msg_helper({ "command":
  257. ["module_spec", self.spec.get_full_spec()]
  258. },
  259. {'result': [0]})
  260. self._handle_msg_helper({ "command": [ "module_spec", { 'foo': 1 } ] },
  261. {'result': [1, 'Error in data definition: no module_name in module_spec']})
  262. self._handle_msg_helper({ "command": [ "get_module_spec" ] }, { 'result': [ 0, { self.spec.get_module_name(): self.spec.get_full_spec() } ]})
  263. self._handle_msg_helper({ "command": [ "get_commands_spec" ] }, { 'result': [ 0, { self.spec.get_module_name(): self.spec.get_commands_spec() } ]})
  264. # re-add this once we have new way to propagate spec changes (1 instead of the current 2 messages)
  265. #self.assertEqual(len(self.fake_session.message_queue), 2)
  266. # the name here is actually wrong (and hardcoded), but needed in the current version
  267. # TODO: fix that
  268. #self.assertEqual({'specification_update': [ self.name, self.spec ] },
  269. # self.fake_session.get_message("Cmdctl", None))
  270. #self.assertEqual({'commands_update': [ self.name, self.commands ] },
  271. # self.fake_session.get_message("Cmdctl", None))
  272. self._handle_msg_helper({ "command":
  273. ["shutdown"]
  274. },
  275. {'result': [0]})
  276. def test_set_config_all(self):
  277. my_ok_answer = { 'result': [ 0 ] }
  278. self.assertEqual({"version": 2}, self.cm.config.data)
  279. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  280. self.cm._handle_set_config_all({"test": { "value1": 123 }})
  281. self.assertEqual({"version": config_data.BIND10_CONFIG_DATA_VERSION,
  282. "test": { "value1": 123 }
  283. }, self.cm.config.data)
  284. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  285. self.cm._handle_set_config_all({"test": { "value1": 124 }})
  286. self.assertEqual({"version": config_data.BIND10_CONFIG_DATA_VERSION,
  287. "test": { "value1": 124 }
  288. }, self.cm.config.data)
  289. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  290. self.cm._handle_set_config_all({"test": { "value2": True }})
  291. self.assertEqual({"version": config_data.BIND10_CONFIG_DATA_VERSION,
  292. "test": { "value1": 124,
  293. "value2": True
  294. }
  295. }, self.cm.config.data)
  296. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  297. self.cm._handle_set_config_all({"test": { "value3": [ 1, 2, 3 ] }})
  298. self.assertEqual({"version": config_data.BIND10_CONFIG_DATA_VERSION,
  299. "test": { "value1": 124,
  300. "value2": True,
  301. "value3": [ 1, 2, 3 ]
  302. }
  303. }, self.cm.config.data)
  304. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  305. self.cm._handle_set_config_all({"test": { "value2": False }})
  306. self.assertEqual({"version": config_data.BIND10_CONFIG_DATA_VERSION,
  307. "test": { "value1": 124,
  308. "value2": False,
  309. "value3": [ 1, 2, 3 ]
  310. }
  311. }, self.cm.config.data)
  312. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  313. self.cm._handle_set_config_all({"test": { "value1": None }})
  314. self.assertEqual({"version": config_data.BIND10_CONFIG_DATA_VERSION,
  315. "test": { "value2": False,
  316. "value3": [ 1, 2, 3 ]
  317. }
  318. }, self.cm.config.data)
  319. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  320. self.cm._handle_set_config_all({"test": { "value3": [ 1 ] }})
  321. self.assertEqual({"version": config_data.BIND10_CONFIG_DATA_VERSION,
  322. "test": { "value2": False,
  323. "value3": [ 1 ]
  324. }
  325. }, self.cm.config.data)
  326. def test_run(self):
  327. self.fake_session.group_sendmsg({ "command": [ "get_commands_spec" ] }, "ConfigManager")
  328. self.fake_session.group_sendmsg({ "command": [ "shutdown" ] }, "ConfigManager")
  329. self.cm.run()
  330. pass
  331. if __name__ == '__main__':
  332. if not 'CONFIG_TESTDATA_PATH' in os.environ or not 'CONFIG_WR_TESTDATA_PATH' in os.environ:
  333. print("You need to set the environment variable CONFIG_TESTDATA_PATH and CONFIG_WR_TESTDATA_PATH to point to the directory containing the test data files")
  334. exit(1)
  335. unittest.main()