cfgmgr_test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. # Copyright (C) 2010 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. #
  16. # Tests for the configuration manager module
  17. #
  18. import unittest
  19. import os
  20. from isc.config.cfgmgr import *
  21. from unittest_fakesession import FakeModuleCCSession
  22. class TestConfigManagerData(unittest.TestCase):
  23. def setUp(self):
  24. self.data_path = os.environ['CONFIG_TESTDATA_PATH']
  25. self.config_manager_data = ConfigManagerData(self.data_path)
  26. self.assert_(self.config_manager_data)
  27. def test_init(self):
  28. self.assertEqual(self.config_manager_data.data['version'],
  29. ConfigManagerData.CONFIG_VERSION)
  30. self.assertEqual(self.config_manager_data.data_path,
  31. self.data_path)
  32. self.assertEqual(self.config_manager_data.db_filename,
  33. self.data_path + os.sep + "b10-config.db")
  34. def test_read_from_file(self):
  35. ConfigManagerData.read_from_file(self.data_path)
  36. self.assertRaises(ConfigManagerDataEmpty,
  37. ConfigManagerData.read_from_file,
  38. "doesnotexist")
  39. self.assertRaises(ConfigManagerDataReadError,
  40. ConfigManagerData.read_from_file,
  41. self.data_path, "b10-config-bad1.db")
  42. self.assertRaises(ConfigManagerDataReadError,
  43. ConfigManagerData.read_from_file,
  44. self.data_path, "b10-config-bad2.db")
  45. self.assertRaises(ConfigManagerDataReadError,
  46. ConfigManagerData.read_from_file,
  47. self.data_path, "b10-config-bad3.db")
  48. def test_write_to_file(self):
  49. output_file_name = "b10-config-write-test";
  50. self.config_manager_data.write_to_file(output_file_name)
  51. new_config = ConfigManagerData(self.data_path, output_file_name)
  52. self.assertEqual(self.config_manager_data, new_config)
  53. def test_equality(self):
  54. # tests the __eq__ function. Equality is only defined
  55. # by equality of the .data element. If data_path or db_filename
  56. # are different, but the contents are the same, it's still
  57. # considered equal
  58. cfd1 = ConfigManagerData(self.data_path)
  59. cfd2 = ConfigManagerData(self.data_path)
  60. self.assertEqual(cfd1, cfd2)
  61. cfd2.data_path = "some/unknown/path"
  62. self.assertEqual(cfd1, cfd2)
  63. cfd2.db_filename = "bad_file.name"
  64. self.assertEqual(cfd1, cfd2)
  65. cfd2.data['test'] = { 'a': [ 1, 2, 3]}
  66. self.assertNotEqual(cfd1, cfd2)
  67. class TestConfigManager(unittest.TestCase):
  68. def setUp(self):
  69. self.data_path = os.environ['CONFIG_TESTDATA_PATH']
  70. self.fake_session = FakeModuleCCSession()
  71. self.cm = ConfigManager(self.data_path, self.fake_session)
  72. self.name = "TestModule"
  73. self.spec = isc.config.module_spec_from_file(self.data_path + os.sep + "/spec2.spec")
  74. def test_init(self):
  75. self.assert_(self.cm.module_specs == {})
  76. self.assert_(self.cm.data_path == self.data_path)
  77. self.assert_(self.cm.config != None)
  78. self.assert_(self.fake_session.has_subscription("ConfigManager"))
  79. self.assert_(self.fake_session.has_subscription("Boss", "ConfigManager"))
  80. self.assertFalse(self.cm.running)
  81. def test_notify_boss(self):
  82. self.cm.notify_boss()
  83. msg = self.fake_session.get_message("Boss", None)
  84. self.assert_(msg)
  85. # this one is actually wrong, but 'current status quo'
  86. self.assertEqual(msg, {"running": "configmanager"})
  87. def test_set_module_spec(self):
  88. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  89. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  90. self.cm.set_module_spec(module_spec)
  91. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  92. def test_remove_module_spec(self):
  93. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  94. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  95. self.cm.set_module_spec(module_spec)
  96. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  97. self.cm.remove_module_spec(module_spec.get_module_name())
  98. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  99. def test_get_module_spec(self):
  100. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  101. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  102. self.cm.set_module_spec(module_spec)
  103. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  104. module_spec2 = self.cm.get_module_spec(module_spec.get_module_name())
  105. self.assertEqual(module_spec, module_spec2)
  106. def test_get_config_spec(self):
  107. config_spec = self.cm.get_config_spec()
  108. self.assertEqual(config_spec, {})
  109. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  110. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  111. self.cm.set_module_spec(module_spec)
  112. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  113. config_spec = self.cm.get_config_spec()
  114. self.assertEqual(config_spec, { 'Spec1': None })
  115. self.cm.remove_module_spec('Spec1')
  116. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec2.spec")
  117. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  118. self.cm.set_module_spec(module_spec)
  119. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  120. config_spec = self.cm.get_config_spec()
  121. self.assertEqual(config_spec['Spec2'], module_spec.get_config_spec())
  122. config_spec = self.cm.get_config_spec('Spec2')
  123. self.assertEqual(config_spec['Spec2'], module_spec.get_config_spec())
  124. def test_get_commands_spec(self):
  125. commands_spec = self.cm.get_commands_spec()
  126. self.assertEqual(commands_spec, {})
  127. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec1.spec")
  128. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  129. self.cm.set_module_spec(module_spec)
  130. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  131. commands_spec = self.cm.get_commands_spec()
  132. self.assertEqual(commands_spec, { 'Spec1': None })
  133. self.cm.remove_module_spec('Spec1')
  134. module_spec = isc.config.module_spec.module_spec_from_file(self.data_path + os.sep + "spec2.spec")
  135. self.assert_(module_spec.get_module_name() not in self.cm.module_specs)
  136. self.cm.set_module_spec(module_spec)
  137. self.assert_(module_spec.get_module_name() in self.cm.module_specs)
  138. commands_spec = self.cm.get_commands_spec()
  139. self.assertEqual(commands_spec['Spec2'], module_spec.get_commands_spec())
  140. commands_spec = self.cm.get_commands_spec('Spec2')
  141. self.assertEqual(commands_spec['Spec2'], module_spec.get_commands_spec())
  142. def test_read_config(self):
  143. self.assertEqual(self.cm.config.data, {'version': 1})
  144. self.cm.read_config()
  145. # due to what get written, the value here is what the last set_config command in test_handle_msg does
  146. self.assertEqual(self.cm.config.data, {'TestModule': {'test': 125}, 'version': 1})
  147. self.cm.data_path = "/no_such_path"
  148. self.cm.read_config()
  149. self.assertEqual(self.cm.config.data, {'version': 1})
  150. def test_write_config(self):
  151. # tested in ConfigManagerData tests
  152. pass
  153. def _handle_msg_helper(self, msg, expected_answer):
  154. answer = self.cm.handle_msg(msg)
  155. self.assertEqual(expected_answer, answer)
  156. def test_handle_msg(self):
  157. self._handle_msg_helper({}, { 'result': [ 1, 'Unknown message format: {}']})
  158. self._handle_msg_helper("", { 'result': [ 1, 'Unknown message format: ']})
  159. self._handle_msg_helper({ "command": [ "badcommand" ] }, { 'result': [ 1, "Unknown command: badcommand"]})
  160. self._handle_msg_helper({ "command": [ "get_commands_spec" ] }, { 'result': [ 0, {} ]})
  161. self._handle_msg_helper({ "command": [ "get_module_spec" ] }, { 'result': [ 0, {} ]})
  162. self._handle_msg_helper({ "command": [ "get_module_spec", { "module_name": "Spec2" } ] }, { 'result': [ 0, {} ]})
  163. #self._handle_msg_helper({ "command": [ "get_module_spec", { "module_name": "nosuchmodule" } ] },
  164. # {'result': [1, 'No specification for module nosuchmodule']})
  165. self._handle_msg_helper({ "command": [ "get_module_spec", 1 ] },
  166. {'result': [1, 'Bad get_module_spec command, argument not a dict']})
  167. self._handle_msg_helper({ "command": [ "get_module_spec", { } ] },
  168. {'result': [1, 'Bad module_name in get_module_spec command']})
  169. self._handle_msg_helper({ "command": [ "get_config" ] }, { 'result': [ 0, { 'version': 1} ]})
  170. self._handle_msg_helper({ "command": [ "get_config", { "module_name": "nosuchmodule" } ] },
  171. {'result': [0, { 'version': 1 }]})
  172. self._handle_msg_helper({ "command": [ "get_config", 1 ] },
  173. {'result': [1, 'Bad get_config command, argument not a dict']})
  174. self._handle_msg_helper({ "command": [ "get_config", { } ] },
  175. {'result': [1, 'Bad module_name in get_config command']})
  176. self._handle_msg_helper({ "command": [ "set_config" ] },
  177. {'result': [1, 'Wrong number of arguments']})
  178. self._handle_msg_helper({ "command": [ "set_config", [{}]] },
  179. {'result': [0]})
  180. self.assertEqual(len(self.fake_session.message_queue), 0)
  181. # the targets of some of these tests expect specific answers, put
  182. # those in our fake msgq first.
  183. my_ok_answer = { 'result': [ 0 ] }
  184. # Send the 'ok' that cfgmgr expects back to the fake queue first
  185. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  186. # then send the command
  187. self._handle_msg_helper({ "command": [ "set_config", [self.name, { "test": 123 }] ] },
  188. my_ok_answer)
  189. # The cfgmgr should have eaten the ok message, and sent out an update again
  190. self.assertEqual(len(self.fake_session.message_queue), 1)
  191. self.assertEqual({'command': [ 'config_update', {'test': 123}]},
  192. self.fake_session.get_message(self.name, None))
  193. # and the queue should now be empty again
  194. self.assertEqual(len(self.fake_session.message_queue), 0)
  195. # below are variations of the theme above
  196. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  197. self._handle_msg_helper({ "command": [ "set_config", [self.name, { "test": 124 }] ] },
  198. my_ok_answer)
  199. self.assertEqual(len(self.fake_session.message_queue), 1)
  200. self.assertEqual({'command': [ 'config_update', {'test': 124}]},
  201. self.fake_session.get_message(self.name, None))
  202. self.assertEqual(len(self.fake_session.message_queue), 0)
  203. # This is the last 'succes' one, the value set here is what test_read_config expects
  204. self.fake_session.group_sendmsg(my_ok_answer, "ConfigManager")
  205. self._handle_msg_helper({ "command": [ "set_config", [ { self.name: { "test": 125 } }] ] },
  206. my_ok_answer )
  207. self.assertEqual(len(self.fake_session.message_queue), 1)
  208. self.assertEqual({'command': [ 'config_update', {'test': 125}]},
  209. self.fake_session.get_message(self.name, None))
  210. self.assertEqual(len(self.fake_session.message_queue), 0)
  211. my_bad_answer = { 'result': [1, "bad_answer"] }
  212. self.fake_session.group_sendmsg(my_bad_answer, "ConfigManager")
  213. self._handle_msg_helper({ "command": [ "set_config", [ self.name, { "test": 125 }] ] },
  214. my_bad_answer )
  215. self.assertEqual(len(self.fake_session.message_queue), 1)
  216. self.assertEqual({'command': [ 'config_update', {'test': 125}]},
  217. self.fake_session.get_message(self.name, None))
  218. self.assertEqual(len(self.fake_session.message_queue), 0)
  219. self._handle_msg_helper({ "command": [ "set_config", [ ] ] },
  220. {'result': [1, 'Wrong number of arguments']} )
  221. self._handle_msg_helper({ "command": [ "set_config", [ self.name, { "test": 125 }] ] },
  222. { 'result': [1, 'No answer message from TestModule']} )
  223. #self.assertEqual(len(self.fake_session.message_queue), 1)
  224. #self.assertEqual({'config_update': {'test': 124}},
  225. # self.fake_session.get_message(self.name, None))
  226. #self.assertEqual({'version': 1, 'TestModule': {'test': 124}}, self.cm.config.data)
  227. #
  228. self._handle_msg_helper({ "command":
  229. ["module_spec", self.spec.get_full_spec()]
  230. },
  231. {'result': [0]})
  232. self._handle_msg_helper({ "command": [ "module_spec", { 'foo': 1 } ] },
  233. {'result': [1, 'Error in data definition: no module_name in module_spec']})
  234. self._handle_msg_helper({ "command": [ "get_module_spec" ] }, { 'result': [ 0, { self.spec.get_module_name(): self.spec.get_config_spec() } ]})
  235. self._handle_msg_helper({ "command": [ "get_commands_spec" ] }, { 'result': [ 0, { self.spec.get_module_name(): self.spec.get_commands_spec() } ]})
  236. # re-add this once we have new way to propagate spec changes (1 instead of the current 2 messages)
  237. #self.assertEqual(len(self.fake_session.message_queue), 2)
  238. # the name here is actually wrong (and hardcoded), but needed in the current version
  239. # TODO: fix that
  240. #self.assertEqual({'specification_update': [ self.name, self.spec ] },
  241. # self.fake_session.get_message("Cmd-Ctrld", None))
  242. #self.assertEqual({'commands_update': [ self.name, self.commands ] },
  243. # self.fake_session.get_message("Cmd-Ctrld", None))
  244. self._handle_msg_helper({ "command":
  245. ["shutdown"]
  246. },
  247. {'result': [0]})
  248. def test_run(self):
  249. self.fake_session.group_sendmsg({ "command": [ "get_commands_spec" ] }, "ConfigManager")
  250. self.cm.run()
  251. pass
  252. if __name__ == '__main__':
  253. if not 'CONFIG_TESTDATA_PATH' in os.environ:
  254. print("You need to set the environment variable CONFIG_TESTDATA_PATH to point to the directory containing the test data files")
  255. exit(1)
  256. unittest.main()