|
@@ -218,7 +218,8 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
return msg.get_tsig_record() is not None
|
|
|
|
|
|
def create_request_data(self, with_question=True, with_tsig=False,
|
|
|
- zone_name=TEST_ZONE_NAME, ixfr=None):
|
|
|
+ zone_name=TEST_ZONE_NAME, ixfr=None,
|
|
|
+ soa_class=TEST_RRCLASS):
|
|
|
'''Create a commonly used XFR request data.
|
|
|
|
|
|
By default the request type is AXFR; if 'ixfr' is an integer,
|
|
@@ -236,9 +237,9 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
if with_question:
|
|
|
msg.add_question(Question(zone_name, RRClass.IN(), req_type))
|
|
|
if req_type == RRType.IXFR():
|
|
|
- soa = RRset(zone_name, RRClass.IN(), RRType.SOA(), RRTTL(0))
|
|
|
+ soa = RRset(zone_name, soa_class, RRType.SOA(), RRTTL(0))
|
|
|
# In the RDATA only the serial matters.
|
|
|
- soa.add_rdata(Rdata(RRType.SOA(), RRClass.IN(),
|
|
|
+ soa.add_rdata(Rdata(RRType.SOA(), soa_class,
|
|
|
'm r ' + str(ixfr) + ' 1 1 1 1'))
|
|
|
msg.add_rrset(Message.SECTION_AUTHORITY, soa)
|
|
|
|
|
@@ -680,16 +681,19 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
self.xfrsess.ClientClass = MockDataSrcClient
|
|
|
# Successful case. A zone iterator should be set up.
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('example.com')), Rcode.NOERROR())
|
|
|
+ self.getmsg(), TEST_ZONE_NAME, TEST_RRCLASS), Rcode.NOERROR())
|
|
|
self.assertNotEqual(None, self.xfrsess._iterator)
|
|
|
|
|
|
# Failure cases
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('notauth.example.com')), Rcode.NOTAUTH())
|
|
|
+ self.getmsg(), Name('notauth.example.com'), TEST_RRCLASS),
|
|
|
+ Rcode.NOTAUTH())
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('nosoa.example.com')), Rcode.SERVFAIL())
|
|
|
+ self.getmsg(), Name('nosoa.example.com'), TEST_RRCLASS),
|
|
|
+ Rcode.SERVFAIL())
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), Name('multisoa.example.com')), Rcode.SERVFAIL())
|
|
|
+ self.getmsg(), Name('multisoa.example.com'), TEST_RRCLASS),
|
|
|
+ Rcode.SERVFAIL())
|
|
|
|
|
|
def test_xfrout_ixfr_setup(self):
|
|
|
self.xfrsess.ClientClass = MockDataSrcClient
|
|
@@ -699,14 +703,14 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
# up.
|
|
|
self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), TEST_ZONE_NAME), Rcode.NOERROR())
|
|
|
+ self.getmsg(), TEST_ZONE_NAME, TEST_RRCLASS), Rcode.NOERROR())
|
|
|
self.assertNotEqual(None, self.xfrsess._jnl_reader)
|
|
|
|
|
|
# Successful case, but as a result of falling back to AXFR-style
|
|
|
# IXFR. A zone iterator should be set up instead of a journal reader.
|
|
|
self.mdata = self.create_request_data(ixfr=IXFR_NG_VERSION)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), TEST_ZONE_NAME), Rcode.NOERROR())
|
|
|
+ self.getmsg(), TEST_ZONE_NAME, TEST_RRCLASS), Rcode.NOERROR())
|
|
|
self.assertNotEqual(None, self.xfrsess._iterator)
|
|
|
self.assertEqual(None, self.xfrsess._jnl_reader)
|
|
|
|
|
@@ -715,7 +719,7 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), zone_name), Rcode.NOERROR())
|
|
|
+ self.getmsg(), zone_name, TEST_RRCLASS), Rcode.NOERROR())
|
|
|
self.assertNotEqual(None, self.xfrsess._iterator)
|
|
|
|
|
|
# Failure cases
|
|
@@ -723,29 +727,36 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), zone_name), Rcode.NOTAUTH())
|
|
|
+ self.getmsg(), zone_name, TEST_RRCLASS), Rcode.NOTAUTH())
|
|
|
# this is a strange case: zone's SOA will be found but the journal
|
|
|
# reader won't be created due to 'no such zone'.
|
|
|
zone_name = Name('notauth2.example.com')
|
|
|
self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), zone_name), Rcode.NOTAUTH())
|
|
|
+ self.getmsg(), zone_name, TEST_RRCLASS), Rcode.NOTAUTH())
|
|
|
zone_name = Name('nosoa.example.com')
|
|
|
self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), zone_name), Rcode.SERVFAIL())
|
|
|
+ self.getmsg(), zone_name, TEST_RRCLASS), Rcode.SERVFAIL())
|
|
|
zone_name = Name('multisoa.example.com')
|
|
|
self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
zone_name=zone_name)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), zone_name), Rcode.SERVFAIL())
|
|
|
+ self.getmsg(), zone_name, TEST_RRCLASS), Rcode.SERVFAIL())
|
|
|
|
|
|
# query name doesn't match the SOA's owner
|
|
|
self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION)
|
|
|
self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
- self.getmsg(), zone_name), Rcode.FORMERR())
|
|
|
+ self.getmsg(), zone_name, TEST_RRCLASS), Rcode.FORMERR())
|
|
|
+
|
|
|
+ # query's RR class doesn't match the SOA's class
|
|
|
+ zone_name = TEST_ZONE_NAME # make sure the name matches this time
|
|
|
+ self.mdata = self.create_request_data(ixfr=IXFR_OK_VERSION,
|
|
|
+ soa_class=RRClass.CH())
|
|
|
+ self.assertEqual(self.xfrsess._xfrout_setup(
|
|
|
+ self.getmsg(), zone_name, TEST_RRCLASS), Rcode.FORMERR())
|
|
|
|
|
|
def test_dns_xfrout_start_formerror(self):
|
|
|
# formerror
|
|
@@ -757,7 +768,7 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
return "example.com"
|
|
|
|
|
|
def test_dns_xfrout_start_notauth(self):
|
|
|
- def notauth(msg, name):
|
|
|
+ def notauth(msg, name, rrclass):
|
|
|
return Rcode.NOTAUTH()
|
|
|
self.xfrsess._xfrout_setup = notauth
|
|
|
self.xfrsess.dns_xfrout_start(self.sock, self.mdata)
|
|
@@ -772,7 +783,7 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
self.assertEqual(self.sock.read_msg().get_rcode(), Rcode.SERVFAIL())
|
|
|
|
|
|
def test_dns_xfrout_start_noerror(self):
|
|
|
- def noerror(msg, name):
|
|
|
+ def noerror(msg, name, rrclass):
|
|
|
return Rcode.NOERROR()
|
|
|
self.xfrsess._xfrout_setup = noerror
|
|
|
|