123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- # Copyright (C) 2011 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import isc.log
- import isc.datasrc
- import isc.dns
- import unittest
- import os
- import shutil
- import json
- TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
- TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
- READ_ZONE_DB_FILE = TESTDATA_PATH + "example.com.sqlite3"
- WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
- READ_ZONE_DB_CONFIG = "{ \"database_file\": \"" + READ_ZONE_DB_FILE + "\" }"
- WRITE_ZONE_DB_CONFIG = "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}"
- def add_rrset(rrset_list, name, rrclass, rrtype, ttl, rdatas):
- rrset_to_add = isc.dns.RRset(name, rrclass, rrtype, ttl)
- if rdatas is not None:
- for rdata in rdatas:
- rrset_to_add.add_rdata(isc.dns.Rdata(rrtype, rrclass, rdata))
- rrset_list.append(rrset_to_add)
- # helper function, we have no direct rrset comparison atm
- def rrsets_equal(a, b):
- # no accessor for sigs either (so this only checks name, class, type, ttl,
- # and rdata)
- # also, because of the fake data in rrsigs, if the type is rrsig, the
- # rdata is not checked
- return a.get_name() == b.get_name() and\
- a.get_class() == b.get_class() and\
- a.get_type() == b.get_type() and \
- a.get_ttl() == b.get_ttl() and\
- (a.get_type() == isc.dns.RRType.RRSIG() or
- sorted(a.get_rdata()) == sorted(b.get_rdata()))
- # returns true if rrset is in expected_rrsets
- # will remove the rrset from expected_rrsets if found
- def check_for_rrset(expected_rrsets, rrset):
- for cur_rrset in expected_rrsets[:]:
- if rrsets_equal(cur_rrset, rrset):
- expected_rrsets.remove(cur_rrset)
- return True
- return False
- class DataSrcClient(unittest.TestCase):
- def test_constructors(self):
- # can't construct directly
- self.assertRaises(TypeError, isc.datasrc.ZoneIterator)
- self.assertRaises(TypeError, isc.datasrc.DataSourceClient, 1, "{}")
- self.assertRaises(TypeError, isc.datasrc.DataSourceClient, "sqlite3", 1)
- self.assertRaises(isc.datasrc.Error,
- isc.datasrc.DataSourceClient, "foo", "{}")
- self.assertRaises(isc.datasrc.Error,
- isc.datasrc.DataSourceClient, "sqlite3", "")
- self.assertRaises(isc.datasrc.Error,
- isc.datasrc.DataSourceClient, "sqlite3", "{}")
- self.assertRaises(isc.datasrc.Error,
- isc.datasrc.DataSourceClient, "sqlite3",
- "{ \"foo\": 1 }")
- self.assertRaises(isc.datasrc.Error,
- isc.datasrc.DataSourceClient, "memory",
- "{ \"foo\": 1 }")
- def test_iterate(self):
- dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
- # for RRSIGS, the TTL's are currently modified. This test should
- # start failing when we fix that.
- rrs = dsc.get_iterator(isc.dns.Name("sql1.example.com."))
- # we do not know the order in which they are returned by the iterator
- # but we do want to check them, so we put all records into one list
- # sort it (doesn't matter which way it is sorted, as long as it is
- # sorted)
- # RRset is (atm) an unorderable type, and within an rrset, the
- # rdatas and rrsigs may also be in random order. In theory the
- # rrsets themselves can be returned in any order.
- #
- # So we create a second list with all rrsets we expect, and for each
- # rrset we get from the iterator, see if it is in that list, and
- # remove it.
- #
- # When the iterator is empty, we check no rrsets are left in the
- # list of expected ones
- expected_rrset_list = []
- name = isc.dns.Name("sql1.example.com")
- rrclass = isc.dns.RRClass.IN()
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.DNSKEY(), isc.dns.RRTTL(3600),
- [
- "256 3 5 AwEAAdYdRhBAEY67R/8G1N5AjGF6asIiNh/pNGeQ8xDQP13J"+
- "N2lo+sNqWcmpYNhuVqRbLB+mamsU1XcCICSBvAlSmfz/ZUdafX23knAr"+
- "TlALxMmspcfdpqun3Yr3YYnztuj06rV7RqmveYckWvAUXVYMSMQZfJ30"+
- "5fs0dE/xLztL/CzZ",
- "257 3 5 AwEAAbaKDSa9XEFTsjSYpUTHRotTS9Tz3krfDucugW5UokGQ"+
- "KC26QlyHXlPTZkC+aRFUs/dicJX2kopndLcnlNAPWiKnKtrsFSCnIJDB"+
- "ZIyvcKq+9RXmV3HK3bUdHnQZ88IZWBRmWKfZ6wnzHo53kdYKAemTErkz"+
- "taX3lRRPLYWpxRcDPEjysXT3Lh0vfL5D+CIO1yKw/q7C+v6+/kYAxc2l"+
- "fbNE3HpklSuF+dyX4nXxWgzbcFuLz5Bwfq6ZJ9RYe/kNkA0uMWNa1KkG"+
- "eRh8gg22kgD/KT5hPTnpezUWLvoY5Qc7IB3T0y4n2JIwiF2ZrZYVrWgD"+
- "jRWAzGsxJiJyjd6w2k0="
- ])
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.NS(), isc.dns.RRTTL(3600),
- [
- "dns01.example.com.",
- "dns02.example.com.",
- "dns03.example.com."
- ])
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
- [
- "www.sql1.example.com. NS SOA RRSIG NSEC DNSKEY"
- ])
- # For RRSIGS, we can't add the fake data through the API, so we
- # simply pass no rdata at all (which is skipped by the check later)
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.SOA(), isc.dns.RRTTL(3600),
- [
- "master.example.com. admin.example.com. 678 3600 1800 2419200 7200"
- ])
- name = isc.dns.Name("www.sql1.example.com.")
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.A(), isc.dns.RRTTL(3600),
- [
- "192.0.2.100"
- ])
- name = isc.dns.Name("www.sql1.example.com.")
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
- [
- "sql1.example.com. A RRSIG NSEC"
- ])
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
- # rrs is an iterator, but also has direct get_next_rrset(), use
- # the latter one here
- rrset_to_check = rrs.get_next_rrset()
- while (rrset_to_check != None):
- self.assertTrue(check_for_rrset(expected_rrset_list,
- rrset_to_check),
- "Unexpected rrset returned by iterator:\n" +
- rrset_to_check.to_text())
- rrset_to_check = rrs.get_next_rrset()
- # Now check there are none left
- self.assertEqual(0, len(expected_rrset_list),
- "RRset(s) not returned by iterator: " +
- str([rrset.to_text() for rrset in expected_rrset_list ]
- ))
- # TODO should we catch this (iterating past end) and just return None
- # instead of failing?
- self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)
- rrets = dsc.get_iterator(isc.dns.Name("example.com"))
- # there are more than 80 RRs in this zone... let's just count them
- # (already did a full check of the smaller zone above)
- self.assertEqual(55, len(list(rrets)))
- # TODO should we catch this (iterating past end) and just return None
- # instead of failing?
- self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)
- self.assertRaises(TypeError, dsc.get_iterator, "asdf")
- def test_construct(self):
- # can't construct directly
- self.assertRaises(TypeError, isc.datasrc.ZoneFinder)
- def test_find(self):
- dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
- result, finder = dsc.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
- self.assertEqual("example.com.", finder.get_origin().to_text())
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- result, rrset = finder.find(isc.dns.Name("www.sql1.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.DELEGATION, result)
- self.assertEqual("sql1.example.com. 3600 IN NS dns01.example.com.\n" +
- "sql1.example.com. 3600 IN NS dns02.example.com.\n" +
- "sql1.example.com. 3600 IN NS dns03.example.com.\n",
- rrset.to_text())
- result, rrset = finder.find(isc.dns.Name("doesnotexist.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
- result, rrset = finder.find(isc.dns.Name("www.some.other.domain"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.TXT(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXRRSET, result)
- self.assertEqual(None, rrset)
- result, rrset = finder.find(isc.dns.Name("cname-ext.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.CNAME, result)
- self.assertEqual(
- "cname-ext.example.com. 3600 IN CNAME www.sql1.example.com.\n",
- rrset.to_text())
- result, rrset = finder.find(isc.dns.Name("foo.wild.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.WILDCARD, result)
- self.assertEqual("foo.wild.example.com. 3600 IN A 192.0.2.255\n",
- rrset.to_text())
- result, rrset = finder.find(isc.dns.Name("foo.wild.example.com"),
- isc.dns.RRType.TXT(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.WILDCARD_NXRRSET, result)
- self.assertEqual(None, rrset)
- self.assertRaises(TypeError, finder.find,
- "foo",
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertRaises(TypeError, finder.find,
- isc.dns.Name("cname-ext.example.com"),
- "foo",
- None,
- finder.FIND_DEFAULT)
- self.assertRaises(TypeError, finder.find,
- isc.dns.Name("cname-ext.example.com"),
- isc.dns.RRType.A(),
- None,
- "foo")
- def test_find_previous(self):
- dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
- result, finder = dsc.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.SUCCESS, result)
- prev = finder.find_previous_name(isc.dns.Name("bbb.example.com"))
- self.assertEqual("example.com.", prev.to_text())
- prev = finder.find_previous_name(isc.dns.Name("zzz.example.com"))
- self.assertEqual("www.example.com.", prev.to_text())
- prev = finder.find_previous_name(prev)
- self.assertEqual("*.wild.example.com.", prev.to_text())
- self.assertRaises(isc.datasrc.NotImplemented,
- finder.find_previous_name,
- isc.dns.Name("com"))
- class DataSrcUpdater(unittest.TestCase):
- def setUp(self):
- # Make a fresh copy of the writable database with all original content
- shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
- def test_construct(self):
- # can't construct directly
- self.assertRaises(TypeError, isc.datasrc.ZoneUpdater)
- def test_update_delete_commit(self):
- dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
- # first make sure, through a separate finder, that some record exists
- result, finder = dsc.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
- self.assertEqual("example.com.", finder.get_origin().to_text())
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- rrset_to_delete = rrset;
- # can't delete rrset with associated sig. Abuse that to force an
- # exception first, then remove the sig, then delete the record
- updater = dsc.get_updater(isc.dns.Name("example.com"), True)
- self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
- rrset_to_delete)
- rrset_to_delete.remove_rrsig()
- updater.delete_rrset(rrset_to_delete)
- # The record should be gone in the updater, but not in the original
- # finder (since we have not committed)
- result, rrset = updater.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- updater.commit()
- # second commit should raise exception
- self.assertRaises(isc.datasrc.Error, updater.commit)
- # the record should be gone now in the 'real' finder as well
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
- # now add it again
- updater = dsc.get_updater(isc.dns.Name("example.com"), True)
- updater.add_rrset(rrset_to_delete)
- updater.commit()
- # second commit should throw
- self.assertRaises(isc.datasrc.Error, updater.commit)
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- def test_two_modules(self):
- # load two modules, and check if they don't interfere
- mem_cfg = { "type": "memory", "class": "IN", "zones": [] };
- dsc_mem = isc.datasrc.DataSourceClient("memory", json.dumps(mem_cfg))
- dsc_sql = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
- # check if exceptions are working
- self.assertRaises(isc.datasrc.Error, isc.datasrc.DataSourceClient,
- "memory", "{}")
- self.assertRaises(isc.datasrc.Error, isc.datasrc.DataSourceClient,
- "sqlite3", "{}")
- # see if a lookup succeeds in sqlite3 ds
- result, finder = dsc_sql.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
- self.assertEqual("example.com.", finder.get_origin().to_text())
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- # see if a lookup fails in mem ds
- result, finder = dsc_mem.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.NXDOMAIN, result)
- def test_update_delete_abort(self):
- # we don't do enything with this one, just making sure loading two
- # datasources
- dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
- # first make sure, through a separate finder, that some record exists
- result, finder = dsc.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
- self.assertEqual("example.com.", finder.get_origin().to_text())
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- rrset_to_delete = rrset;
- # can't delete rrset with associated sig. Abuse that to force an
- # exception first, then remove the sig, then delete the record
- updater = dsc.get_updater(isc.dns.Name("example.com"), True)
- self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
- rrset_to_delete)
- rrset_to_delete.remove_rrsig()
- updater.delete_rrset(rrset_to_delete)
- # The record should be gone in the updater, but not in the original
- # finder (since we have not committed)
- result, rrset = updater.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
- # destroy the updater, which should make it roll back
- updater = None
- # the record should still be available in the 'real' finder as well
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- def test_update_for_no_zone(self):
- dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
- self.assertEqual(None,
- dsc.get_updater(isc.dns.Name("notexistent.example"),
- True))
- if __name__ == "__main__":
- isc.log.init("bind10")
- unittest.main()
|