|
@@ -36,7 +36,8 @@ TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
|
|
|
TEST_CLIENT6 = ('2001:db8::1', 53, 0, 0)
|
|
|
TEST_CLIENT4 = ('192.0.2.1', 53)
|
|
|
|
|
|
-def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[]):
|
|
|
+def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[],
|
|
|
+ updates=[]):
|
|
|
msg = Message(Message.RENDER)
|
|
|
msg.set_qid(5353) # arbitrary chosen
|
|
|
msg.set_opcode(Opcode.UPDATE())
|
|
@@ -45,6 +46,8 @@ def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[]):
|
|
|
msg.add_question(z)
|
|
|
for p in prerequisites:
|
|
|
msg.add_rrset(SECTION_PREREQUISITE, p)
|
|
|
+ for u in updates:
|
|
|
+ msg.add_rrset(SECTION_UPDATE, u)
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
msg.to_wire(renderer)
|
|
@@ -150,6 +153,47 @@ class SessionTest(unittest.TestCase):
|
|
|
# zone class doesn't match
|
|
|
self.check_notauth(Name('example.org'), RRClass.CH())
|
|
|
|
|
|
+ def rrset_as_rrs_helper(self, rr, l):
|
|
|
+ l.append(rr.to_text())
|
|
|
+
|
|
|
+ def test_rrset_as_rrset(self):
|
|
|
+ rrset = isc.dns.RRset(isc.dns.Name("www.example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ rrset.add_rdata(isc.dns.Rdata(rrset.get_type(),
|
|
|
+ rrset.get_class(),
|
|
|
+ "192.0.2.1"))
|
|
|
+
|
|
|
+ l = []
|
|
|
+ rrset_as_rrs(rrset, self.rrset_as_rrs_helper, rrset, l)
|
|
|
+ self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n"], l)
|
|
|
+
|
|
|
+ rrset.add_rdata(isc.dns.Rdata(rrset.get_type(),
|
|
|
+ rrset.get_class(),
|
|
|
+ "192.0.2.2"))
|
|
|
+ rrset.add_rdata(isc.dns.Rdata(rrset.get_type(),
|
|
|
+ rrset.get_class(),
|
|
|
+ "192.0.2.3"))
|
|
|
+
|
|
|
+ # if the helper is called directly, the list should have
|
|
|
+ # one entry, with a multiline string
|
|
|
+ # but through the helper, there should be several 1-line entries
|
|
|
+ l = []
|
|
|
+ self.rrset_as_rrs_helper(rrset, l)
|
|
|
+ self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n" +
|
|
|
+ "www.example.org. 3600 IN A 192.0.2.2\n" +
|
|
|
+ "www.example.org. 3600 IN A 192.0.2.3\n"
|
|
|
+ ], l)
|
|
|
+
|
|
|
+ # but through the helper, there should be several 1-line entries
|
|
|
+ l = []
|
|
|
+ rrset_as_rrs(rrset, self.rrset_as_rrs_helper, rrset, l)
|
|
|
+ self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n",
|
|
|
+ "www.example.org. 3600 IN A 192.0.2.2\n",
|
|
|
+ "www.example.org. 3600 IN A 192.0.2.3\n",
|
|
|
+ ], l)
|
|
|
+
|
|
|
def __prereq_helper(self, method, expected, rrset):
|
|
|
'''Calls the given method with self.__datasrc_client
|
|
|
and the given rrset, and compares the return value.
|
|
@@ -396,6 +440,60 @@ class SessionTest(unittest.TestCase):
|
|
|
else:
|
|
|
self.assertEqual(UPDATE_ERROR, result)
|
|
|
|
|
|
+ # TODO: remove dupe with above one
|
|
|
+ def check_prescan_result(self, expected, updates, expected_soa = None):
|
|
|
+ '''Helper method for checking the result of a prerequisite check;
|
|
|
+ creates an update session, and fills it with the list of rrsets
|
|
|
+ from 'updates'. Then checks if __do_prescan()
|
|
|
+ returns the Rcode specified in 'expected'.'''
|
|
|
+ msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
|
|
|
+ [], updates)
|
|
|
+ zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
|
|
|
+ session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
|
|
|
+ # compare the to_text output of the rcodes (nicer error messages)
|
|
|
+ # This call itself should also be done by handle(),
|
|
|
+ # but just for better failures, it is first called on its own
|
|
|
+ self.assertEqual(expected.to_text(),
|
|
|
+ session._UpdateSession__do_prescan(self.__datasrc_client,
|
|
|
+ TEST_ZONE_NAME,
|
|
|
+ TEST_RRCLASS).to_text())
|
|
|
+ # If there is an expected soa, check it
|
|
|
+ self.assertEqual(str(expected_soa),
|
|
|
+ str(session._UpdateSession__added_soa))
|
|
|
+
|
|
|
+ # REMOVED, don't mess with actual data during prescan tests
|
|
|
+ # Now see if handle finds the same result
|
|
|
+ #(result, _, _) = session.handle()
|
|
|
+ #self.assertEqual(expected,
|
|
|
+ # session._UpdateSession__message.get_rcode())
|
|
|
+ ## And that the result looks right
|
|
|
+ #if expected == Rcode.NOERROR():
|
|
|
+ # self.assertEqual(UPDATE_SUCCESS, result)
|
|
|
+ #else:
|
|
|
+ # self.assertEqual(UPDATE_ERROR, result)
|
|
|
+
|
|
|
+ # TODO XXX: remove dupe with above
|
|
|
+ def check_full_handle_result(self, expected, updates):
|
|
|
+ '''Helper method for checking the result of a full handle;
|
|
|
+ creates an update session, and fills it with the list of rrsets
|
|
|
+ from 'updates'. Then checks if __handle()
|
|
|
+ results in a response with rcode 'expected'.'''
|
|
|
+ msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
|
|
|
+ [], updates)
|
|
|
+ zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
|
|
|
+ session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
|
|
|
+
|
|
|
+ # Now see if handle finds the same result
|
|
|
+ (result, _, _) = session.handle()
|
|
|
+ self.assertEqual(expected.to_text(),
|
|
|
+ session._UpdateSession__message.get_rcode().to_text())
|
|
|
+ # And that the result looks right
|
|
|
+ if expected == Rcode.NOERROR():
|
|
|
+ self.assertEqual(UPDATE_SUCCESS, result)
|
|
|
+ else:
|
|
|
+ self.assertEqual(UPDATE_ERROR, result)
|
|
|
+
|
|
|
+
|
|
|
def test_check_prerequisites(self):
|
|
|
# This test checks if the actual prerequisite-type-specific
|
|
|
# methods are called.
|
|
@@ -599,6 +697,398 @@ class SessionTest(unittest.TestCase):
|
|
|
"foo"))
|
|
|
self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
|
|
|
|
|
|
+
|
|
|
+ def __prereq_helper(self, method, expected, rrset):
|
|
|
+ '''Calls the given method with self.__datasrc_client
|
|
|
+ and the given rrset, and compares the return value.
|
|
|
+ Function does not do much but makes the code look nicer'''
|
|
|
+ self.assertEqual(expected, method(self.__datasrc_client, rrset))
|
|
|
+
|
|
|
+ def initialize_update_rrsets(self):
|
|
|
+ '''Prepare a number of RRsets to be used in several update tests
|
|
|
+ The rrsets are stored in self'''
|
|
|
+ rrset_update_a = isc.dns.RRset(isc.dns.Name("www.example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ rrset_update_a.add_rdata(isc.dns.Rdata(rrset_update_a.get_type(),
|
|
|
+ rrset_update_a.get_class(),
|
|
|
+ "192.0.2.2"))
|
|
|
+ rrset_update_a.add_rdata(isc.dns.Rdata(rrset_update_a.get_type(),
|
|
|
+ rrset_update_a.get_class(),
|
|
|
+ "192.0.2.3"))
|
|
|
+ self.rrset_update_a = rrset_update_a
|
|
|
+
|
|
|
+ rrset_update_soa = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.SOA(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ rrset_update_soa.add_rdata(isc.dns.Rdata(rrset_update_soa.get_type(),
|
|
|
+ rrset_update_soa.get_class(),
|
|
|
+ "ns1.example.org. " +
|
|
|
+ "admin.example.org. " +
|
|
|
+ "1233 3600 1800 2419200 7200"))
|
|
|
+ self.rrset_update_soa = rrset_update_soa
|
|
|
+
|
|
|
+ rrset_update_soa_del = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRClass.NONE(),
|
|
|
+ isc.dns.RRType.SOA(),
|
|
|
+ isc.dns.RRTTL(0))
|
|
|
+ rrset_update_soa_del.add_rdata(isc.dns.Rdata(rrset_update_soa_del.get_type(),
|
|
|
+ rrset_update_soa_del.get_class(),
|
|
|
+ "ns1.example.org. " +
|
|
|
+ "admin.example.org. " +
|
|
|
+ "1233 3600 1800 2419200 7200"))
|
|
|
+ self.rrset_update_soa_del = rrset_update_soa_del
|
|
|
+
|
|
|
+
|
|
|
+ rrset_update_soa2 = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.SOA(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ rrset_update_soa2.add_rdata(isc.dns.Rdata(rrset_update_soa.get_type(),
|
|
|
+ rrset_update_soa.get_class(),
|
|
|
+ "ns1.example.org. " +
|
|
|
+ "admin.example.org. " +
|
|
|
+ "4000 3600 1800 2419200 7200"))
|
|
|
+ self.rrset_update_soa2 = rrset_update_soa2
|
|
|
+
|
|
|
+ rrset_update_del_name = isc.dns.RRset(isc.dns.Name("www.example.org"),
|
|
|
+ isc.dns.RRClass.ANY(),
|
|
|
+ isc.dns.RRType.ANY(),
|
|
|
+ isc.dns.RRTTL(0))
|
|
|
+ self.rrset_update_del_name = rrset_update_del_name
|
|
|
+
|
|
|
+ rrset_update_del_name_apex = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRClass.ANY(),
|
|
|
+ isc.dns.RRType.ANY(),
|
|
|
+ isc.dns.RRTTL(0))
|
|
|
+ self.rrset_update_del_name_apex = rrset_update_del_name_apex
|
|
|
+
|
|
|
+ rrset_update_del_rrset = isc.dns.RRset(isc.dns.Name("www.example.org"),
|
|
|
+ isc.dns.RRClass.ANY(),
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ isc.dns.RRTTL(0))
|
|
|
+ self.rrset_update_del_rrset = rrset_update_del_rrset
|
|
|
+
|
|
|
+ rrset_update_del_rrset_apex = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRClass.ANY(),
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ isc.dns.RRTTL(0))
|
|
|
+ self.rrset_update_del_rrset_apex = rrset_update_del_rrset_apex
|
|
|
+
|
|
|
+ rrset_update_del_rrset_part = isc.dns.RRset(isc.dns.Name("www.example.org"),
|
|
|
+ isc.dns.RRClass.NONE(),
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ isc.dns.RRTTL(0))
|
|
|
+ rrset_update_del_rrset_part.add_rdata(isc.dns.Rdata(rrset_update_a.get_type(),
|
|
|
+ rrset_update_del_rrset_part.get_class(),
|
|
|
+ "\# 04 c0 00 02 02"))
|
|
|
+ rrset_update_del_rrset_part.add_rdata(isc.dns.Rdata(rrset_update_a.get_type(),
|
|
|
+ rrset_update_del_rrset_part.get_class(),
|
|
|
+ "\# 04 c0 00 02 03"))
|
|
|
+ self.rrset_update_del_rrset_part = rrset_update_del_rrset_part
|
|
|
+
|
|
|
+ rrset_update_del_rrset_ns = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRClass.NONE(),
|
|
|
+ isc.dns.RRType.NS(),
|
|
|
+ isc.dns.RRTTL(0))
|
|
|
+ rrset_update_del_rrset_ns.add_rdata(isc.dns.Rdata(rrset_update_del_rrset_ns.get_type(),
|
|
|
+ rrset_update_del_rrset_ns.get_class(),
|
|
|
+ b'\x03ns1\x07example\x03org\x00'))
|
|
|
+ rrset_update_del_rrset_ns.add_rdata(isc.dns.Rdata(rrset_update_del_rrset_ns.get_type(),
|
|
|
+ rrset_update_del_rrset_ns.get_class(),
|
|
|
+ b'\x03ns2\x07example\x03org\x00'))
|
|
|
+ rrset_update_del_rrset_ns.add_rdata(isc.dns.Rdata(rrset_update_del_rrset_ns.get_type(),
|
|
|
+ rrset_update_del_rrset_ns.get_class(),
|
|
|
+ b'\x03ns3\x07example\x03org\x00'))
|
|
|
+ self.rrset_update_del_rrset_ns = rrset_update_del_rrset_ns
|
|
|
+
|
|
|
+
|
|
|
+ def test_prescan(self):
|
|
|
+ '''Test whether the prescan succeeds on data that is ok, and whether
|
|
|
+ if notices the SOA if present'''
|
|
|
+ # prepare a set of correct update statements
|
|
|
+ self.initialize_update_rrsets()
|
|
|
+
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_a ])
|
|
|
+
|
|
|
+ # check if soa is noticed
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_soa ],
|
|
|
+ self.rrset_update_soa)
|
|
|
+
|
|
|
+ # Other types of succesful prechecks
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_soa2 ],
|
|
|
+ self.rrset_update_soa2)
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(),
|
|
|
+ [ self.rrset_update_del_name ])
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(),
|
|
|
+ [ self.rrset_update_del_name_apex ])
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_del_rrset ])
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_del_rrset_apex ])
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_del_rrset_part ])
|
|
|
+
|
|
|
+ # and check a few permutations of the above
|
|
|
+ # all of them (with one of the soas)
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(),
|
|
|
+ [
|
|
|
+ self.rrset_update_a,
|
|
|
+ self.rrset_update_soa,
|
|
|
+ self.rrset_update_del_name,
|
|
|
+ self.rrset_update_del_name_apex,
|
|
|
+ self.rrset_update_del_rrset,
|
|
|
+ self.rrset_update_del_rrset_apex,
|
|
|
+ self.rrset_update_del_rrset_part
|
|
|
+ ],
|
|
|
+ self.rrset_update_soa)
|
|
|
+
|
|
|
+ # Two soas. Should we reject or simply use the last?
|
|
|
+ # (RFC is not really explicit on this, but between the lines I read
|
|
|
+ # use the last)
|
|
|
+ # TODO this fails ;)
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(),
|
|
|
+ [ self.rrset_update_soa,
|
|
|
+ self.rrset_update_soa2 ],
|
|
|
+ self.rrset_update_soa2)
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(),
|
|
|
+ [ self.rrset_update_soa2,
|
|
|
+ self.rrset_update_soa ],
|
|
|
+ self.rrset_update_soa)
|
|
|
+
|
|
|
+ self.check_prescan_result(Rcode.NOERROR(),
|
|
|
+ [
|
|
|
+ self.rrset_update_del_rrset_apex,
|
|
|
+ self.rrset_update_del_name,
|
|
|
+ self.rrset_update_del_name_apex,
|
|
|
+ self.rrset_update_del_rrset_part,
|
|
|
+ self.rrset_update_a,
|
|
|
+ self.rrset_update_del_rrset,
|
|
|
+ self.rrset_update_soa
|
|
|
+ ],
|
|
|
+ self.rrset_update_soa)
|
|
|
+
|
|
|
+ def test_prescan_failures(self):
|
|
|
+ '''Test whether prescan fails on bad data'''
|
|
|
+ # out of zone data
|
|
|
+ rrset = isc.dns.RRset(isc.dns.Name("different.zone"),
|
|
|
+ isc.dns.RRClass.ANY(),
|
|
|
+ isc.dns.RRType.TXT(),
|
|
|
+ isc.dns.RRTTL(0));
|
|
|
+ self.check_prescan_result(Rcode.NOTZONE(), [ rrset ])
|
|
|
+
|
|
|
+
|
|
|
+ # forbidden type, zone class
|
|
|
+ rrset = isc.dns.RRset(TEST_ZONE_NAME,
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.ANY(),
|
|
|
+ isc.dns.RRTTL(0));
|
|
|
+ rrset.add_rdata(isc.dns.Rdata(rrset.get_type(), rrset.get_class(),
|
|
|
+ "\# 00"))
|
|
|
+ self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
|
|
|
+
|
|
|
+ # non-zero TTL, class ANY
|
|
|
+ rrset = isc.dns.RRset(TEST_ZONE_NAME,
|
|
|
+ isc.dns.RRClass.ANY(),
|
|
|
+ isc.dns.RRType.TXT(),
|
|
|
+ isc.dns.RRTTL(1));
|
|
|
+ self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
|
|
|
+
|
|
|
+ # non-zero Rdata, class ANY
|
|
|
+ rrset = isc.dns.RRset(TEST_ZONE_NAME,
|
|
|
+ isc.dns.RRClass.ANY(),
|
|
|
+ isc.dns.RRType.TXT(),
|
|
|
+ isc.dns.RRTTL(0));
|
|
|
+ rrset.add_rdata(isc.dns.Rdata(rrset.get_type(), rrset.get_class(),
|
|
|
+ "foo"))
|
|
|
+ self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
|
|
|
+
|
|
|
+ # forbidden type, class ANY
|
|
|
+ rrset = isc.dns.RRset(TEST_ZONE_NAME,
|
|
|
+ isc.dns.RRClass.ANY(),
|
|
|
+ isc.dns.RRType.AXFR(),
|
|
|
+ isc.dns.RRTTL(0));
|
|
|
+ rrset.add_rdata(isc.dns.Rdata(rrset.get_type(), rrset.get_class(),
|
|
|
+ "\# 00"))
|
|
|
+ self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
|
|
|
+
|
|
|
+ # non-zero TTL, class NONE
|
|
|
+ rrset = isc.dns.RRset(TEST_ZONE_NAME,
|
|
|
+ isc.dns.RRClass.NONE(),
|
|
|
+ isc.dns.RRType.TXT(),
|
|
|
+ isc.dns.RRTTL(1));
|
|
|
+ self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
|
|
|
+
|
|
|
+ # forbidden type, class NONE
|
|
|
+ rrset = isc.dns.RRset(TEST_ZONE_NAME,
|
|
|
+ isc.dns.RRClass.NONE(),
|
|
|
+ isc.dns.RRType.AXFR(),
|
|
|
+ isc.dns.RRTTL(0));
|
|
|
+ rrset.add_rdata(isc.dns.Rdata(rrset.get_type(), rrset.get_class(),
|
|
|
+ "\# 00"))
|
|
|
+ self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
|
|
|
+
|
|
|
+ def check_inzone_data(self, expected_result, name, rrtype,
|
|
|
+ expected_rrset = None):
|
|
|
+ '''Does a find on TEST_ZONE for the given rrset's name and type,
|
|
|
+ then checks if the result matches the expected result.
|
|
|
+ If so, and if expected_rrset is given, they are compared as
|
|
|
+ well.'''
|
|
|
+ _, finder = self.__datasrc_client.find_zone(TEST_ZONE_NAME)
|
|
|
+ result, found_rrset, _ = finder.find(name, rrtype,
|
|
|
+ finder.NO_WILDCARD |
|
|
|
+ finder.FIND_GLUE_OK)
|
|
|
+ self.assertEqual(expected_result, result)
|
|
|
+ # Sigh. Need rrsets.compare() again.
|
|
|
+ # To be sure, compare name, class, type, and ttl
|
|
|
+ if expected_rrset is not None:
|
|
|
+ self.assertEqual(expected_rrset.get_name(), found_rrset.get_name())
|
|
|
+ self.assertEqual(expected_rrset.get_class(), found_rrset.get_class())
|
|
|
+ self.assertEqual(expected_rrset.get_type(), found_rrset.get_type())
|
|
|
+ self.assertEqual(expected_rrset.get_ttl().to_text(), found_rrset.get_ttl().to_text())
|
|
|
+ expected_rdata = [ rdata.to_text() for rdata in expected_rrset.get_rdata() ]
|
|
|
+ found_rdata = [ rdata.to_text() for rdata in found_rrset.get_rdata() ]
|
|
|
+ expected_rdata.sort()
|
|
|
+ found_rdata.sort()
|
|
|
+ self.assertEqual(expected_rdata, found_rdata)
|
|
|
+
|
|
|
+ def check_inzone_data_all(self, expected_result, expected_rrset):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def test_update_add_delete_rrset(self):
|
|
|
+ self.initialize_update_rrsets()
|
|
|
+
|
|
|
+ # initially, the www should only contain one rr
|
|
|
+ orig_a_rrset = isc.dns.RRset(isc.dns.Name("www.example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ orig_a_rrset.add_rdata(isc.dns.Rdata(orig_a_rrset.get_type(),
|
|
|
+ orig_a_rrset.get_class(),
|
|
|
+ "192.0.2.1"))
|
|
|
+
|
|
|
+ # during this test, we will extend it at some point
|
|
|
+ extended_a_rrset = isc.dns.RRset(isc.dns.Name("www.example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ extended_a_rrset.add_rdata(isc.dns.Rdata(extended_a_rrset.get_type(),
|
|
|
+ extended_a_rrset.get_class(),
|
|
|
+ "192.0.2.1"))
|
|
|
+ extended_a_rrset.add_rdata(isc.dns.Rdata(extended_a_rrset.get_type(),
|
|
|
+ extended_a_rrset.get_class(),
|
|
|
+ "192.0.2.2"))
|
|
|
+ extended_a_rrset.add_rdata(isc.dns.Rdata(extended_a_rrset.get_type(),
|
|
|
+ extended_a_rrset.get_class(),
|
|
|
+ "192.0.2.3"))
|
|
|
+
|
|
|
+ # Sanity check, make sure original data is really there before updates
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("www.example.org"),
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ orig_a_rrset)
|
|
|
+
|
|
|
+ # Add two rrs
|
|
|
+ self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_a ])
|
|
|
+
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("www.example.org"),
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ extended_a_rrset)
|
|
|
+
|
|
|
+ # Now delete those two again
|
|
|
+ self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_del_rrset_part ])
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("www.example.org"),
|
|
|
+ isc.dns.RRType.A(),
|
|
|
+ orig_a_rrset)
|
|
|
+
|
|
|
+
|
|
|
+ # Check that if we update the SOA, it is updated to our value
|
|
|
+ self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_soa2 ])
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRType.SOA(),
|
|
|
+ self.rrset_update_soa2)
|
|
|
+
|
|
|
+ def test_update_delete_name(self):
|
|
|
+ self.initialize_update_rrsets()
|
|
|
+
|
|
|
+ # And delete the entire name
|
|
|
+ self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_del_name ])
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
|
|
|
+ isc.dns.Name("www.example.org"),
|
|
|
+ isc.dns.RRType.A())
|
|
|
+
|
|
|
+ def test_update_apex_special_cases(self):
|
|
|
+ self.initialize_update_rrsets()
|
|
|
+
|
|
|
+ # the original SOA
|
|
|
+ orig_soa_rrset = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.SOA(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ orig_soa_rrset.add_rdata(isc.dns.Rdata(orig_soa_rrset.get_type(),
|
|
|
+ orig_soa_rrset.get_class(),
|
|
|
+ "ns1.example.org. " +
|
|
|
+ "admin.example.org. " +
|
|
|
+ "1234 3600 1800 2419200 7200"))
|
|
|
+
|
|
|
+ # We will delete some of the NS records
|
|
|
+ orig_ns_rrset = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.NS(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ orig_ns_rrset.add_rdata(isc.dns.Rdata(orig_ns_rrset.get_type(),
|
|
|
+ orig_ns_rrset.get_class(),
|
|
|
+ "ns1.example.org."))
|
|
|
+ orig_ns_rrset.add_rdata(isc.dns.Rdata(orig_ns_rrset.get_type(),
|
|
|
+ orig_ns_rrset.get_class(),
|
|
|
+ "ns2.example.org."))
|
|
|
+ orig_ns_rrset.add_rdata(isc.dns.Rdata(orig_ns_rrset.get_type(),
|
|
|
+ orig_ns_rrset.get_class(),
|
|
|
+ "ns3.example.org."))
|
|
|
+
|
|
|
+ short_ns_rrset = isc.dns.RRset(isc.dns.Name("example.org"),
|
|
|
+ TEST_RRCLASS,
|
|
|
+ isc.dns.RRType.NS(),
|
|
|
+ isc.dns.RRTTL(3600))
|
|
|
+ short_ns_rrset.add_rdata(isc.dns.Rdata(short_ns_rrset.get_type(),
|
|
|
+ short_ns_rrset.get_class(),
|
|
|
+ "ns3.example.org."))
|
|
|
+
|
|
|
+ # Sanity check, make sure original data is really there before updates
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRType.NS(),
|
|
|
+ orig_ns_rrset)
|
|
|
+
|
|
|
+ # Check that we cannot delete the SOA record by direction deletion
|
|
|
+ #self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_soa_del ])
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRType.SOA(),
|
|
|
+ orig_soa_rrset)
|
|
|
+
|
|
|
+ # And that soa and NS at apex are not deleted if you delete the apex by name
|
|
|
+ #self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_del_name_apex ])
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRType.SOA(),
|
|
|
+ orig_soa_rrset)
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRType.NS(),
|
|
|
+ orig_ns_rrset)
|
|
|
+
|
|
|
+ # And if we delete the NS at the apex specifically, it should still
|
|
|
+ # keep one record
|
|
|
+ self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_del_rrset_ns ])
|
|
|
+ self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
|
|
|
+ isc.dns.Name("example.org"),
|
|
|
+ isc.dns.RRType.NS(),
|
|
|
+ short_ns_rrset)
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
isc.log.init("bind10")
|
|
|
isc.log.resetUnitTestRootLogger()
|