|
@@ -218,7 +218,7 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
return msg.get_tsig_record() is not None
|
|
|
|
|
|
def create_request_data(self, with_question=True, with_tsig=False,
|
|
|
- ixfr=None):
|
|
|
+ zone_name=TEST_ZONE_NAME, ixfr=None):
|
|
|
'''Create a commonly used XFR request data.
|
|
|
|
|
|
By default the request type is AXFR; if 'ixfr' is an integer,
|
|
@@ -234,11 +234,9 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
msg.set_rcode(Rcode.NOERROR())
|
|
|
req_type = RRType.AXFR() if ixfr is None else RRType.IXFR()
|
|
|
if with_question:
|
|
|
- msg.add_question(Question(Name("example.com"), RRClass.IN(),
|
|
|
- req_type))
|
|
|
+ msg.add_question(Question(zone_name, RRClass.IN(), req_type))
|
|
|
if req_type == RRType.IXFR():
|
|
|
- soa = RRset(Name('example.com'), RRClass.IN(), RRType.SOA(),
|
|
|
- RRTTL(0))
|
|
|
+ soa = RRset(zone_name, RRClass.IN(), RRType.SOA(), RRTTL(0))
|
|
|
# In the RDATA only the serial matters.
|
|
|
soa.add_rdata(Rdata(RRType.SOA(), RRClass.IN(),
|
|
|
'm r ' + str(ixfr) + ' 1 1 1 1'))
|
|
@@ -713,21 +711,41 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
self.assertEqual(None, self.xfrsess._jnl_reader)
|
|
|
|
|
|
# The data source doesn't support journaling. Should fallback to AXFR.
|
|
|
+ zone_name = Name('nojournal.example.com')
|
|
|
+ self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
+ zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('nojournal.example.com')), Rcode.NOERROR())
|
|
|
+ self.getmsg(), zone_name), Rcode.NOERROR())
|
|
|
self.assertNotEqual(None, self.xfrsess._iterator)
|
|
|
|
|
|
# Failure cases
|
|
|
+ zone_name = Name('notauth.example.com')
|
|
|
+ self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
+ zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('notauth.example.com')), Rcode.NOTAUTH())
|
|
|
+ self.getmsg(), zone_name), Rcode.NOTAUTH())
|
|
|
# this is a strange case: zone's SOA will be found but the journal
|
|
|
# reader won't be created due to 'no such zone'.
|
|
|
+ zone_name = Name('notauth2.example.com')
|
|
|
+ self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
+ zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('notauth2.example.com')), Rcode.NOTAUTH())
|
|
|
+ self.getmsg(), zone_name), Rcode.NOTAUTH())
|
|
|
+ zone_name = Name('nosoa.example.com')
|
|
|
+ self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
+ zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('nosoa.example.com')), Rcode.SERVFAIL())
|
|
|
+ self.getmsg(), zone_name), Rcode.SERVFAIL())
|
|
|
+ zone_name = Name('multisoa.example.com')
|
|
|
+ self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
+ zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('multisoa.example.com')), Rcode.SERVFAIL())
|
|
|
+ self.getmsg(), zone_name), Rcode.SERVFAIL())
|
|
|
+
|
|
|
+ # query name doesn't match the SOA's owner
|
|
|
+ self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION)
|
|
|
+ self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
+ self.getmsg(), zone_name), Rcode.FORMERR())
|
|
|
|
|
|
def test_dns_xfrout_start_formerror(self):
|
|
|
# formerror
|