123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- import isc.log
- import isc.datasrc
- import isc.dns
- import unittest
- import os
- import shutil
- TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
- TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
- READ_ZONE_DB_FILE = TESTDATA_PATH + "example.com.sqlite3"
- BROKEN_DB_FILE = TESTDATA_PATH + "brokendb.sqlite3"
- WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
- NEW_DB_FILE = TESTDATA_WRITE_PATH + "new_db.sqlite3"
- def add_rrset(rrset_list, name, rrclass, rrtype, ttl, rdatas):
- rrset_to_add = isc.dns.RRset(name, rrclass, rrtype, ttl)
- if rdatas is not None:
- for rdata in rdatas:
- rrset_to_add.add_rdata(isc.dns.Rdata(rrtype, rrclass, rdata))
- rrset_list.append(rrset_to_add)
- def rrsets_equal(a, b):
-
-
-
-
- return a.get_name() == b.get_name() and\
- a.get_class() == b.get_class() and\
- a.get_type() == b.get_type() and \
- a.get_ttl() == b.get_ttl() and\
- (a.get_type() == isc.dns.RRType.RRSIG() or
- sorted(a.get_rdata()) == sorted(b.get_rdata()))
- def check_for_rrset(expected_rrsets, rrset):
- for cur_rrset in expected_rrsets[:]:
- if rrsets_equal(cur_rrset, rrset):
- expected_rrsets.remove(cur_rrset)
- return True
- return False
- class DataSrcClient(unittest.TestCase):
- def test_construct(self):
-
- self.assertRaises(TypeError, isc.datasrc.ZoneIterator)
- def test_iterate(self):
- dsc = isc.datasrc.DataSourceClient(READ_ZONE_DB_FILE)
-
-
- rrs = dsc.get_iterator(isc.dns.Name("sql1.example.com."))
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- expected_rrset_list = []
- name = isc.dns.Name("sql1.example.com")
- rrclass = isc.dns.RRClass.IN()
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.DNSKEY(), isc.dns.RRTTL(3600),
- [
- "256 3 5 AwEAAdYdRhBAEY67R/8G1N5AjGF6asIiNh/pNGeQ8xDQP13J"+
- "N2lo+sNqWcmpYNhuVqRbLB+mamsU1XcCICSBvAlSmfz/ZUdafX23knAr"+
- "TlALxMmspcfdpqun3Yr3YYnztuj06rV7RqmveYckWvAUXVYMSMQZfJ30"+
- "5fs0dE/xLztL/CzZ",
- "257 3 5 AwEAAbaKDSa9XEFTsjSYpUTHRotTS9Tz3krfDucugW5UokGQ"+
- "KC26QlyHXlPTZkC+aRFUs/dicJX2kopndLcnlNAPWiKnKtrsFSCnIJDB"+
- "ZIyvcKq+9RXmV3HK3bUdHnQZ88IZWBRmWKfZ6wnzHo53kdYKAemTErkz"+
- "taX3lRRPLYWpxRcDPEjysXT3Lh0vfL5D+CIO1yKw/q7C+v6+/kYAxc2l"+
- "fbNE3HpklSuF+dyX4nXxWgzbcFuLz5Bwfq6ZJ9RYe/kNkA0uMWNa1KkG"+
- "eRh8gg22kgD/KT5hPTnpezUWLvoY5Qc7IB3T0y4n2JIwiF2ZrZYVrWgD"+
- "jRWAzGsxJiJyjd6w2k0="
- ])
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.NS(), isc.dns.RRTTL(3600),
- [
- "dns01.example.com.",
- "dns02.example.com.",
- "dns03.example.com."
- ])
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
- [
- "www.sql1.example.com. NS SOA RRSIG NSEC DNSKEY"
- ])
-
-
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.SOA(), isc.dns.RRTTL(3600),
- [
- "master.example.com. admin.example.com. 678 3600 1800 2419200 7200"
- ])
- name = isc.dns.Name("www.sql1.example.com.")
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.A(), isc.dns.RRTTL(3600),
- [
- "192.0.2.100"
- ])
- name = isc.dns.Name("www.sql1.example.com.")
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
- [
- "sql1.example.com. A RRSIG NSEC"
- ])
- add_rrset(expected_rrset_list, name, rrclass,
- isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
-
-
- rrset_to_check = rrs.get_next_rrset()
- while (rrset_to_check != None):
- self.assertTrue(check_for_rrset(expected_rrset_list,
- rrset_to_check),
- "Unexpected rrset returned by iterator:\n" +
- rrset_to_check.to_text())
- rrset_to_check = rrs.get_next_rrset()
-
- self.assertEqual(0, len(expected_rrset_list),
- "RRset(s) not returned by iterator: " +
- str([rrset.to_text() for rrset in expected_rrset_list ]
- ))
-
-
- self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)
- rrets = dsc.get_iterator(isc.dns.Name("example.com"))
-
-
- self.assertEqual(55, len(list(rrets)))
-
-
- self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)
- self.assertRaises(TypeError, dsc.get_iterator, "asdf")
- def test_construct(self):
-
- self.assertRaises(TypeError, isc.datasrc.ZoneFinder)
- def test_find(self):
- dsc = isc.datasrc.DataSourceClient(READ_ZONE_DB_FILE)
- result, finder = dsc.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
- self.assertEqual("example.com.", finder.get_origin().to_text())
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- result, rrset = finder.find(isc.dns.Name("www.sql1.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.DELEGATION, result)
- self.assertEqual("sql1.example.com. 3600 IN NS dns01.example.com.\n" +
- "sql1.example.com. 3600 IN NS dns02.example.com.\n" +
- "sql1.example.com. 3600 IN NS dns03.example.com.\n",
- rrset.to_text())
- result, rrset = finder.find(isc.dns.Name("doesnotexist.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
- result, rrset = finder.find(isc.dns.Name("www.some.other.domain"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.TXT(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXRRSET, result)
- self.assertEqual(None, rrset)
- result, rrset = finder.find(isc.dns.Name("cname-ext.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.CNAME, result)
- self.assertEqual(
- "cname-ext.example.com. 3600 IN CNAME www.sql1.example.com.\n",
- rrset.to_text())
- self.assertRaises(TypeError, finder.find,
- "foo",
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertRaises(TypeError, finder.find,
- isc.dns.Name("cname-ext.example.com"),
- "foo",
- None,
- finder.FIND_DEFAULT)
- self.assertRaises(TypeError, finder.find,
- isc.dns.Name("cname-ext.example.com"),
- isc.dns.RRType.A(),
- None,
- "foo")
- class DataSrcUpdater(unittest.TestCase):
- def setUp(self):
-
- shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
- def test_construct(self):
-
- self.assertRaises(TypeError, isc.datasrc.ZoneUpdater)
- def test_update_delete_commit(self):
- dsc = isc.datasrc.DataSourceClient(WRITE_ZONE_DB_FILE)
-
- result, finder = dsc.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
- self.assertEqual("example.com.", finder.get_origin().to_text())
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- rrset_to_delete = rrset;
-
-
- updater = dsc.get_updater(isc.dns.Name("example.com"), True)
- self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
- rrset_to_delete)
- rrset_to_delete.remove_rrsig()
- updater.delete_rrset(rrset_to_delete)
-
-
- result, rrset = updater.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- updater.commit()
-
- self.assertRaises(isc.datasrc.Error, updater.commit)
-
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
-
- updater = dsc.get_updater(isc.dns.Name("example.com"), True)
- updater.add_rrset(rrset_to_delete)
- updater.commit()
-
- self.assertRaises(isc.datasrc.Error, updater.commit)
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- def test_update_delete_abort(self):
- dsc = isc.datasrc.DataSourceClient(WRITE_ZONE_DB_FILE)
-
- result, finder = dsc.find_zone(isc.dns.Name("example.com"))
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
- self.assertEqual("example.com.", finder.get_origin().to_text())
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- rrset_to_delete = rrset;
-
-
- updater = dsc.get_updater(isc.dns.Name("example.com"), True)
- self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
- rrset_to_delete)
- rrset_to_delete.remove_rrsig()
- updater.delete_rrset(rrset_to_delete)
-
-
- result, rrset = updater.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.NXDOMAIN, result)
- self.assertEqual(None, rrset)
-
- updater = None
-
- result, rrset = finder.find(isc.dns.Name("www.example.com"),
- isc.dns.RRType.A(),
- None,
- finder.FIND_DEFAULT)
- self.assertEqual(finder.SUCCESS, result)
- self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
- rrset.to_text())
- def test_update_for_no_zone(self):
- dsc = isc.datasrc.DataSourceClient(WRITE_ZONE_DB_FILE)
- self.assertEqual(None,
- dsc.get_updater(isc.dns.Name("notexistent.example"),
- True))
- if __name__ == "__main__":
- isc.log.init("bind10")
- unittest.main()
|