loadzone_test.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. # Copyright (C) 2012 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. '''Tests for the loadzone module'''
  16. import unittest
  17. from loadzone import *
  18. from isc.dns import *
  19. from isc.datasrc import *
  20. import isc.log
  21. import bind10_config
  22. import os
  23. import shutil
  24. # Some common test parameters
  25. TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
  26. READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
  27. LOCAL_TESTDATA_PATH = os.environ['LOCAL_TESTDATA_PATH'] + os.sep
  28. READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
  29. NEW_ZONE_TXT_FILE = LOCAL_TESTDATA_PATH + "example.org.zone"
  30. ALT_NEW_ZONE_TXT_FILE = TESTDATA_PATH + "example.com.zone"
  31. TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
  32. WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
  33. TEST_ZONE_NAME = Name('example.org')
  34. DATASRC_CONFIG = '{"database_file": "' + WRITE_ZONE_DB_FILE + '"}'
  35. # before/after SOAs: different in mname and serial
  36. ORIG_SOA_TXT = 'example.org. 3600 IN SOA ns1.example.org. ' +\
  37. 'admin.example.org. 1234 3600 1800 2419200 7200\n'
  38. NEW_SOA_TXT = 'example.org. 3600 IN SOA ns.example.org. ' +\
  39. 'admin.example.org. 1235 3600 1800 2419200 7200\n'
  40. # This is the brandnew SOA for a newly created zone
  41. ALT_NEW_SOA_TXT = 'example.com. 3600 IN SOA ns.example.com. ' +\
  42. 'admin.example.com. 1234 3600 1800 2419200 7200\n'
  43. class TestLoadZoneRunner(unittest.TestCase):
  44. def setUp(self):
  45. shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
  46. # default command line arguments
  47. self.__args = ['-c', DATASRC_CONFIG, 'example.org', NEW_ZONE_TXT_FILE]
  48. self.__runner = LoadZoneRunner(self.__args)
  49. def tearDown(self):
  50. # Delete the used DB file; if some of the tests unexpectedly fail
  51. # unexpectedly in the middle of updating the DB, a lock could stay
  52. # there and would affect the other tests that would otherwise succeed.
  53. os.unlink(WRITE_ZONE_DB_FILE)
  54. def test_init(self):
  55. '''
  56. Checks initial class attributes
  57. '''
  58. self.assertIsNone(self.__runner._zone_class)
  59. self.assertIsNone(self.__runner._zone_name)
  60. self.assertIsNone(self.__runner._zone_file)
  61. self.assertIsNone(self.__runner._datasrc_config)
  62. self.assertIsNone(self.__runner._datasrc_type)
  63. self.assertEqual(10000, self.__runner._report_interval)
  64. self.assertEqual('INFO', self.__runner._log_severity)
  65. self.assertEqual(0, self.__runner._log_debuglevel)
  66. def test_parse_args(self):
  67. self.__runner._parse_args()
  68. self.assertEqual(TEST_ZONE_NAME, self.__runner._zone_name)
  69. self.assertEqual(NEW_ZONE_TXT_FILE, self.__runner._zone_file)
  70. self.assertEqual(DATASRC_CONFIG, self.__runner._datasrc_config)
  71. self.assertEqual('sqlite3', self.__runner._datasrc_type) # default
  72. self.assertEqual(10000, self.__runner._report_interval) # default
  73. self.assertEqual(RRClass.IN(), self.__runner._zone_class) # default
  74. self.assertEqual('INFO', self.__runner._log_severity) # default
  75. self.assertEqual(0, self.__runner._log_debuglevel)
  76. def test_set_loglevel(self):
  77. runner = LoadZoneRunner(['-d', '1'] + self.__args)
  78. runner._parse_args()
  79. self.assertEqual('DEBUG', runner._log_severity)
  80. self.assertEqual(1, runner._log_debuglevel)
  81. def test_parse_bad_args(self):
  82. # There must be exactly 2 non-option arguments: zone name and zone file
  83. self.assertRaises(BadArgument, LoadZoneRunner([])._parse_args)
  84. self.assertRaises(BadArgument, LoadZoneRunner(['example']).
  85. _parse_args)
  86. self.assertRaises(BadArgument, LoadZoneRunner(self.__args + ['0']).
  87. _parse_args)
  88. # Bad zone name
  89. args = ['example.org', 'example.zone'] # otherwise valid args
  90. self.assertRaises(BadArgument,
  91. LoadZoneRunner(['bad..name', 'example.zone'] + args).
  92. _parse_args)
  93. # Bad class name
  94. self.assertRaises(BadArgument,
  95. LoadZoneRunner(['-C', 'badclass'] + args).
  96. _parse_args)
  97. # Unsupported class
  98. self.assertRaises(BadArgument,
  99. LoadZoneRunner(['-C', 'CH'] + args)._parse_args)
  100. # bad debug level
  101. self.assertRaises(BadArgument,
  102. LoadZoneRunner(['-d', '-10'] + args)._parse_args)
  103. # bad report interval
  104. self.assertRaises(BadArgument,
  105. LoadZoneRunner(['-i', '-5'] + args)._parse_args)
  106. # -c cannot be omitted unless it's type sqlite3 (right now)
  107. self.assertRaises(BadArgument,
  108. LoadZoneRunner(['-t', 'memory'] + args)._parse_args)
  109. def test_get_datasrc_config(self):
  110. # For sqlite3, we use the config with the well-known DB file.
  111. expected_conf = \
  112. '{"database_file": "' + bind10_config.DATA_PATH + '/zone.sqlite3"}'
  113. self.assertEqual(expected_conf,
  114. self.__runner._get_datasrc_config('sqlite3'))
  115. # For other types, config must be given by hand for now
  116. self.assertRaises(BadArgument, self.__runner._get_datasrc_config,
  117. 'memory')
  118. def __common_load_setup(self):
  119. self.__runner._zone_class = RRClass.IN()
  120. self.__runner._zone_name = TEST_ZONE_NAME
  121. self.__runner._zone_file = NEW_ZONE_TXT_FILE
  122. self.__runner._datasrc_type = 'sqlite3'
  123. self.__runner._datasrc_config = DATASRC_CONFIG
  124. self.__runner._report_interval = 1
  125. self.__reports = []
  126. self.__runner._report_progress = lambda x: self.__reports.append(x)
  127. def __check_zone_soa(self, soa_txt, zone_name=TEST_ZONE_NAME):
  128. """Check that the given SOA RR exists and matches the expected string
  129. If soa_txt is None, the zone is expected to be non-existent.
  130. Otherwise, if soa_txt is False, the zone should exist but SOA is
  131. expected to be missing.
  132. """
  133. client = DataSourceClient('sqlite3', DATASRC_CONFIG)
  134. result, finder = client.find_zone(zone_name)
  135. if soa_txt is None:
  136. self.assertEqual(client.NOTFOUND, result)
  137. return
  138. self.assertEqual(client.SUCCESS, result)
  139. result, rrset, _ = finder.find(zone_name, RRType.SOA())
  140. if soa_txt:
  141. self.assertEqual(finder.SUCCESS, result)
  142. self.assertEqual(soa_txt, rrset.to_text())
  143. else:
  144. self.assertEqual(finder.NXRRSET, result)
  145. def test_load_update(self):
  146. '''successful case to loading new contents to an existing zone.'''
  147. self.__common_load_setup()
  148. self.__check_zone_soa(ORIG_SOA_TXT)
  149. self.__runner._do_load()
  150. # In this test setup every loaded RR will be reported, and there will
  151. # be 3 RRs
  152. self.assertEqual([1, 2, 3], self.__reports)
  153. self.__check_zone_soa(NEW_SOA_TXT)
  154. def test_load_update_skipped_report(self):
  155. '''successful loading, with reports for every 2 RRs'''
  156. self.__common_load_setup()
  157. self.__runner._report_interval = 2
  158. self.__runner._do_load()
  159. self.assertEqual([2], self.__reports)
  160. def test_load_update_no_report(self):
  161. '''successful loading, without progress reports'''
  162. self.__common_load_setup()
  163. self.__runner._report_interval = 0
  164. self.__runner._do_load()
  165. self.assertEqual([], self.__reports) # no report
  166. self.__check_zone_soa(NEW_SOA_TXT) # but load is completed
  167. def test_create_and_load(self):
  168. '''successful case to loading contents to a new zone (created).'''
  169. self.__common_load_setup()
  170. self.__runner._zone_name = Name('example.com')
  171. self.__runner._zone_file = ALT_NEW_ZONE_TXT_FILE
  172. self.__check_zone_soa(None, zone_name=Name('example.com'))
  173. self.__runner._do_load()
  174. self.__check_zone_soa(ALT_NEW_SOA_TXT, zone_name=Name('example.com'))
  175. def test_load_fail_badconfig(self):
  176. '''Load attempt fails due to broken datasrc config.'''
  177. self.__common_load_setup()
  178. self.__runner._datasrc_config = "invalid config"
  179. self.__check_zone_soa(ORIG_SOA_TXT)
  180. self.assertRaises(LoadFailure, self.__runner._do_load)
  181. self.__check_zone_soa(ORIG_SOA_TXT) # no change to the zone
  182. def test_load_fail_badzone(self):
  183. '''Load attempt fails due to broken zone file.'''
  184. self.__common_load_setup()
  185. self.__runner._zone_file = \
  186. LOCAL_TESTDATA_PATH + '/broken-example.org.zone'
  187. self.__check_zone_soa(ORIG_SOA_TXT)
  188. self.assertRaises(LoadFailure, self.__runner._do_load)
  189. self.__check_zone_soa(ORIG_SOA_TXT)
  190. def test_load_fail_noloader(self):
  191. '''Load attempt fails because loading isn't supported'''
  192. self.__common_load_setup()
  193. self.__runner._datasrc_type = 'memory'
  194. self.__runner._datasrc_config = '{"type": "memory"}'
  195. self.__check_zone_soa(ORIG_SOA_TXT)
  196. self.assertRaises(LoadFailure, self.__runner._do_load)
  197. self.__check_zone_soa(ORIG_SOA_TXT)
  198. def test_load_fail_create_cancel(self):
  199. '''Load attempt fails and new creation of zone is canceled'''
  200. self.__common_load_setup()
  201. self.__runner._zone_name = Name('example.com')
  202. self.__runner._zone_file = 'no-such-file'
  203. self.__check_zone_soa(None, zone_name=Name('example.com'))
  204. self.assertRaises(LoadFailure, self.__runner._do_load)
  205. # _do_load() should have once created the zone but then canceled it.
  206. self.__check_zone_soa(None, zone_name=Name('example.com'))
  207. def __common_post_load_setup(self, zone_file):
  208. '''Common setup procedure for post load tests.'''
  209. # replace the LoadZoneRunner's original _post_load_warning() for
  210. # inspection
  211. self.__warnings = []
  212. self.__runner._post_load_warning = \
  213. lambda msg: self.__warnings.append(msg)
  214. # perform load and invoke checks
  215. self.__common_load_setup()
  216. self.__runner._zone_file = zone_file
  217. self.__check_zone_soa(ORIG_SOA_TXT)
  218. self.__runner._do_load()
  219. self.__runner._post_load_checks()
  220. def test_load_post_check_fail_soa(self):
  221. '''Load succeeds but warns about missing SOA, should cause warn'''
  222. self.__common_load_setup()
  223. self.__common_post_load_setup(LOCAL_TESTDATA_PATH +
  224. '/example-nosoa.org.zone')
  225. self.__check_zone_soa(False)
  226. self.assertEqual(1, len(self.__warnings))
  227. self.assertEqual('zone has no SOA', self.__warnings[0])
  228. def test_load_post_check_fail_ns(self):
  229. '''Load succeeds but warns about missing NS, should cause warn'''
  230. self.__common_load_setup()
  231. self.__common_post_load_setup(LOCAL_TESTDATA_PATH +
  232. '/example-nons.org.zone')
  233. self.__check_zone_soa(NEW_SOA_TXT)
  234. self.assertEqual(1, len(self.__warnings))
  235. self.assertEqual('zone has no NS', self.__warnings[0])
  236. def __interrupt_progress(self, loaded_rrs):
  237. '''A helper emulating a signal in the middle of loading.
  238. On the second progress report, it internally invokes the signal
  239. handler to see if it stops the loading.
  240. '''
  241. self.__reports.append(loaded_rrs)
  242. if len(self.__reports) == 2:
  243. self.__runner._interrupt_handler()
  244. def test_load_interrupted(self):
  245. '''Load attempt fails due to signal interruption'''
  246. self.__common_load_setup()
  247. self.__runner._report_progress = lambda x: self.__interrupt_progress(x)
  248. # The interrupting _report_progress() will terminate the loading
  249. # in the middle. the number of reports is smaller, and the zone
  250. # won't be changed.
  251. self.assertRaises(LoadFailure, self.__runner._do_load)
  252. self.assertEqual([1, 2], self.__reports)
  253. self.__check_zone_soa(ORIG_SOA_TXT)
  254. def test_load_interrupted_create_cancel(self):
  255. '''Load attempt for a new zone fails due to signal interruption
  256. It cancels the zone creation.
  257. '''
  258. self.__common_load_setup()
  259. self.__runner._report_progress = lambda x: self.__interrupt_progress(x)
  260. self.__runner._zone_name = Name('example.com')
  261. self.__runner._zone_file = ALT_NEW_ZONE_TXT_FILE
  262. self.__check_zone_soa(None, zone_name=Name('example.com'))
  263. self.assertRaises(LoadFailure, self.__runner._do_load)
  264. self.assertEqual([1, 2], self.__reports)
  265. self.__check_zone_soa(None, zone_name=Name('example.com'))
  266. def test_run_success(self):
  267. '''Check for the top-level method.
  268. Detailed behavior is tested in other tests. We only check the
  269. return value of run(), and the zone is successfully loaded.
  270. '''
  271. self.__check_zone_soa(ORIG_SOA_TXT)
  272. self.assertEqual(0, self.__runner.run())
  273. self.__check_zone_soa(NEW_SOA_TXT)
  274. def test_run_fail(self):
  275. '''Check for the top-level method, failure case.
  276. Similar to the success test, but loading will fail, and return
  277. value should be 1.
  278. '''
  279. runner = LoadZoneRunner(['-c', DATASRC_CONFIG, 'example.org',
  280. LOCAL_TESTDATA_PATH +
  281. '/broken-example.org.zone'])
  282. self.__check_zone_soa(ORIG_SOA_TXT)
  283. self.assertEqual(1, runner.run())
  284. self.__check_zone_soa(ORIG_SOA_TXT)
  285. if __name__== "__main__":
  286. isc.log.resetUnitTestRootLogger()
  287. # Disable the internal logging setup so the test output won't be too
  288. # verbose by default.
  289. LoadZoneRunner._config_log = lambda x: None
  290. # Cancel signal handlers so we can stop tests when they hang
  291. LoadZoneRunner._set_signal_handlers = lambda x: None
  292. unittest.main()