cfgmgr_test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. # Copyright (C) 2010 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. # $Id$
  16. #
  17. # Tests for the configuration manager module
  18. #
  19. import unittest
  20. import os
  21. from isc.config.cfgmgr import *
  22. from unittest_fakesession import FakeModuleCCSession
  23. class TestConfigManagerData(unittest.TestCase):
  24. def setUp(self):
  25. self.data_path = os.environ['CONFIG_TESTDATA_PATH']
  26. self.config_manager_data = ConfigManagerData(self.data_path)
  27. self.assert_(self.config_manager_data)
  28. def test_init(self):
  29. self.assertEqual(self.config_manager_data.data['version'],
  30. ConfigManagerData.CONFIG_VERSION)
  31. self.assertEqual(self.config_manager_data.data_path,
  32. self.data_path)
  33. self.assertEqual(self.config_manager_data.db_filename,
  34. self.data_path + os.sep + "b10-config.db")
  35. def test_read_from_file(self):
  36. ConfigManagerData.read_from_file(self.data_path)
  37. self.assertRaises(ConfigManagerDataEmpty,
  38. ConfigManagerData.read_from_file,
  39. "doesnotexist")
  40. self.assertRaises(ConfigManagerDataReadError,
  41. ConfigManagerData.read_from_file,
  42. self.data_path, "b10-config-bad1.db")
  43. self.assertRaises(ConfigManagerDataReadError,
  44. ConfigManagerData.read_from_file,
  45. self.data_path, "b10-config-bad2.db")
  46. self.assertRaises(ConfigManagerDataReadError,
  47. ConfigManagerData.read_from_file,
  48. self.data_path, "b10-config-bad3.db")
  49. def test_write_to_file(self):
  50. output_file_name = "b10-config-write-test";
  51. self.config_manager_data.write_to_file(output_file_name)
  52. new_config = ConfigManagerData(self.data_path, output_file_name)
  53. self.assertEqual(self.config_manager_data, new_config)
  54. def test_equality(self):
  55. # tests the __eq__ function. Equality is only defined
  56. # by equality of the .data element. If data_path or db_filename
  57. # are different, but the contents are the same, it's still
  58. # considered equal
  59. cfd1 = ConfigManagerData(self.data_path)
  60. cfd2 = ConfigManagerData(self.data_path)
  61. self.assertEqual(cfd1, cfd2)
  62. cfd2.data_path = "some/unknown/path"
  63. self.assertEqual(cfd1, cfd2)
  64. cfd2.db_filename = "bad_file.name"
  65. self.assertEqual(cfd1, cfd2)
  66. cfd2.data['test'] = { 'a': [ 1, 2, 3]}
  67. self.assertNotEqual(cfd1, cfd2)
  68. class TestConfigManager(unittest.TestCase):
  69. def setUp(self):
  70. self.data_path = os.environ['CONFIG_TESTDATA_PATH']
  71. self.fake_session = FakeModuleCCSession()
  72. self.cm = ConfigManager(self.data_path, self.fake_session)
  73. self.name = "TestModule"
  74. self.spec = isc.config.module_spec_from_file(self.data_path + os.sep + "/spec2.spec")
  75. def test_init(self):
  76. self.assert_(self.cm.module_specs == {})
  77. self.assert_(self.cm.data_path == self.data_path)
  78. self.assert_(self.cm.config != None)
  79. self.assert_(self.fake_session.has_subscription("ConfigManager"))
  80. self.assert_(self.fake_session.has_subscription("Boss", "ConfigManager"))
  81. self.assertFalse(self.cm.running)
  82. def test_notify_boss(self):
  83. self.cm.notify_boss()
  84. msg = self.fake_session.get_message("Boss", None)
  85. self.assert_(msg)
  86. # this one is actually wrong, but 'current status quo'
  87. self.assertEqual(msg, {"running": "configmanager"})
  88. def test_set_module_spec(self):
  89. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  90. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  91. self.cm.set_module_spec(module_spec)
  92. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  93. def test_remove_module_spec(self):
  94. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  95. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  96. self.cm.set_module_spec(module_spec)
  97. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  98. self.cm.remove_module_spec(module_spec.get_module_name())
  99. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  100. def test_get_module_spec(self):
  101. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  102. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  103. self.cm.set_module_spec(module_spec)
  104. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  105. module_spec2 = self.cm.get_module_spec(module_spec.get_module_name())
  106. self.assertEqual(module_spec, module_spec2)
  107. def test_get_config_spec(self):
  108. config_spec = self.cm.get_config_spec()
  109. self.assertEqual(config_spec, {})
  110. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  111. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  112. self.cm.set_module_spec(module_spec)
  113. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  114. config_spec = self.cm.get_config_spec()
  115. self.assertEqual(config_spec, { 'Spec1': None })
  116. self.cm.remove_module_spec('Spec1')
  117. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec2.spec")
  118. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  119. self.cm.set_module_spec(module_spec)
  120. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  121. config_spec = self.cm.get_config_spec()
  122. self.assertEqual(config_spec['Spec2'], module_spec.get_config_spec())
  123. config_spec = self.cm.get_config_spec('Spec2')
  124. self.assertEqual(config_spec['Spec2'], module_spec.get_config_spec())
  125. def test_get_commands_spec(self):
  126. commands_spec = self.cm.get_commands_spec()
  127. self.assertEqual(commands_spec, {})
  128. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  129. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  130. self.cm.set_module_spec(module_spec)
  131. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  132. commands_spec = self.cm.get_commands_spec()
  133. self.assertEqual(commands_spec, { 'Spec1': None })
  134. self.cm.remove_module_spec('Spec1')
  135. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec2.spec")
  136. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  137. self.cm.set_module_spec(module_spec)
  138. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  139. commands_spec = self.cm.get_commands_spec()
  140. self.assertEqual(commands_spec['Spec2'], module_spec.get_commands_spec())
  141. commands_spec = self.cm.get_commands_spec('Spec2')
  142. self.assertEqual(commands_spec['Spec2'], module_spec.get_commands_spec())
  143. def test_read_config(self):
  144. self.assertEqual(self.cm.config.data, {'version': 1})
  145. self.cm.read_config()
  146. # due to what get written, the value here is what the last set_config command in test_handle_msg does
  147. self.assertEqual(self.cm.config.data, {'TestModule': {'test': 125}, 'version': 1})
  148. self.cm.data_path = "/no_such_path"
  149. self.cm.read_config()
  150. self.assertEqual(self.cm.config.data, {'version': 1})
  151. def test_write_config(self):
  152. # tested in ConfigManagerData tests
  153. pass
  154. def _handle_msg_helper(self, msg, expected_answer):
  155. answer = self.cm.handle_msg(msg)
  156. self.assertEqual(expected_answer, answer)
  157. def test_handle_msg(self):
  158. self._handle_msg_helper({}, { 'result': [ 1, 'Unknown message format: {}']})
  159. self._handle_msg_helper("", { 'result': [ 1, 'Unknown message format: ']})
  160. self._handle_msg_helper({ "command": [ "badcommand" ] }, { 'result': [ 1, "Unknown command: badcommand"]})
  161. self._handle_msg_helper({ "command": [ "get_commands_spec" ] }, { 'result': [ 0, {} ]})
  162. self._handle_msg_helper({ "command": [ "get_module_spec" ] }, { 'result': [ 0, {} ]})
  163. self._handle_msg_helper({ "command": [ "get_module_spec", { "module_name": "Spec2" } ] }, { 'result': [ 0, {} ]})
  164. #self._handle_msg_helper({ "command": [ "get_module_spec", { "module_name": "nosuchmodule" } ] },
  165. # {'result': [1, 'No specification for module nosuchmodule']})
  166. self._handle_msg_helper({ "command": [ "get_module_spec", 1 ] },
  167. {'result': [1, 'Bad get_module_spec command, argument not a dict']})
  168. self._handle_msg_helper({ "command": [ "get_module_spec", { } ] },
  169. {'result': [1, 'Bad module_name in get_module_spec command']})
  170. self._handle_msg_helper({ "command": [ "get_config" ] }, { 'result': [ 0, { 'version': 1} ]})
  171. self._handle_msg_helper({ "command": [ "get_config", { "module_name": "nosuchmodule" } ] },
  172. {'result': [0, { 'version': 1 }]})
  173. self._handle_msg_helper({ "command": [ "get_config", 1 ] },
  174. {'result': [1, 'Bad get_config command, argument not a dict']})
  175. self._handle_msg_helper({ "command": [ "get_config", { } ] },
  176. {'result': [1, 'Bad module_name in get_config command']})
  177. self._handle_msg_helper({ "command": [ "set_config" ] },
  178. {'result': [1, 'Wrong number of arguments']})
  179. self._handle_msg_helper({ "command": [ "set_config", [{}]] },
  180. {'result': [0]})
  181. self.assertEqual(len(self.fake_session.message_queue), 0)
  182. # the targets of some of these tests expect specific answers, put
  183. # those in our fake msgq first.
  184. my_ok_answer = { 'result': [ 0 ] }
  185. # Send the 'ok' that cfgmgr expects back to the fake queue first
  186. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  187. # then send the command
  188. self._handle_msg_helper({ "command": [ "set_config", [self.name, { "test": 123 }] ] },
  189. my_ok_answer)
  190. # The cfgmgr should have eaten the ok message, and sent out an update again
  191. self.assertEqual(len(self.fake_session.message_queue), 1)
  192. self.assertEqual({'command': [ 'config_update', {'test': 123}]},
  193. self.fake_session.get_message(self.name, None))
  194. # and the queue should now be empty again
  195. self.assertEqual(len(self.fake_session.message_queue), 0)
  196. # below are variations of the theme above
  197. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  198. self._handle_msg_helper({ "command": [ "set_config", [self.name, { "test": 124 }] ] },
  199. my_ok_answer)
  200. self.assertEqual(len(self.fake_session.message_queue), 1)
  201. self.assertEqual({'command': [ 'config_update', {'test': 124}]},
  202. self.fake_session.get_message(self.name, None))
  203. self.assertEqual(len(self.fake_session.message_queue), 0)
  204. # This is the last 'succes' one, the value set here is what test_read_config expects
  205. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  206. self._handle_msg_helper({ "command": [ "set_config", [ { self.name: { "test": 125 } }] ] },
  207. my_ok_answer )
  208. self.assertEqual(len(self.fake_session.message_queue), 1)
  209. self.assertEqual({'command': [ 'config_update', {'test': 125}]},
  210. self.fake_session.get_message(self.name, None))
  211. self.assertEqual(len(self.fake_session.message_queue), 0)
  212. my_bad_answer = { 'result': [1, "bad_answer"] }
  213. self.fake_session.group_sendmsg(my_bad_answer, "ConfigManager")
  214. self._handle_msg_helper({ "command": [ "set_config", [ self.name, { "test": 125 }] ] },
  215. my_bad_answer )
  216. self.assertEqual(len(self.fake_session.message_queue), 1)
  217. self.assertEqual({'command': [ 'config_update', {'test': 125}]},
  218. self.fake_session.get_message(self.name, None))
  219. self.assertEqual(len(self.fake_session.message_queue), 0)
  220. self._handle_msg_helper({ "command": [ "set_config", [ ] ] },
  221. {'result': [1, 'Wrong number of arguments']} )
  222. self._handle_msg_helper({ "command": [ "set_config", [ self.name, { "test": 125 }] ] },
  223. { 'result': [1, 'No answer message from TestModule']} )
  224. #self.assertEqual(len(self.fake_session.message_queue), 1)
  225. #self.assertEqual({'config_update': {'test': 124}},
  226. # self.fake_session.get_message(self.name, None))
  227. #self.assertEqual({'version': 1, 'TestModule': {'test': 124}}, self.cm.config.data)
  228. #
  229. self._handle_msg_helper({ "command":
  230. ["module_spec", self.spec.get_full_spec()]
  231. },
  232. {'result': [0]})
  233. self._handle_msg_helper({ "command": [ "module_spec", { 'foo': 1 } ] },
  234. {'result': [1, 'Error in data definition: no module_name in module_spec']})
  235. self._handle_msg_helper({ "command": [ "get_module_spec" ] }, { 'result': [ 0, { self.spec.get_module_name(): self.spec.get_full_spec() } ]})
  236. self._handle_msg_helper({ "command": [ "get_commands_spec" ] }, { 'result': [ 0, { self.spec.get_module_name(): self.spec.get_commands_spec() } ]})
  237. # re-add this once we have new way to propagate spec changes (1 instead of the current 2 messages)
  238. #self.assertEqual(len(self.fake_session.message_queue), 2)
  239. # the name here is actually wrong (and hardcoded), but needed in the current version
  240. # TODO: fix that
  241. #self.assertEqual({'specification_update': [ self.name, self.spec ] },
  242. # self.fake_session.get_message("Cmd-Ctrld", None))
  243. #self.assertEqual({'commands_update': [ self.name, self.commands ] },
  244. # self.fake_session.get_message("Cmd-Ctrld", None))
  245. self._handle_msg_helper({ "command":
  246. ["shutdown"]
  247. },
  248. {'result': [0]})
  249. def test_run(self):
  250. self.fake_session.group_sendmsg({ "command": [ "get_commands_spec" ] }, "ConfigManager")
  251. self.cm.run()
  252. pass
  253. if __name__ == '__main__':
  254. if not 'CONFIG_TESTDATA_PATH' in os.environ:
  255. print("You need to set the environment variable CONFIG_TESTDATA_PATH to point to the directory containing the test data files")
  256. exit(1)
  257. unittest.main()