123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- # Copyright (C) 2012 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- '''Tests for the loadzone module'''
- import unittest
- from loadzone import *
- from isc.dns import *
- from isc.datasrc import *
- import isc.log
- import bind10_config
- import os
- import shutil
- # Some common test parameters
- TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
- READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
- LOCAL_TESTDATA_PATH = os.environ['LOCAL_TESTDATA_PATH'] + os.sep
- READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
- NEW_ZONE_TXT_FILE = LOCAL_TESTDATA_PATH + "example.org.zone"
- ALT_NEW_ZONE_TXT_FILE = TESTDATA_PATH + "example.com.zone"
- TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
- WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
- TEST_ZONE_NAME = Name('example.org')
- DATASRC_CONFIG = '{"database_file": "' + WRITE_ZONE_DB_FILE + '"}'
- # before/after SOAs: different in mname and serial
- ORIG_SOA_TXT = 'example.org. 3600 IN SOA ns1.example.org. ' +\
- 'admin.example.org. 1234 3600 1800 2419200 7200\n'
- NEW_SOA_TXT = 'example.org. 3600 IN SOA ns.example.org. ' +\
- 'admin.example.org. 1235 3600 1800 2419200 7200\n'
- # This is the brandnew SOA for a newly created zone
- ALT_NEW_SOA_TXT = 'example.com. 3600 IN SOA ns.example.com. ' +\
- 'admin.example.com. 1234 3600 1800 2419200 7200\n'
- class TestLoadZoneRunner(unittest.TestCase):
- def setUp(self):
- shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
- # default command line arguments
- self.__args = ['-c', DATASRC_CONFIG, 'example.org', NEW_ZONE_TXT_FILE]
- self.__runner = LoadZoneRunner(self.__args)
- def tearDown(self):
- # Delete the used DB file; if some of the tests unexpectedly fail
- # unexpectedly in the middle of updating the DB, a lock could stay
- # there and would affect the other tests that would otherwise succeed.
- os.unlink(WRITE_ZONE_DB_FILE)
- def test_init(self):
- '''
- Checks initial class attributes
- '''
- self.assertIsNone(self.__runner._zone_class)
- self.assertIsNone(self.__runner._zone_name)
- self.assertIsNone(self.__runner._zone_file)
- self.assertIsNone(self.__runner._datasrc_config)
- self.assertIsNone(self.__runner._datasrc_type)
- self.assertEqual(10000, self.__runner._report_interval)
- self.assertEqual('INFO', self.__runner._log_severity)
- self.assertEqual(0, self.__runner._log_debuglevel)
- def test_parse_args(self):
- self.__runner._parse_args()
- self.assertEqual(TEST_ZONE_NAME, self.__runner._zone_name)
- self.assertEqual(NEW_ZONE_TXT_FILE, self.__runner._zone_file)
- self.assertEqual(DATASRC_CONFIG, self.__runner._datasrc_config)
- self.assertEqual('sqlite3', self.__runner._datasrc_type) # default
- self.assertEqual(10000, self.__runner._report_interval) # default
- self.assertEqual(RRClass.IN(), self.__runner._zone_class) # default
- self.assertEqual('INFO', self.__runner._log_severity) # default
- self.assertEqual(0, self.__runner._log_debuglevel)
- def test_set_loglevel(self):
- runner = LoadZoneRunner(['-d', '1'] + self.__args)
- runner._parse_args()
- self.assertEqual('DEBUG', runner._log_severity)
- self.assertEqual(1, runner._log_debuglevel)
- def test_parse_bad_args(self):
- # There must be exactly 2 non-option arguments: zone name and zone file
- self.assertRaises(BadArgument, LoadZoneRunner([])._parse_args)
- self.assertRaises(BadArgument, LoadZoneRunner(['example']).
- _parse_args)
- self.assertRaises(BadArgument, LoadZoneRunner(self.__args + ['0']).
- _parse_args)
- # Bad zone name
- args = ['example.org', 'example.zone'] # otherwise valid args
- self.assertRaises(BadArgument,
- LoadZoneRunner(['bad..name', 'example.zone'] + args).
- _parse_args)
- # Bad class name
- self.assertRaises(BadArgument,
- LoadZoneRunner(['-C', 'badclass'] + args).
- _parse_args)
- # Unsupported class
- self.assertRaises(BadArgument,
- LoadZoneRunner(['-C', 'CH'] + args)._parse_args)
- # bad debug level
- self.assertRaises(BadArgument,
- LoadZoneRunner(['-d', '-10'] + args)._parse_args)
- # bad report interval
- self.assertRaises(BadArgument,
- LoadZoneRunner(['-i', '-5'] + args)._parse_args)
- # -c cannot be omitted unless it's type sqlite3 (right now)
- self.assertRaises(BadArgument,
- LoadZoneRunner(['-t', 'memory'] + args)._parse_args)
- def test_get_datasrc_config(self):
- # For sqlite3, we use the config with the well-known DB file.
- expected_conf = \
- '{"database_file": "' + bind10_config.DATA_PATH + '/zone.sqlite3"}'
- self.assertEqual(expected_conf,
- self.__runner._get_datasrc_config('sqlite3'))
- # For other types, config must be given by hand for now
- self.assertRaises(BadArgument, self.__runner._get_datasrc_config,
- 'memory')
- def __common_load_setup(self):
- self.__runner._zone_class = RRClass.IN()
- self.__runner._zone_name = TEST_ZONE_NAME
- self.__runner._zone_file = NEW_ZONE_TXT_FILE
- self.__runner._datasrc_type = 'sqlite3'
- self.__runner._datasrc_config = DATASRC_CONFIG
- self.__runner._report_interval = 1
- self.__reports = []
- self.__runner._report_progress = lambda x: self.__reports.append(x)
- def __check_zone_soa(self, soa_txt, zone_name=TEST_ZONE_NAME):
- """Check that the given SOA RR exists and matches the expected string
- If soa_txt is None, the zone is expected to be non-existent.
- Otherwise, if soa_txt is False, the zone should exist but SOA is
- expected to be missing.
- """
- client = DataSourceClient('sqlite3', DATASRC_CONFIG)
- result, finder = client.find_zone(zone_name)
- if soa_txt is None:
- self.assertEqual(client.NOTFOUND, result)
- return
- self.assertEqual(client.SUCCESS, result)
- result, rrset, _ = finder.find(zone_name, RRType.SOA())
- if soa_txt:
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual(soa_txt, rrset.to_text())
- else:
- self.assertEqual(finder.NXRRSET, result)
- def test_load_update(self):
- '''successful case to loading new contents to an existing zone.'''
- self.__common_load_setup()
- self.__check_zone_soa(ORIG_SOA_TXT)
- self.__runner._do_load()
- # In this test setup every loaded RR will be reported, and there will
- # be 3 RRs
- self.assertEqual([1, 2, 3], self.__reports)
- self.__check_zone_soa(NEW_SOA_TXT)
- def test_load_update_skipped_report(self):
- '''successful loading, with reports for every 2 RRs'''
- self.__common_load_setup()
- self.__runner._report_interval = 2
- self.__runner._do_load()
- self.assertEqual([2], self.__reports)
- def test_load_update_no_report(self):
- '''successful loading, without progress reports'''
- self.__common_load_setup()
- self.__runner._report_interval = 0
- self.__runner._do_load()
- self.assertEqual([], self.__reports) # no report
- self.__check_zone_soa(NEW_SOA_TXT) # but load is completed
- def test_create_and_load(self):
- '''successful case to loading contents to a new zone (created).'''
- self.__common_load_setup()
- self.__runner._zone_name = Name('example.com')
- self.__runner._zone_file = ALT_NEW_ZONE_TXT_FILE
- self.__check_zone_soa(None, zone_name=Name('example.com'))
- self.__runner._do_load()
- self.__check_zone_soa(ALT_NEW_SOA_TXT, zone_name=Name('example.com'))
- def test_load_fail_badconfig(self):
- '''Load attempt fails due to broken datasrc config.'''
- self.__common_load_setup()
- self.__runner._datasrc_config = "invalid config"
- self.__check_zone_soa(ORIG_SOA_TXT)
- self.assertRaises(LoadFailure, self.__runner._do_load)
- self.__check_zone_soa(ORIG_SOA_TXT) # no change to the zone
- def test_load_fail_badzone(self):
- '''Load attempt fails due to broken zone file.'''
- self.__common_load_setup()
- self.__runner._zone_file = \
- LOCAL_TESTDATA_PATH + '/broken-example.org.zone'
- self.__check_zone_soa(ORIG_SOA_TXT)
- self.assertRaises(LoadFailure, self.__runner._do_load)
- self.__check_zone_soa(ORIG_SOA_TXT)
- def test_load_fail_noloader(self):
- '''Load attempt fails because loading isn't supported'''
- self.__common_load_setup()
- self.__runner._datasrc_type = 'memory'
- self.__runner._datasrc_config = '{"type": "memory"}'
- self.__check_zone_soa(ORIG_SOA_TXT)
- self.assertRaises(LoadFailure, self.__runner._do_load)
- self.__check_zone_soa(ORIG_SOA_TXT)
- def test_load_fail_create_cancel(self):
- '''Load attempt fails and new creation of zone is canceled'''
- self.__common_load_setup()
- self.__runner._zone_name = Name('example.com')
- self.__runner._zone_file = 'no-such-file'
- self.__check_zone_soa(None, zone_name=Name('example.com'))
- self.assertRaises(LoadFailure, self.__runner._do_load)
- # _do_load() should have once created the zone but then canceled it.
- self.__check_zone_soa(None, zone_name=Name('example.com'))
- def __common_post_load_setup(self, zone_file):
- '''Common setup procedure for post load tests.'''
- # replace the LoadZoneRunner's original _post_load_warning() for
- # inspection
- self.__warnings = []
- self.__runner._post_load_warning = \
- lambda msg: self.__warnings.append(msg)
- # perform load and invoke checks
- self.__common_load_setup()
- self.__runner._zone_file = zone_file
- self.__check_zone_soa(ORIG_SOA_TXT)
- self.__runner._do_load()
- self.__runner._post_load_checks()
- def test_load_post_check_fail_soa(self):
- '''Load succeeds but warns about missing SOA, should cause warn'''
- self.__common_load_setup()
- self.__common_post_load_setup(LOCAL_TESTDATA_PATH +
- '/example-nosoa.org.zone')
- self.__check_zone_soa(False)
- self.assertEqual(1, len(self.__warnings))
- self.assertEqual('zone has no SOA', self.__warnings[0])
- def test_load_post_check_fail_ns(self):
- '''Load succeeds but warns about missing NS, should cause warn'''
- self.__common_load_setup()
- self.__common_post_load_setup(LOCAL_TESTDATA_PATH +
- '/example-nons.org.zone')
- self.__check_zone_soa(NEW_SOA_TXT)
- self.assertEqual(1, len(self.__warnings))
- self.assertEqual('zone has no NS', self.__warnings[0])
- def __interrupt_progress(self, loaded_rrs):
- '''A helper emulating a signal in the middle of loading.
- On the second progress report, it internally invokes the signal
- handler to see if it stops the loading.
- '''
- self.__reports.append(loaded_rrs)
- if len(self.__reports) == 2:
- self.__runner._interrupt_handler()
- def test_load_interrupted(self):
- '''Load attempt fails due to signal interruption'''
- self.__common_load_setup()
- self.__runner._report_progress = lambda x: self.__interrupt_progress(x)
- # The interrupting _report_progress() will terminate the loading
- # in the middle. the number of reports is smaller, and the zone
- # won't be changed.
- self.assertRaises(LoadFailure, self.__runner._do_load)
- self.assertEqual([1, 2], self.__reports)
- self.__check_zone_soa(ORIG_SOA_TXT)
- def test_load_interrupted_create_cancel(self):
- '''Load attempt for a new zone fails due to signal interruption
- It cancels the zone creation.
- '''
- self.__common_load_setup()
- self.__runner._report_progress = lambda x: self.__interrupt_progress(x)
- self.__runner._zone_name = Name('example.com')
- self.__runner._zone_file = ALT_NEW_ZONE_TXT_FILE
- self.__check_zone_soa(None, zone_name=Name('example.com'))
- self.assertRaises(LoadFailure, self.__runner._do_load)
- self.assertEqual([1, 2], self.__reports)
- self.__check_zone_soa(None, zone_name=Name('example.com'))
- def test_run_success(self):
- '''Check for the top-level method.
- Detailed behavior is tested in other tests. We only check the
- return value of run(), and the zone is successfully loaded.
- '''
- self.__check_zone_soa(ORIG_SOA_TXT)
- self.assertEqual(0, self.__runner.run())
- self.__check_zone_soa(NEW_SOA_TXT)
- def test_run_fail(self):
- '''Check for the top-level method, failure case.
- Similar to the success test, but loading will fail, and return
- value should be 1.
- '''
- runner = LoadZoneRunner(['-c', DATASRC_CONFIG, 'example.org',
- LOCAL_TESTDATA_PATH +
- '/broken-example.org.zone'])
- self.__check_zone_soa(ORIG_SOA_TXT)
- self.assertEqual(1, runner.run())
- self.__check_zone_soa(ORIG_SOA_TXT)
- if __name__== "__main__":
- isc.log.resetUnitTestRootLogger()
- # Disable the internal logging setup so the test output won't be too
- # verbose by default.
- LoadZoneRunner._config_log = lambda x: None
- # Cancel signal handlers so we can stop tests when they hang
- LoadZoneRunner._set_signal_handlers = lambda x: None
- unittest.main()
|