|
- # Copyright (C) 2011 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import isc.log
- import isc.datasrc
- from isc.datasrc import ZoneFinder, ZoneJournalReader
- from isc.dns import *
- from isc.testutils.rrset_utils import rrsets_equal
- import unittest
- import sqlite3
- import os
- import shutil
- import sys
- import json
# Directories holding the read-only fixtures and the scratch area for
# writable copies.  Both are taken from the environment (a missing variable
# raises KeyError at import time) and get a trailing path separator so that
# file names can simply be appended.
TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep

# The pristine zone database and the location of its writable copy.
READ_ZONE_DB_FILE = TESTDATA_PATH + "example.com.sqlite3"
WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"

# JSON configuration strings passed to DataSourceClient.  json.dumps() is
# used instead of manual concatenation so that a path containing a quote or
# a backslash still produces valid JSON.
READ_ZONE_DB_CONFIG = json.dumps({"database_file": READ_ZONE_DB_FILE})
WRITE_ZONE_DB_CONFIG = json.dumps({"database_file": WRITE_ZONE_DB_FILE})
def add_rrset(rrset_list, name, rrclass, rrtype, ttl, rdatas):
    """Create an RRset from the given parameters and append it to rrset_list.

    rdatas is an iterable of rdata text values that are converted with
    isc.dns.Rdata(rrtype, rrclass, text) and added to the new RRset.  It
    may be None, in which case the RRset is appended without any rdata.
    """
    new_rrset = isc.dns.RRset(name, rrclass, rrtype, ttl)
    rdata_texts = () if rdatas is None else rdatas
    for rdata_text in rdata_texts:
        new_rrset.add_rdata(isc.dns.Rdata(rrtype, rrclass, rdata_text))
    rrset_list.append(new_rrset)
def check_for_rrset(expected_rrsets, rrset):
    """Return True if rrset is in expected_rrsets, False otherwise.

    Membership is decided with rrsets_equal().  When a match is found it
    is removed from expected_rrsets (the list is modified in place), so a
    second identical rrset needs a second entry in the list.
    """
    # Walk over a snapshot because the list itself is modified on a match.
    for candidate in list(expected_rrsets):
        if not rrsets_equal(candidate, rrset):
            continue
        expected_rrsets.remove(candidate)
        return True
    return False
def create_soa(serial):
    """Return an example.org IN SOA RRset (TTL 3600) with the given serial.

    All other SOA fields are fixed; serial is converted with str().
    """
    rdata_text = ('ns1.example.org. admin.example.org. ' +
                  str(serial) + ' 3600 1800 2419200 7200')
    soa_rrset = RRset(Name('example.org'), RRClass.IN(), RRType.SOA(),
                      RRTTL(3600))
    soa_rrset.add_rdata(Rdata(RRType.SOA(), RRClass.IN(), rdata_text))
    return soa_rrset
def test_findall_common(self, tested):
    """
    Common part of the find_all test. It tests a find_all method on the passed
    object.

    self is the unittest.TestCase on whose behalf the assertions are made;
    tested is any object providing find_all(name[, options]) that returns
    a (result, rrset-or-list, flags) tuple.
    """
    # Some "failure" responses
    result, rrset, _ = tested.find_all(isc.dns.Name("www.sql1.example.com"),
                                       ZoneFinder.FIND_DEFAULT)
    self.assertEqual(ZoneFinder.DELEGATION, result)
    expected = RRset(Name('sql1.example.com.'), RRClass.IN(), RRType.NS(),
                     RRTTL(3600))
    expected.add_rdata(Rdata(RRType.NS(), RRClass.IN(),
                             'dns01.example.com.'))
    expected.add_rdata(Rdata(RRType.NS(), RRClass.IN(),
                             'dns02.example.com.'))
    expected.add_rdata(Rdata(RRType.NS(), RRClass.IN(),
                             'dns03.example.com.'))
    self.assertTrue(rrsets_equal(expected, rrset))

    result, rrset, _ = tested.find_all(isc.dns.Name("nxdomain.example.com"),
                                       ZoneFinder.FIND_DEFAULT)
    self.assertEqual(ZoneFinder.NXDOMAIN, result)
    # The value under test must be the first argument; passing None first
    # would make the assertion pass unconditionally.
    self.assertIsNone(rrset)

    # A success. It should return the list now.
    # This also tests we can omit the options parameter
    result, rrsets, _ = tested.find_all(isc.dns.Name("mix.example.com."))
    self.assertEqual(ZoneFinder.SUCCESS, result)
    self.assertEqual(2, len(rrsets))
    # Sort by type name so the order matches 'expected' below (A, AAAA).
    rrsets.sort(key=lambda rrset: rrset.get_type().to_text())
    expected = [
        RRset(Name('mix.example.com.'), RRClass.IN(), RRType.A(),
              RRTTL(3600)),
        RRset(Name('mix.example.com.'), RRClass.IN(), RRType.AAAA(),
              RRTTL(3600))
    ]
    expected[0].add_rdata(Rdata(RRType.A(), RRClass.IN(), "192.0.2.1"))
    expected[0].add_rdata(Rdata(RRType.A(), RRClass.IN(), "192.0.2.2"))
    expected[1].add_rdata(Rdata(RRType.AAAA(), RRClass.IN(),
                                "2001:db8::1"))
    expected[1].add_rdata(Rdata(RRType.AAAA(), RRClass.IN(),
                                "2001:db8::2"))
    # NOTE: this loop deliberately uses the name 'rrset', the same as the
    # reference-count loop below.  A differently named variable would keep
    # an extra reference to the last element and break the count check.
    for (rrset, exp) in zip(rrsets, expected):
        self.assertTrue(rrsets_equal(exp, rrset))

    # Check the reference counts on them. The getrefcount returns one more,
    # as for the reference in its own parameter - see its docs.

    # Two - one for the variable, one for parameter
    self.assertEqual(2, sys.getrefcount(rrsets))
    for rrset in rrsets:
        # 3 - one as the element of list, one for the rrset variable
        # and one for the parameter.
        self.assertEqual(3, sys.getrefcount(rrset))
class DataSrcClient(unittest.TestCase):
    """Read-only tests of DataSourceClient, its iterator and its finder.

    All tests use the sqlite3 data source described by READ_ZONE_DB_CONFIG.
    """

    def test_(self):
        # can't construct directly
        self.assertRaises(TypeError, isc.datasrc.ZoneIterator)

        # Wrong argument types for the client constructor
        self.assertRaises(TypeError, isc.datasrc.DataSourceClient, 1, "{}")
        self.assertRaises(TypeError, isc.datasrc.DataSourceClient, "sqlite3", 1)

        # Unknown data source type, or unusable configuration
        self.assertRaises(isc.datasrc.Error,
                          isc.datasrc.DataSourceClient, "foo", "{}")
        self.assertRaises(isc.datasrc.Error,
                          isc.datasrc.DataSourceClient, "sqlite3", "")
        self.assertRaises(isc.datasrc.Error,
                          isc.datasrc.DataSourceClient, "sqlite3", "{}")
        self.assertRaises(isc.datasrc.Error,
                          isc.datasrc.DataSourceClient, "sqlite3",
                          "{ \"foo\": 1 }")
        self.assertRaises(isc.datasrc.Error,
                          isc.datasrc.DataSourceClient, "memory",
                          "{ \"foo\": 1 }")

    def test_iterate(self):
        dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)

        # for RRSIGS, the TTL's are currently modified. This test should
        # start failing when we fix that.
        rrs = dsc.get_iterator(isc.dns.Name("sql1.example.com."), True)

        # We do not know the order in which the rrsets are returned by the
        # iterator.  RRset is (atm) an unorderable type, and within an
        # rrset, the rdatas and rrsigs may also be in random order.  In
        # theory the rrsets themselves can be returned in any order.
        #
        # So we create a second list with all rrsets we expect, and for each
        # rrset we get from the iterator, see if it is in that list, and
        # remove it.
        #
        # When the iterator is empty, we check no rrsets are left in the
        # list of expected ones
        expected_rrset_list = []

        name = isc.dns.Name("sql1.example.com")
        rrclass = isc.dns.RRClass.IN()
        add_rrset(expected_rrset_list, name, rrclass,
                  isc.dns.RRType.DNSKEY(), isc.dns.RRTTL(3600),
                  [
                     "256 3 5 AwEAAdYdRhBAEY67R/8G1N5AjGF6asIiNh/pNGeQ8xDQP13J"+
                     "N2lo+sNqWcmpYNhuVqRbLB+mamsU1XcCICSBvAlSmfz/ZUdafX23knAr"+
                     "TlALxMmspcfdpqun3Yr3YYnztuj06rV7RqmveYckWvAUXVYMSMQZfJ30"+
                     "5fs0dE/xLztL/CzZ"
                  ])
        add_rrset(expected_rrset_list, name, rrclass,
                  isc.dns.RRType.DNSKEY(), isc.dns.RRTTL(3600),
                  [
                     "257 3 5 AwEAAbaKDSa9XEFTsjSYpUTHRotTS9Tz3krfDucugW5UokGQ"+
                     "KC26QlyHXlPTZkC+aRFUs/dicJX2kopndLcnlNAPWiKnKtrsFSCnIJDB"+
                     "ZIyvcKq+9RXmV3HK3bUdHnQZ88IZWBRmWKfZ6wnzHo53kdYKAemTErkz"+
                     "taX3lRRPLYWpxRcDPEjysXT3Lh0vfL5D+CIO1yKw/q7C+v6+/kYAxc2l"+
                     "fbNE3HpklSuF+dyX4nXxWgzbcFuLz5Bwfq6ZJ9RYe/kNkA0uMWNa1KkG"+
                     "eRh8gg22kgD/KT5hPTnpezUWLvoY5Qc7IB3T0y4n2JIwiF2ZrZYVrWgD"+
                     "jRWAzGsxJiJyjd6w2k0="
                  ])
        # separate_rrs is True, so every NS record is its own RRset
        for ns_name in ("dns01.example.com.",
                        "dns02.example.com.",
                        "dns03.example.com."):
            add_rrset(expected_rrset_list, name, rrclass,
                      isc.dns.RRType.NS(), isc.dns.RRTTL(3600),
                      [ns_name])
        add_rrset(expected_rrset_list, name, rrclass,
                  isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
                  [
                     "www.sql1.example.com. NS SOA RRSIG NSEC DNSKEY"
                  ])

        # For RRSIGS, we can't add the fake data through the API, so we
        # simply pass no rdata at all (which is skipped by the check later)

        # Since we passed separate_rrs = True to get_iterator, we get several
        # sets of RRSIGs, one for each TTL
        for rrsig_ttl in (3600, 3600, 3600, 3600, 7200):
            add_rrset(expected_rrset_list, name, rrclass,
                      isc.dns.RRType.RRSIG(), isc.dns.RRTTL(rrsig_ttl), None)
        add_rrset(expected_rrset_list, name, rrclass,
                  isc.dns.RRType.SOA(), isc.dns.RRTTL(3600),
                  [
                     "master.example.com. admin.example.com. 678 3600 1800 2419200 7200"
                  ])

        name = isc.dns.Name("www.sql1.example.com.")
        add_rrset(expected_rrset_list, name, rrclass,
                  isc.dns.RRType.A(), isc.dns.RRTTL(3600),
                  [
                     "192.0.2.100"
                  ])
        add_rrset(expected_rrset_list, name, rrclass,
                  isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
                  [
                     "sql1.example.com. A RRSIG NSEC"
                  ])
        for rrsig_ttl in (3600, 7200):
            add_rrset(expected_rrset_list, name, rrclass,
                      isc.dns.RRType.RRSIG(), isc.dns.RRTTL(rrsig_ttl), None)

        # rrs is an iterator, but also has direct get_next_rrset(), use
        # the latter one here
        rrset_to_check = rrs.get_next_rrset()
        while rrset_to_check is not None:
            self.assertTrue(check_for_rrset(expected_rrset_list,
                                            rrset_to_check),
                            "Unexpected rrset returned by iterator:\n" +
                            rrset_to_check.to_text())
            rrset_to_check = rrs.get_next_rrset()

        # Now check there are none left
        self.assertEqual(0, len(expected_rrset_list),
                         "RRset(s) not returned by iterator: " +
                         str([rrset.get_name().to_text() + '/' +
                              rrset.get_type().to_text() for rrset in
                              expected_rrset_list ]
                        ))

        # TODO should we catch this (iterating past end) and just return None
        # instead of failing?
        self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)

        # Without the separate_rrs argument, it should return 72 RRsets
        dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
        rrets = dsc.get_iterator(isc.dns.Name("example.com"))
        # there are more than 80 RRs in this zone... let's just count them
        # (already did a full check of the smaller zone above)
        # There are 40 non-RRSIG RRsets and 32 distinct RRSIGs.
        self.assertEqual(72, len(list(rrets)))

        # same test, but now with explicit False argument for separate_rrs
        dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
        rrets = dsc.get_iterator(isc.dns.Name("example.com"), False)
        # there are more than 80 RRs in this zone... let's just count them
        # (already did a full check of the smaller zone above)
        self.assertEqual(72, len(list(rrets)))

        # With separate_rrs set to True every RR is returned on its own
        dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
        rrets = dsc.get_iterator(isc.dns.Name("example.com"), True)
        # there are more than 80 RRs in this zone... let's just count them
        # (already did a full check of the smaller zone above)
        self.assertEqual(84, len(list(rrets)))

        # TODO should we catch this (iterating past end) and just return None
        # instead of failing?
        self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)

        self.assertRaises(TypeError, dsc.get_iterator, "asdf")

    def test_iterator_soa(self):
        dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
        iterator = dsc.get_iterator(isc.dns.Name("sql1.example.com."))
        expected_soa = isc.dns.RRset(isc.dns.Name("sql1.example.com."),
                                     isc.dns.RRClass.IN(),
                                     isc.dns.RRType.SOA(),
                                     isc.dns.RRTTL(3600))
        expected_soa.add_rdata(isc.dns.Rdata(isc.dns.RRType.SOA(),
                                             isc.dns.RRClass.IN(),
                                             "master.example.com. " +
                                             "admin.example.com. 678 " +
                                             "3600 1800 2419200 7200"))
        self.assertTrue(rrsets_equal(expected_soa, iterator.get_soa()))

    def test_construct(self):
        # can't construct directly
        self.assertRaises(TypeError, isc.datasrc.ZoneFinder)

    def test_findoptions(self):
        '''A simple test to confirm no option is specified by default.

        '''
        self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.FIND_GLUE_OK)
        self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.FIND_DNSSEC)
        self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.NO_WILDCARD)

    def test_findresults(self):
        '''A simple test to confirm result codes are (defined and) different
        for some combinations.

        '''
        self.assertNotEqual(ZoneFinder.SUCCESS, ZoneFinder.DELEGATION)
        self.assertNotEqual(ZoneFinder.DELEGATION, ZoneFinder.NXDOMAIN)
        self.assertNotEqual(ZoneFinder.NXDOMAIN, ZoneFinder.NXRRSET)
        self.assertNotEqual(ZoneFinder.NXRRSET, ZoneFinder.CNAME)
        self.assertNotEqual(ZoneFinder.CNAME, ZoneFinder.DNAME)

    def test_findresultflags(self):
        '''A simple test just confirming the flags are all different.'''
        self.assertNotEqual(ZoneFinder.RESULT_WILDCARD,
                            ZoneFinder.RESULT_NSEC_SIGNED)
        self.assertNotEqual(ZoneFinder.RESULT_NSEC_SIGNED,
                            ZoneFinder.RESULT_NSEC3_SIGNED)
        self.assertNotEqual(ZoneFinder.RESULT_NSEC3_SIGNED,
                            ZoneFinder.RESULT_WILDCARD)

    def test_findall(self):
        """
        A test for the find_all method.
        """
        dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
        result, finder = dsc.find_zone(isc.dns.Name("example.com"))

        self.assertEqual(finder.SUCCESS, result)
        self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
        self.assertEqual("example.com.", finder.get_origin().to_text())

        test_findall_common(self, finder)

    def test_find(self):
        dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)

        result, finder = dsc.find_zone(isc.dns.Name("example.com"))
        self.assertEqual(finder.SUCCESS, result)
        self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
        self.assertEqual("example.com.", finder.get_origin().to_text())

        result, rrset, _ = finder.find(isc.dns.Name("www.example.com"),
                                       isc.dns.RRType.A(),
                                       finder.FIND_DEFAULT)
        self.assertEqual(finder.SUCCESS, result)
        self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                         rrset.to_text())

        # Check the optional parameters are optional
        result, rrset, _ = finder.find(isc.dns.Name("www.example.com"),
                                       isc.dns.RRType.A())
        self.assertEqual(finder.SUCCESS, result)
        self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                         rrset.to_text())

        result, rrset, _ = finder.find(isc.dns.Name("www.sql1.example.com"),
                                       isc.dns.RRType.A(),
                                       finder.FIND_DEFAULT)
        self.assertEqual(finder.DELEGATION, result)
        self.assertEqual("sql1.example.com. 3600 IN NS dns01.example.com.\n" +
                         "sql1.example.com. 3600 IN NS dns02.example.com.\n" +
                         "sql1.example.com. 3600 IN NS dns03.example.com.\n",
                         rrset.to_text())

        result, rrset, _ = finder.find(isc.dns.Name("doesnotexist.example.com"),
                                       isc.dns.RRType.A(),
                                       finder.FIND_DEFAULT)
        self.assertEqual(finder.NXDOMAIN, result)
        self.assertIsNone(rrset)

        self.assertRaises(isc.datasrc.OutOfZone, finder.find,
                          isc.dns.Name("www.some.other.domain"),
                          isc.dns.RRType.A())

        result, rrset, _ = finder.find(isc.dns.Name("www.example.com"),
                                       isc.dns.RRType.TXT(),
                                       finder.FIND_DEFAULT)
        self.assertEqual(finder.NXRRSET, result)
        self.assertIsNone(rrset)

        result, rrset, _ = finder.find(isc.dns.Name("cname-ext.example.com"),
                                       isc.dns.RRType.A(),
                                       finder.FIND_DEFAULT)
        self.assertEqual(finder.CNAME, result)
        self.assertEqual(
            "cname-ext.example.com. 3600 IN CNAME www.sql1.example.com.\n",
            rrset.to_text())

        result, rrset, flags = \
            finder.find(isc.dns.Name("foo.wild.example.com"),
                        isc.dns.RRType.A(), finder.FIND_DEFAULT)
        self.assertEqual(finder.SUCCESS, result)
        self.assertEqual(finder.RESULT_WILDCARD, flags)
        self.assertEqual("foo.wild.example.com. 3600 IN A 192.0.2.255\n",
                         rrset.to_text())

        # The flags of THIS lookup must be captured; checking the flags left
        # over from the previous call (or a constant) would prove nothing.
        result, rrset, flags = \
            finder.find(isc.dns.Name("foo.wild.example.com"),
                        isc.dns.RRType.TXT(), finder.FIND_DEFAULT)
        self.assertEqual(finder.NXRRSET, result)
        self.assertTrue(flags & finder.RESULT_WILDCARD)
        self.assertIsNone(rrset)

        # Wrong argument types
        self.assertRaises(TypeError, finder.find,
                          "foo",
                          isc.dns.RRType.A(),
                          finder.FIND_DEFAULT)
        self.assertRaises(TypeError, finder.find,
                          isc.dns.Name("cname-ext.example.com"),
                          "foo",
                          finder.FIND_DEFAULT)
        self.assertRaises(TypeError, finder.find,
                          isc.dns.Name("cname-ext.example.com"),
                          isc.dns.RRType.A(),
                          "foo")

    def test_find_previous(self):
        dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)

        result, finder = dsc.find_zone(isc.dns.Name("example.com"))
        self.assertEqual(finder.SUCCESS, result)

        prev = finder.find_previous_name(isc.dns.Name("bbb.example.com"))
        self.assertEqual("example.com.", prev.to_text())

        prev = finder.find_previous_name(isc.dns.Name("zzz.example.com"))
        self.assertEqual("www.example.com.", prev.to_text())

        # The result of one call can be fed into the next one
        prev = finder.find_previous_name(prev)
        self.assertEqual("*.wild.example.com.", prev.to_text())

        self.assertRaises(isc.datasrc.NotImplemented,
                          finder.find_previous_name,
                          isc.dns.Name("com"))
class DataSrcUpdater(unittest.TestCase):
    """Tests of the zone updater obtained from DataSourceClient.get_updater().

    Every test works on a fresh copy of the read-only database (see setUp).
    """

    def setUp(self):
        # Make a fresh copy of the writable database with all original content
        shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)

    def _assert_www_a_present(self, finder):
        """Assert that finder finds the www.example.com A record.

        finder is any object with a find() method (a zone finder or an
        updater).  The RRset that was found is returned.
        """
        result, rrset, _ = finder.find(isc.dns.Name("www.example.com"),
                                       isc.dns.RRType.A(),
                                       ZoneFinder.FIND_DEFAULT)
        self.assertEqual(ZoneFinder.SUCCESS, result)
        self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                         rrset.to_text())
        return rrset

    def _assert_www_a_absent(self, finder):
        """Assert that looking up www.example.com/A in finder is NXDOMAIN."""
        result, rrset, _ = finder.find(isc.dns.Name("www.example.com"),
                                       isc.dns.RRType.A(),
                                       ZoneFinder.FIND_DEFAULT)
        self.assertEqual(ZoneFinder.NXDOMAIN, result)
        self.assertIsNone(rrset)

    def test_findall(self):
        """
        The same test as DataSrcClient.test_findall, but on an updater
        instead of a finder.
        """
        dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
        updater = dsc.get_updater(isc.dns.Name("example.com"), False)

        test_findall_common(self, updater)

    def test_construct(self):
        # can't construct directly
        self.assertRaises(TypeError, isc.datasrc.ZoneUpdater)

    def test_update_finder(self):
        # Check basic behavior of updater's finder
        dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
        updater = dsc.get_updater(isc.dns.Name("example.com"), False)
        result, rrset, _ = updater.find(isc.dns.Name("www.example.com"),
                                        isc.dns.RRType.A(),
                                        ZoneFinder.FIND_DEFAULT)
        self.assertEqual(ZoneFinder.SUCCESS, result)
        self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                         rrset.to_text())

        # Omit optional parameters
        result, rrset, _ = updater.find(isc.dns.Name("www.example.com"),
                                        isc.dns.RRType.A())
        self.assertEqual(ZoneFinder.SUCCESS, result)
        self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                         rrset.to_text())

    def test_update_delete_commit(self):
        dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)

        # first make sure, through a separate finder, that some record exists
        result, finder = dsc.find_zone(isc.dns.Name("example.com"))
        self.assertEqual(finder.SUCCESS, result)
        self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
        self.assertEqual("example.com.", finder.get_origin().to_text())

        rrset_to_delete = self._assert_www_a_present(finder)

        # can't delete rrset with associated sig. Abuse that to force an
        # exception first, then remove the sig, then delete the record
        updater = dsc.get_updater(isc.dns.Name("example.com"), True)
        self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
                          rrset_to_delete)

        rrset_to_delete.remove_rrsig()

        updater.delete_rrset(rrset_to_delete)

        # The record should be gone in the updater, but not in the original
        # finder (since we have not committed)
        self._assert_www_a_absent(updater)
        self._assert_www_a_present(finder)

        updater.commit()
        # second commit should raise exception
        self.assertRaises(isc.datasrc.Error, updater.commit)

        # the record should be gone now in the 'real' finder as well
        self._assert_www_a_absent(finder)

        # now add it again
        updater = dsc.get_updater(isc.dns.Name("example.com"), True)
        updater.add_rrset(rrset_to_delete)
        updater.commit()

        # second commit should throw
        self.assertRaises(isc.datasrc.Error, updater.commit)

        self._assert_www_a_present(finder)

    def test_two_modules(self):
        # load two modules, and check if they don't interfere
        mem_cfg = { "type": "memory", "class": "IN", "zones": [] }
        dsc_mem = isc.datasrc.DataSourceClient("memory", json.dumps(mem_cfg))
        dsc_sql = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)

        # check if exceptions are working
        self.assertRaises(isc.datasrc.Error, isc.datasrc.DataSourceClient,
                          "memory", "{}")
        self.assertRaises(isc.datasrc.Error, isc.datasrc.DataSourceClient,
                          "sqlite3", "{}")

        # see if a lookup succeeds in sqlite3 ds
        result, finder = dsc_sql.find_zone(isc.dns.Name("example.com"))
        self.assertEqual(finder.SUCCESS, result)
        self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
        self.assertEqual("example.com.", finder.get_origin().to_text())
        self._assert_www_a_present(finder)

        # see if a lookup fails in mem ds
        # NOTE(review): this compares the find_zone() result with a
        # ZoneFinder result code read from the returned finder object;
        # confirm that this is the intended constant.
        result, finder = dsc_mem.find_zone(isc.dns.Name("example.com"))
        self.assertEqual(finder.NXDOMAIN, result)

    def test_update_delete_abort(self):
        dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)

        # first make sure, through a separate finder, that some record exists
        result, finder = dsc.find_zone(isc.dns.Name("example.com"))
        self.assertEqual(finder.SUCCESS, result)
        self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
        self.assertEqual("example.com.", finder.get_origin().to_text())

        rrset_to_delete = self._assert_www_a_present(finder)

        # can't delete rrset with associated sig. Abuse that to force an
        # exception first, then remove the sig, then delete the record
        updater = dsc.get_updater(isc.dns.Name("example.com"), True)
        self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
                          rrset_to_delete)

        rrset_to_delete.remove_rrsig()

        updater.delete_rrset(rrset_to_delete)

        # The record should be gone in the updater, but not in the original
        # finder (since we have not committed)
        self._assert_www_a_absent(updater)

        # destroy the updater, which should make it roll back
        updater = None

        # the record should still be available in the 'real' finder as well
        self._assert_www_a_present(finder)

    def test_update_for_no_zone(self):
        dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
        self.assertIsNone(dsc.get_updater(isc.dns.Name("notexistent.example"),
                                          True))

    def test_client_reference(self):
        # Temporarily create various objects using factory methods of the
        # client.  The created objects won't be stored anywhere and
        # immediately released.  The creation shouldn't affect the reference
        # to the base client.
        dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
        orig_ref = sys.getrefcount(dsc)

        dsc.find_zone(isc.dns.Name("example.com"))
        self.assertEqual(orig_ref, sys.getrefcount(dsc))

        dsc.get_iterator(isc.dns.Name("example.com."))
        self.assertEqual(orig_ref, sys.getrefcount(dsc))

        dsc.get_updater(isc.dns.Name("example.com"), True)
        self.assertEqual(orig_ref, sys.getrefcount(dsc))

    def test_iterate_over_empty_zone(self):
        # empty the test zone first
        dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
        updater = dsc.get_updater(isc.dns.Name("example.com"), True)
        updater.commit()

        # Check the iterator behavior for the empty zone.
        iterator = dsc.get_iterator(isc.dns.Name("example.com."))
        self.assertIsNone(iterator.get_soa())
        self.assertIsNone(iterator.get_next_rrset())
class JournalWrite(unittest.TestCase):
    """Tests of updaters created with journaling enabled.

    The journal content is verified by reading the 'diffs' table of the
    writable sqlite3 database file directly.
    """

    def setUp(self):
        # Make a fresh copy of the writable database with all original content
        shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
        self.dsc = isc.datasrc.DataSourceClient("sqlite3",
                                                WRITE_ZONE_DB_CONFIG)
        self.updater = self.dsc.get_updater(Name("example.com"), False, True)

    def tearDown(self):
        # Drop the references so the client and updater can be released
        self.dsc = None
        self.updater = None

    def check_journal(self, expected_list):
        """Assert that the stored journal equals expected_list.

        expected_list is a list of (name, rrtype, ttl, rdata) tuples in the
        order in which they are expected to be stored.
        """
        # This assumes sqlite3 DB and directly fetches stored data from
        # the DB file.  It should be generalized using ZoneJournalReader
        # once it's supported.
        conn = sqlite3.connect(WRITE_ZONE_DB_FILE)
        try:
            cur = conn.cursor()
            cur.execute('SELECT name, rrtype, ttl, rdata FROM diffs ORDER BY id')
            actual_list = cur.fetchall()
        finally:
            # Close the connection before asserting, and also if the query
            # itself fails, so the DB file is never left open.
            conn.close()
        self.assertEqual(len(expected_list), len(actual_list))
        for (expected, actual) in zip(expected_list, actual_list):
            self.assertEqual(expected, actual)

    def create_a(self, address):
        """Return a www.example.org IN A RRset (TTL 3600) for address."""
        a_rr = RRset(Name('www.example.org'), RRClass.IN(), RRType.A(),
                     RRTTL(3600))
        a_rr.add_rdata(Rdata(RRType.A(), RRClass.IN(), address))
        return a_rr

    def test_journal_write(self):
        # This is a straightforward port of the C++ 'journal' test
        # Note: we add/delete 'out of zone' data (example.org in the
        # example.com zone) for convenience.
        self.updater.delete_rrset(create_soa(1234))
        self.updater.delete_rrset(self.create_a('192.0.2.2'))
        self.updater.add_rrset(create_soa(1235))
        self.updater.add_rrset(self.create_a('192.0.2.2'))
        self.updater.commit()

        expected = []
        expected.append(("example.org.", "SOA", 3600,
                         "ns1.example.org. admin.example.org. " +
                         "1234 3600 1800 2419200 7200"))
        expected.append(("www.example.org.", "A", 3600, "192.0.2.2"))
        expected.append(("example.org.", "SOA", 3600,
                         "ns1.example.org. admin.example.org. " +
                         "1235 3600 1800 2419200 7200"))
        expected.append(("www.example.org.", "A", 3600, "192.0.2.2"))
        self.check_journal(expected)

    def test_journal_write_multiple(self):
        # This is a straightforward port of the C++ 'journalMultiple' test
        expected = []
        for i in range(1, 100):
            self.updater.delete_rrset(create_soa(1234 + i - 1))
            expected.append(("example.org.", "SOA", 3600,
                             "ns1.example.org. admin.example.org. " +
                             str(1234 + i - 1) + " 3600 1800 2419200 7200"))
            self.updater.add_rrset(create_soa(1234 + i))
            expected.append(("example.org.", "SOA", 3600,
                             "ns1.example.org. admin.example.org. " +
                             str(1234 + i) + " 3600 1800 2419200 7200"))
        self.updater.commit()
        self.check_journal(expected)

    def test_journal_write_bad_sequence(self):
        # This is a straightforward port of the C++ 'journalBadSequence' test

        # Delete A before SOA
        self.assertRaises(isc.datasrc.Error, self.updater.delete_rrset,
                          self.create_a('192.0.2.1'))
        # Add before delete
        self.updater = self.dsc.get_updater(Name("example.com"), False, True)
        self.assertRaises(isc.datasrc.Error, self.updater.add_rrset,
                          create_soa(1234))
        # Add A before SOA
        self.updater = self.dsc.get_updater(Name("example.com"), False, True)
        self.updater.delete_rrset(create_soa(1234))
        self.assertRaises(isc.datasrc.Error, self.updater.add_rrset,
                          self.create_a('192.0.2.1'))
        # Commit before add
        self.updater = self.dsc.get_updater(Name("example.com"), False, True)
        self.updater.delete_rrset(create_soa(1234))
        self.assertRaises(isc.datasrc.Error, self.updater.commit)
        # Delete two SOAs
        self.updater = self.dsc.get_updater(Name("example.com"), False, True)
        self.updater.delete_rrset(create_soa(1234))
        self.assertRaises(isc.datasrc.Error, self.updater.delete_rrset,
                          create_soa(1235))
        # Add two SOAs
        self.updater = self.dsc.get_updater(Name("example.com"), False, True)
        self.updater.delete_rrset(create_soa(1234))
        self.updater.add_rrset(create_soa(1235))
        self.assertRaises(isc.datasrc.Error, self.updater.add_rrset,
                          create_soa(1236))

    def test_journal_write_onerase(self):
        # Release the updater created in setUp, then check that asking for
        # both "replace" and "journaling" is rejected.
        self.updater = None
        self.assertRaises(isc.datasrc.Error, self.dsc.get_updater,
                          Name("example.com"), True, True)

    def test_journal_write_badparam(self):
        dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
        self.assertRaises(TypeError, dsc.get_updater, 0, False, True)
        self.assertRaises(TypeError, dsc.get_updater, Name('example.com'),
                          False, 0)
        self.assertRaises(TypeError, dsc.get_updater, Name("example.com"),
                          1, True)
class JournalRead(unittest.TestCase):
    """Tests of ZoneJournalReader obtained through get_journal_reader().

    Every test works on a fresh copy of the read-only database (see setUp).
    """

    def setUp(self):
        # Make a fresh copy of the writable database with all original content
        self.zname = Name('example.com')
        shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
        self.dsc = isc.datasrc.DataSourceClient("sqlite3",
                                                WRITE_ZONE_DB_CONFIG)
        self.reader = None

    def tearDown(self):
        # Some tests leave the reader in the middle of sequence, holding
        # the lock.  Since the unittest framework keeps each test object
        # until the end of the entire tests, we need to make sure the reader
        # is released at the end of each test.  The client shouldn't do harm
        # but we clean it up, too, just in case.
        self.dsc = None
        self.reader = None

    def make_simple_diff(self, begin_soa):
        """Journal one diff: delete begin_soa, add the SOA of serial 1235."""
        updater = self.dsc.get_updater(self.zname, False, True)
        updater.delete_rrset(begin_soa)
        updater.add_rrset(create_soa(1235))
        updater.commit()

    def test_journal_reader(self):
        # This is a straightforward port of the C++ 'journalReader' test
        self.make_simple_diff(create_soa(1234))
        result, self.reader = self.dsc.get_journal_reader(self.zname, 1234,
                                                          1235)
        self.assertEqual(ZoneJournalReader.SUCCESS, result)
        self.assertIsNotNone(self.reader)
        # The comparison results must be asserted; a bare rrsets_equal()
        # call would silently accept any diff content.
        self.assertTrue(rrsets_equal(create_soa(1234),
                                     self.reader.get_next_diff()))
        self.assertTrue(rrsets_equal(create_soa(1235),
                                     self.reader.get_next_diff()))
        self.assertIsNone(self.reader.get_next_diff())
        # Reading past the end of the sequence is an error
        self.assertRaises(ValueError, self.reader.get_next_diff)

    def test_journal_reader_with_large_serial(self):
        # similar to the previous one, but use a very large serial to check
        # if the python wrapper code has unexpected integer overflow
        self.make_simple_diff(create_soa(4294967295))
        result, self.reader = self.dsc.get_journal_reader(self.zname,
                                                          4294967295, 1235)
        self.assertIsNotNone(self.reader)
        # dump to text and compare them in case create_soa happens to have
        # an overflow bug
        self.assertEqual('example.org. 3600 IN SOA ns1.example.org. ' +
                         'admin.example.org. 4294967295 3600 1800 ' +
                         '2419200 7200\n',
                         self.reader.get_next_diff().to_text())

    def test_journal_reader_large_journal(self):
        # This is a straightforward port of the C++ 'readLargeJournal' test.
        # In this test we use the ZoneJournalReader object as a Python
        # iterator.
        updater = self.dsc.get_updater(self.zname, False, True)
        expected = []
        for i in range(0, 100):
            rrset = create_soa(1234 + i)
            updater.delete_rrset(rrset)
            expected.append(rrset)

            rrset = create_soa(1234 + i + 1)
            updater.add_rrset(rrset)
            expected.append(rrset)

        updater.commit()
        _, self.reader = self.dsc.get_journal_reader(self.zname, 1234, 1334)
        self.assertIsNotNone(self.reader)
        i = 0
        for rr in self.reader:
            # Fail cleanly (rather than with IndexError) if the reader
            # yields more RRsets than were journaled.
            self.assertNotEqual(len(expected), i)
            self.assertTrue(rrsets_equal(expected[i], rr))
            i += 1
        self.assertEqual(len(expected), i)

    def test_journal_reader_no_range(self):
        # This is a straightforward port of the C++ 'readJournalForNoRange'
        # test
        self.make_simple_diff(create_soa(1234))
        result, self.reader = self.dsc.get_journal_reader(self.zname, 1200,
                                                          1235)
        self.assertEqual(ZoneJournalReader.NO_SUCH_VERSION, result)
        self.assertIsNone(self.reader)

    def test_journal_reader_no_zone(self):
        # This is a straightforward port of the C++ 'journalReaderForNXZone'
        # test
        result, self.reader = self.dsc.get_journal_reader(Name('nosuchzone'),
                                                          0, 1)
        self.assertEqual(ZoneJournalReader.NO_SUCH_ZONE, result)
        self.assertIsNone(self.reader)

    def test_journal_reader_bad_params(self):
        # The zone must be a Name and both serials must be integers
        self.assertRaises(TypeError, self.dsc.get_journal_reader,
                          'example.com.', 0, 1)
        self.assertRaises(TypeError, self.dsc.get_journal_reader,
                          self.zname, 'must be int', 1)
        self.assertRaises(TypeError, self.dsc.get_journal_reader,
                          self.zname, 0, 'must be int')

    def test_journal_reader_direct_construct(self):
        # ZoneJournalReader can only be constructed via a factory
        self.assertRaises(TypeError, ZoneJournalReader)
# Script entry point: set up logging for the unit tests, then hand control
# to the unittest runner.
if __name__ == "__main__":
    # Initialize the logger under the "bind10" name and switch the root
    # logger to its unit-test configuration before any test runs.
    isc.log.init("bind10")
    isc.log.resetUnitTestRootLogger()
    unittest.main()
|