|
@@ -0,0 +1,342 @@
|
|
|
+# Copyright (C) 2012 Internet Systems Consortium.
|
|
|
+#
|
|
|
+# Permission to use, copy, modify, and distribute this software for any
|
|
|
+# purpose with or without fee is hereby granted, provided that the above
|
|
|
+# copyright notice and this permission notice appear in all copies.
|
|
|
+#
|
|
|
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
|
|
|
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
|
|
|
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
|
|
|
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
|
|
|
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
|
|
|
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
|
|
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
|
|
|
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
+
|
|
|
+'''Tests for the loadzone module'''
|
|
|
+
|
|
|
+import unittest
|
|
|
+from loadzone import *
|
|
|
+from isc.dns import *
|
|
|
+from isc.datasrc import *
|
|
|
+import isc.log
|
|
|
+import bind10_config
|
|
|
+import os
|
|
|
+import shutil
|
|
|
+
|
|
|
+# Some common test parameters
|
|
|
+TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
|
|
|
+READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
|
|
|
+LOCAL_TESTDATA_PATH = os.environ['LOCAL_TESTDATA_PATH'] + os.sep
|
|
|
+READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
|
|
|
+NEW_ZONE_TXT_FILE = LOCAL_TESTDATA_PATH + "example.org.zone"
|
|
|
+ALT_NEW_ZONE_TXT_FILE = TESTDATA_PATH + "example.com.zone"
|
|
|
+TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
|
|
|
+WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
|
|
|
+TEST_ZONE_NAME = Name('example.org')
|
|
|
+DATASRC_CONFIG = '{"database_file": "' + WRITE_ZONE_DB_FILE + '"}'
|
|
|
+
|
|
|
+# before/after SOAs: different in mname and serial
|
|
|
+ORIG_SOA_TXT = 'example.org. 3600 IN SOA ns1.example.org. ' +\
|
|
|
+ 'admin.example.org. 1234 3600 1800 2419200 7200\n'
|
|
|
+NEW_SOA_TXT = 'example.org. 3600 IN SOA ns.example.org. ' +\
|
|
|
+ 'admin.example.org. 1235 3600 1800 2419200 7200\n'
|
|
|
+# This is the brandnew SOA for a newly created zone
|
|
|
+ALT_NEW_SOA_TXT = 'example.com. 3600 IN SOA ns.example.com. ' +\
|
|
|
+ 'admin.example.com. 1234 3600 1800 2419200 7200\n'
|
|
|
+
|
|
|
+class TestLoadZoneRunner(unittest.TestCase):
|
|
|
+ def setUp(self):
|
|
|
+ shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
|
|
|
+
|
|
|
+ # default command line arguments
|
|
|
+ self.__args = ['-c', DATASRC_CONFIG, 'example.org', NEW_ZONE_TXT_FILE]
|
|
|
+ self.__runner = LoadZoneRunner(self.__args)
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ # Delete the used DB file; if some of the tests unexpectedly fail
|
|
|
+ # unexpectedly in the middle of updating the DB, a lock could stay
|
|
|
+ # there and would affect the other tests that would otherwise succeed.
|
|
|
+ os.unlink(WRITE_ZONE_DB_FILE)
|
|
|
+
|
|
|
+ def test_init(self):
|
|
|
+ '''
|
|
|
+ Checks initial class attributes
|
|
|
+ '''
|
|
|
+ self.assertIsNone(self.__runner._zone_class)
|
|
|
+ self.assertIsNone(self.__runner._zone_name)
|
|
|
+ self.assertIsNone(self.__runner._zone_file)
|
|
|
+ self.assertIsNone(self.__runner._datasrc_config)
|
|
|
+ self.assertIsNone(self.__runner._datasrc_type)
|
|
|
+ self.assertEqual(10000, self.__runner._report_interval)
|
|
|
+ self.assertEqual('INFO', self.__runner._log_severity)
|
|
|
+ self.assertEqual(0, self.__runner._log_debuglevel)
|
|
|
+
|
|
|
+ def test_parse_args(self):
|
|
|
+ self.__runner._parse_args()
|
|
|
+ self.assertEqual(TEST_ZONE_NAME, self.__runner._zone_name)
|
|
|
+ self.assertEqual(NEW_ZONE_TXT_FILE, self.__runner._zone_file)
|
|
|
+ self.assertEqual(DATASRC_CONFIG, self.__runner._datasrc_config)
|
|
|
+ self.assertEqual('sqlite3', self.__runner._datasrc_type) # default
|
|
|
+ self.assertEqual(10000, self.__runner._report_interval) # default
|
|
|
+ self.assertEqual(RRClass.IN(), self.__runner._zone_class) # default
|
|
|
+ self.assertEqual('INFO', self.__runner._log_severity) # default
|
|
|
+ self.assertEqual(0, self.__runner._log_debuglevel)
|
|
|
+
|
|
|
+ def test_set_loglevel(self):
|
|
|
+ runner = LoadZoneRunner(['-d', '1'] + self.__args)
|
|
|
+ runner._parse_args()
|
|
|
+ self.assertEqual('DEBUG', runner._log_severity)
|
|
|
+ self.assertEqual(1, runner._log_debuglevel)
|
|
|
+
|
|
|
+ def test_parse_bad_args(self):
|
|
|
+ # There must be exactly 2 non-option arguments: zone name and zone file
|
|
|
+ self.assertRaises(BadArgument, LoadZoneRunner([])._parse_args)
|
|
|
+ self.assertRaises(BadArgument, LoadZoneRunner(['example']).
|
|
|
+ _parse_args)
|
|
|
+ self.assertRaises(BadArgument, LoadZoneRunner(self.__args + ['0']).
|
|
|
+ _parse_args)
|
|
|
+
|
|
|
+ # Bad zone name
|
|
|
+ args = ['example.org', 'example.zone'] # otherwise valid args
|
|
|
+ self.assertRaises(BadArgument,
|
|
|
+ LoadZoneRunner(['bad..name', 'example.zone'] + args).
|
|
|
+ _parse_args)
|
|
|
+
|
|
|
+ # Bad class name
|
|
|
+ self.assertRaises(BadArgument,
|
|
|
+ LoadZoneRunner(['-C', 'badclass'] + args).
|
|
|
+ _parse_args)
|
|
|
+ # Unsupported class
|
|
|
+ self.assertRaises(BadArgument,
|
|
|
+ LoadZoneRunner(['-C', 'CH'] + args)._parse_args)
|
|
|
+
|
|
|
+ # bad debug level
|
|
|
+ self.assertRaises(BadArgument,
|
|
|
+ LoadZoneRunner(['-d', '-10'] + args)._parse_args)
|
|
|
+
|
|
|
+ # bad report interval
|
|
|
+ self.assertRaises(BadArgument,
|
|
|
+ LoadZoneRunner(['-i', '-5'] + args)._parse_args)
|
|
|
+
|
|
|
+ # -c cannot be omitted unless it's type sqlite3 (right now)
|
|
|
+ self.assertRaises(BadArgument,
|
|
|
+ LoadZoneRunner(['-t', 'memory'] + args)._parse_args)
|
|
|
+
|
|
|
+ def test_get_datasrc_config(self):
|
|
|
+ # For sqlite3, we use the config with the well-known DB file.
|
|
|
+ expected_conf = \
|
|
|
+ '{"database_file": "' + bind10_config.DATA_PATH + '/zone.sqlite3"}'
|
|
|
+ self.assertEqual(expected_conf,
|
|
|
+ self.__runner._get_datasrc_config('sqlite3'))
|
|
|
+
|
|
|
+ # For other types, config must be given by hand for now
|
|
|
+ self.assertRaises(BadArgument, self.__runner._get_datasrc_config,
|
|
|
+ 'memory')
|
|
|
+
|
|
|
+ def __common_load_setup(self):
|
|
|
+ self.__runner._zone_class = RRClass.IN()
|
|
|
+ self.__runner._zone_name = TEST_ZONE_NAME
|
|
|
+ self.__runner._zone_file = NEW_ZONE_TXT_FILE
|
|
|
+ self.__runner._datasrc_type = 'sqlite3'
|
|
|
+ self.__runner._datasrc_config = DATASRC_CONFIG
|
|
|
+ self.__runner._report_interval = 1
|
|
|
+ self.__reports = []
|
|
|
+ self.__runner._report_progress = lambda x: self.__reports.append(x)
|
|
|
+
|
|
|
+ def __check_zone_soa(self, soa_txt, zone_name=TEST_ZONE_NAME):
|
|
|
+ """Check that the given SOA RR exists and matches the expected string
|
|
|
+
|
|
|
+ If soa_txt is None, the zone is expected to be non-existent.
|
|
|
+ Otherwise, if soa_txt is False, the zone should exist but SOA is
|
|
|
+ expected to be missing.
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
+ client = DataSourceClient('sqlite3', DATASRC_CONFIG)
|
|
|
+ result, finder = client.find_zone(zone_name)
|
|
|
+ if soa_txt is None:
|
|
|
+ self.assertEqual(client.NOTFOUND, result)
|
|
|
+ return
|
|
|
+ self.assertEqual(client.SUCCESS, result)
|
|
|
+ result, rrset, _ = finder.find(zone_name, RRType.SOA())
|
|
|
+ if soa_txt:
|
|
|
+ self.assertEqual(finder.SUCCESS, result)
|
|
|
+ self.assertEqual(soa_txt, rrset.to_text())
|
|
|
+ else:
|
|
|
+ self.assertEqual(finder.NXRRSET, result)
|
|
|
+
|
|
|
+ def test_load_update(self):
|
|
|
+ '''successful case to loading new contents to an existing zone.'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+ self.__runner._do_load()
|
|
|
+ # In this test setup every loaded RR will be reported, and there will
|
|
|
+ # be 3 RRs
|
|
|
+ self.assertEqual([1, 2, 3], self.__reports)
|
|
|
+ self.__check_zone_soa(NEW_SOA_TXT)
|
|
|
+
|
|
|
+ def test_load_update_skipped_report(self):
|
|
|
+ '''successful loading, with reports for every 2 RRs'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._report_interval = 2
|
|
|
+ self.__runner._do_load()
|
|
|
+ self.assertEqual([2], self.__reports)
|
|
|
+
|
|
|
+ def test_load_update_no_report(self):
|
|
|
+ '''successful loading, without progress reports'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._report_interval = 0
|
|
|
+ self.__runner._do_load()
|
|
|
+ self.assertEqual([], self.__reports) # no report
|
|
|
+ self.__check_zone_soa(NEW_SOA_TXT) # but load is completed
|
|
|
+
|
|
|
+ def test_create_and_load(self):
|
|
|
+ '''successful case to loading contents to a new zone (created).'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._zone_name = Name('example.com')
|
|
|
+ self.__runner._zone_file = ALT_NEW_ZONE_TXT_FILE
|
|
|
+ self.__check_zone_soa(None, zone_name=Name('example.com'))
|
|
|
+ self.__runner._do_load()
|
|
|
+ self.__check_zone_soa(ALT_NEW_SOA_TXT, zone_name=Name('example.com'))
|
|
|
+
|
|
|
+ def test_load_fail_badconfig(self):
|
|
|
+ '''Load attempt fails due to broken datasrc config.'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._datasrc_config = "invalid config"
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+ self.assertRaises(LoadFailure, self.__runner._do_load)
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT) # no change to the zone
|
|
|
+
|
|
|
+ def test_load_fail_badzone(self):
|
|
|
+ '''Load attempt fails due to broken zone file.'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._zone_file = \
|
|
|
+ LOCAL_TESTDATA_PATH + '/broken-example.org.zone'
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+ self.assertRaises(LoadFailure, self.__runner._do_load)
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+
|
|
|
+ def test_load_fail_noloader(self):
|
|
|
+ '''Load attempt fails because loading isn't supported'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._datasrc_type = 'memory'
|
|
|
+ self.__runner._datasrc_config = '{"type": "memory"}'
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+ self.assertRaises(LoadFailure, self.__runner._do_load)
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+
|
|
|
+ def test_load_fail_create_cancel(self):
|
|
|
+ '''Load attempt fails and new creation of zone is canceled'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._zone_name = Name('example.com')
|
|
|
+ self.__runner._zone_file = 'no-such-file'
|
|
|
+ self.__check_zone_soa(None, zone_name=Name('example.com'))
|
|
|
+ self.assertRaises(LoadFailure, self.__runner._do_load)
|
|
|
+ # _do_load() should have once created the zone but then canceled it.
|
|
|
+ self.__check_zone_soa(None, zone_name=Name('example.com'))
|
|
|
+
|
|
|
+ def __common_post_load_setup(self, zone_file):
|
|
|
+ '''Common setup procedure for post load tests.'''
|
|
|
+ # replace the LoadZoneRunner's original _post_load_warning() for
|
|
|
+ # inspection
|
|
|
+ self.__warnings = []
|
|
|
+ self.__runner._post_load_warning = \
|
|
|
+ lambda msg: self.__warnings.append(msg)
|
|
|
+
|
|
|
+ # perform load and invoke checks
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._zone_file = zone_file
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+ self.__runner._do_load()
|
|
|
+ self.__runner._post_load_checks()
|
|
|
+
|
|
|
+ def test_load_post_check_fail_soa(self):
|
|
|
+ '''Load succeeds but warns about missing SOA, should cause warn'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__common_post_load_setup(LOCAL_TESTDATA_PATH +
|
|
|
+ '/example-nosoa.org.zone')
|
|
|
+ self.__check_zone_soa(False)
|
|
|
+ self.assertEqual(1, len(self.__warnings))
|
|
|
+ self.assertEqual('zone has no SOA', self.__warnings[0])
|
|
|
+
|
|
|
+ def test_load_post_check_fail_ns(self):
|
|
|
+ '''Load succeeds but warns about missing NS, should cause warn'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__common_post_load_setup(LOCAL_TESTDATA_PATH +
|
|
|
+ '/example-nons.org.zone')
|
|
|
+ self.__check_zone_soa(NEW_SOA_TXT)
|
|
|
+ self.assertEqual(1, len(self.__warnings))
|
|
|
+ self.assertEqual('zone has no NS', self.__warnings[0])
|
|
|
+
|
|
|
+ def __interrupt_progress(self, loaded_rrs):
|
|
|
+ '''A helper emulating a signal in the middle of loading.
|
|
|
+
|
|
|
+ On the second progress report, it internally invokes the signal
|
|
|
+ handler to see if it stops the loading.
|
|
|
+
|
|
|
+ '''
|
|
|
+ self.__reports.append(loaded_rrs)
|
|
|
+ if len(self.__reports) == 2:
|
|
|
+ self.__runner._interrupt_handler()
|
|
|
+
|
|
|
+ def test_load_interrupted(self):
|
|
|
+ '''Load attempt fails due to signal interruption'''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._report_progress = lambda x: self.__interrupt_progress(x)
|
|
|
+ # The interrupting _report_progress() will terminate the loading
|
|
|
+ # in the middle. the number of reports is smaller, and the zone
|
|
|
+ # won't be changed.
|
|
|
+ self.assertRaises(LoadFailure, self.__runner._do_load)
|
|
|
+ self.assertEqual([1, 2], self.__reports)
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+
|
|
|
+ def test_load_interrupted_create_cancel(self):
|
|
|
+ '''Load attempt for a new zone fails due to signal interruption
|
|
|
+
|
|
|
+ It cancels the zone creation.
|
|
|
+
|
|
|
+ '''
|
|
|
+ self.__common_load_setup()
|
|
|
+ self.__runner._report_progress = lambda x: self.__interrupt_progress(x)
|
|
|
+ self.__runner._zone_name = Name('example.com')
|
|
|
+ self.__runner._zone_file = ALT_NEW_ZONE_TXT_FILE
|
|
|
+ self.__check_zone_soa(None, zone_name=Name('example.com'))
|
|
|
+ self.assertRaises(LoadFailure, self.__runner._do_load)
|
|
|
+ self.assertEqual([1, 2], self.__reports)
|
|
|
+ self.__check_zone_soa(None, zone_name=Name('example.com'))
|
|
|
+
|
|
|
+ def test_run_success(self):
|
|
|
+ '''Check for the top-level method.
|
|
|
+
|
|
|
+ Detailed behavior is tested in other tests. We only check the
|
|
|
+ return value of run(), and the zone is successfully loaded.
|
|
|
+
|
|
|
+ '''
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+ self.assertEqual(0, self.__runner.run())
|
|
|
+ self.__check_zone_soa(NEW_SOA_TXT)
|
|
|
+
|
|
|
+ def test_run_fail(self):
|
|
|
+ '''Check for the top-level method, failure case.
|
|
|
+
|
|
|
+ Similar to the success test, but loading will fail, and return
|
|
|
+ value should be 1.
|
|
|
+
|
|
|
+ '''
|
|
|
+ runner = LoadZoneRunner(['-c', DATASRC_CONFIG, 'example.org',
|
|
|
+ LOCAL_TESTDATA_PATH +
|
|
|
+ '/broken-example.org.zone'])
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+ self.assertEqual(1, runner.run())
|
|
|
+ self.__check_zone_soa(ORIG_SOA_TXT)
|
|
|
+
|
|
|
+if __name__== "__main__":
|
|
|
+ isc.log.resetUnitTestRootLogger()
|
|
|
+ # Disable the internal logging setup so the test output won't be too
|
|
|
+ # verbose by default.
|
|
|
+ LoadZoneRunner._config_log = lambda x: None
|
|
|
+
|
|
|
+ # Cancel signal handlers so we can stop tests when they hang
|
|
|
+ LoadZoneRunner._set_signal_handlers = lambda x: None
|
|
|
+ unittest.main()
|