|
@@ -218,8 +218,8 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
return msg.get_tsig_record() is not None
|
|
|
|
|
|
def create_request_data(self, with_question=True, with_tsig=False,
|
|
|
- zone_name=TEST_ZONE_NAME, ixfr=None,
|
|
|
- soa_class=TEST_RRCLASS):
|
|
|
+ ixfr=None, zone_name=TEST_ZONE_NAME,
|
|
|
+ soa_class=TEST_RRCLASS, num_soa=1):
|
|
|
'''Create a commonly used XFR request data.
|
|
|
|
|
|
By default the request type is AXFR; if 'ixfr' is an integer,
|
|
@@ -227,6 +227,14 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
the value of the parameter will be included in the authority
|
|
|
section.
|
|
|
|
|
|
+ This method has various minor parameters only for creating bad
|
|
|
+ format requests for testing purposes:
|
|
|
+ zone_name: the query (zone) name. for IXFR, it's also used as
|
|
|
+ the owner name of the SOA in the authority section.
|
|
|
+ soa_class: IXFR only. The RR class of the SOA RR in the authority
|
|
|
+ section.
|
|
|
+ num_soa: IXFR only. The number of SOA RDATAs in the authority
|
|
|
+ section.
|
|
|
'''
|
|
|
msg = Message(Message.RENDER)
|
|
|
query_id = 0x1035
|
|
@@ -239,8 +247,9 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
if req_type == RRType.IXFR():
|
|
|
soa = RRset(zone_name, soa_class, RRType.SOA(), RRTTL(0))
|
|
|
# In the RDATA only the serial matters.
|
|
|
- soa.add_rdata(Rdata(RRType.SOA(), soa_class,
|
|
|
- 'm r ' + str(ixfr) + ' 1 1 1 1'))
|
|
|
+ for i in range(0, num_soa):
|
|
|
+ soa.add_rdata(Rdata(RRType.SOA(), soa_class,
|
|
|
+ 'm r ' + str(ixfr) + ' 1 1 1 1'))
|
|
|
msg.add_rrset(Message.SECTION_AUTHORITY, soa)
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
@@ -758,6 +767,12 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
self.getmsg(), zone_name, TEST_RRCLASS), Rcode.FORMERR())
|
|
|
|
|
|
+ # multiple SOA RRs
|
|
|
+ self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
+ num_soa=2)
|
|
|
+ self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
+ self.getmsg(), zone_name, TEST_RRCLASS), Rcode.FORMERR())
|
|
|
+
|
|
|
def test_dns_xfrout_start_formerror(self):
|
|
|
# formerror
|
|
|
self.xfrsess.dns_xfrout_start(self.sock, b"\xd6=\x00\x00\x00\x01\x00")
|