|
@@ -30,6 +30,20 @@ import isc.acl.dns
|
|
|
TESTDATA_SRCDIR = os.getenv("TESTDATASRCDIR")
|
|
|
TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
|
|
+#
|
|
|
+# Commonly used (mostly constant) test parameters
|
|
|
+#
|
|
|
+TEST_ZONE_NAME_STR = "example.com."
|
|
|
+TEST_ZONE_NAME = Name(TEST_ZONE_NAME_STR)
|
|
|
+TEST_RRCLASS = RRClass.IN()
|
|
|
+
|
|
|
+# SOA intended to be used for the new SOA as a result of transfer.
|
|
|
+soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
+ 'master.example.com. admin.example.com ' +
|
|
|
+ '1234 3600 1800 2419200 7200')
|
|
|
+soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(), RRTTL(3600))
|
|
|
+soa_rrset.add_rdata(soa_rdata)
|
|
|
+
|
|
|
# our fake socket, where we can read and insert messages
|
|
|
class MySocket():
|
|
|
def __init__(self, family, type):
|
|
@@ -69,6 +83,30 @@ class MockDataSrcClient:
|
|
|
def __init__(self, type, config):
|
|
|
pass
|
|
|
|
|
|
+ def find_zone(self, zone_name):
|
|
|
+ '''Mock version of find_zone().
|
|
|
+
|
|
|
+ It returns itself (subsequently acting as a mock ZoneFinder) for
|
|
|
+ some test zone names. For some others it returns either NOTFOUND
|
|
|
+ or PARTIALMATCH.
|
|
|
+
|
|
|
+ '''
|
|
|
+ if zone_name == TEST_ZONE_NAME:
|
|
|
+ return (isc.datasrc.DataSourceClient.SUCCESS, self)
|
|
|
+ raise ValueError('Unexpected input to mock client: bug in test case?')
|
|
|
+
|
|
|
+ def find(self, name, rrtype, target, options):
|
|
|
+ '''Mock ZoneFinder.find().
|
|
|
+
|
|
|
+ It returns the predefined SOA RRset to queries for SOA of the common
|
|
|
+ test zone name. It also emulates some unusual cases for special
|
|
|
+ zone names.
|
|
|
+
|
|
|
+ '''
|
|
|
+ if name == TEST_ZONE_NAME and rrtype == RRType.SOA():
|
|
|
+ return (ZoneFinder.SUCCESS, soa_rrset)
|
|
|
+ raise ValueError('Unexpected input to mock finder: bug in test case?')
|
|
|
+
|
|
|
def get_iterator(self, zone_name, adjust_ttl=False):
|
|
|
if zone_name == Name('notauth.example.com'):
|
|
|
raise isc.datasrc.Error('no such zone')
|
|
@@ -91,6 +129,9 @@ class MockDataSrcClient:
|
|
|
'3600 1800 2419200 7200'))
|
|
|
return soa_rrset
|
|
|
|
|
|
+ def get_journal_reader(self, zone_name, begin_serial, end_serial):
|
|
|
+ return isc.datasrc.ZoneJournalReader.SUCCESS, self
|
|
|
+
|
|
|
class MyCCSession(isc.config.ConfigData):
|
|
|
def __init__(self):
|
|
|
module_spec = isc.config.module_spec_from_file(
|
|
@@ -195,6 +236,13 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
request_data = renderer.get_data()
|
|
|
return request_data
|
|
|
|
|
|
+ def set_request_type(self, type):
|
|
|
+ self.xfrsess._request_type = type
|
|
|
+ if type == RRType.AXFR():
|
|
|
+ self.xfrsess._request_typestr = 'AXFR'
|
|
|
+ else:
|
|
|
+ self.xfrsess._request_typestr = 'IXFR'
|
|
|
+
|
|
|
def setUp(self):
|
|
|
self.sock = MySocket(socket.AF_INET,socket.SOCK_STREAM)
|
|
|
self.xfrsess = MyXfroutSession(self.sock, None, Dbserver(),
|
|
@@ -205,6 +253,7 @@ class TestXfroutSessionBase(unittest.TestCase):
|
|
|
isc.acl.dns.REQUEST_LOADER.load(
|
|
|
[{"action": "ACCEPT"}]),
|
|
|
{})
|
|
|
+ self.set_request_type(RRType.AXFR()) # test AXFR by default
|
|
|
self.mdata = self.create_request_data()
|
|
|
self.soa_rrset = RRset(Name('example.com'), RRClass.IN(), RRType.SOA(),
|
|
|
RRTTL(3600))
|
|
@@ -612,16 +661,24 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
def test_get_rrset_len(self):
|
|
|
self.assertEqual(82, get_rrset_len(self.soa_rrset))
|
|
|
|
|
|
- def test_check_xfrout_available(self):
|
|
|
+ def test_check_xfrout_axfr_available(self):
|
|
|
self.xfrsess.ClientClass = MockDataSrcClient
|
|
|
self.assertEqual(self.xfrsess._check_xfrout_available(
|
|
|
- Name('example.com')), Rcode.NOERROR())
|
|
|
+ self.getmsg(), Name('example.com')), Rcode.NOERROR())
|
|
|
+ self.assertEqual(self.xfrsess._check_xfrout_available(
|
|
|
+ self.getmsg(), Name('notauth.example.com')), Rcode.NOTAUTH())
|
|
|
self.assertEqual(self.xfrsess._check_xfrout_available(
|
|
|
- Name('notauth.example.com')), Rcode.NOTAUTH())
|
|
|
+ self.getmsg(), Name('nosoa.example.com')), Rcode.SERVFAIL())
|
|
|
self.assertEqual(self.xfrsess._check_xfrout_available(
|
|
|
- Name('nosoa.example.com')), Rcode.SERVFAIL())
|
|
|
+ self.getmsg(), Name('multisoa.example.com')), Rcode.SERVFAIL())
|
|
|
+
|
|
|
+ def test_check_xfrout_ixfr_available(self):
|
|
|
+ self.xfrsess.ClientClass = MockDataSrcClient
|
|
|
+ self.set_request_type(RRType.IXFR())
|
|
|
+ self.mdata = self.create_request_data(ixfr=2011111802)
|
|
|
+ request_msg = self.getmsg()
|
|
|
self.assertEqual(self.xfrsess._check_xfrout_available(
|
|
|
- Name('multisoa.example.com')), Rcode.SERVFAIL())
|
|
|
+ self.getmsg(), Name('example.com')), Rcode.NOERROR())
|
|
|
|
|
|
def test_dns_xfrout_start_formerror(self):
|
|
|
# formerror
|
|
@@ -633,7 +690,7 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
return "example.com"
|
|
|
|
|
|
def test_dns_xfrout_start_notauth(self):
|
|
|
- def notauth(formpara):
|
|
|
+ def notauth(msg, name):
|
|
|
return Rcode.NOTAUTH()
|
|
|
self.xfrsess._check_xfrout_available = notauth
|
|
|
self.xfrsess.dns_xfrout_start(self.sock, self.mdata)
|
|
@@ -648,7 +705,7 @@ class TestXfroutSession(TestXfroutSessionBase):
|
|
|
self.assertEqual(self.sock.read_msg().get_rcode(), Rcode.SERVFAIL())
|
|
|
|
|
|
def test_dns_xfrout_start_noerror(self):
|
|
|
- def noerror(form):
|
|
|
+ def noerror(msg, name):
|
|
|
return Rcode.NOERROR()
|
|
|
self.xfrsess._check_xfrout_available = noerror
|
|
|
|