|
@@ -43,11 +43,20 @@ TEST_MASTER_PORT = '53535'
|
|
|
|
|
|
TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
|
|
+# SOA intended to be used for the new SOA as a result of transfer.
|
|
|
soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
'master.example.com. admin.example.com ' +
|
|
|
'1234 3600 1800 2419200 7200')
|
|
|
soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(), RRTTL(3600))
|
|
|
soa_rrset.add_rdata(soa_rdata)
|
|
|
+
|
|
|
+# SOA intended to be used for the current SOA at the secondary side.
|
|
|
+# Note that its serial is smaller than that of soa_rdata.
|
|
|
+begin_soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
+ 'master.example.com. admin.example.com ' +
|
|
|
+ '1230 3600 1800 2419200 7200')
|
|
|
+begin_soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(), RRTTL(3600))
|
|
|
+begin_soa_rrset.add_rdata(begin_soa_rdata)
|
|
|
example_axfr_question = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.AXFR())
|
|
|
example_soa_question = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA())
|
|
|
default_questions = [example_axfr_question]
|
|
@@ -107,6 +116,48 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
self.qid = None
|
|
|
self.response_generator = None
|
|
|
|
|
|
+ # The following three implement a simplified mock of DataSourceClient
|
|
|
+ # and ZoneFinder classes for testing purposes.
|
|
|
+ def _get_datasrc_client(self, rrclass):
|
|
|
+ return self
|
|
|
+
|
|
|
+ def find_zone(self, zone_name):
|
|
|
+ '''Mock DataSourceClient.find_zone().
|
|
|
+
|
|
|
+ It returns itself (subsequently acting as a mock ZoneFinder) for
|
|
|
+ some test zone names. For some others it returns either NOTFOUND
|
|
|
+ or PARTIALMATCH.
|
|
|
+
|
|
|
+ '''
|
|
|
+ if zone_name == TEST_ZONE_NAME or \
|
|
|
+ zone_name == Name('no-soa.example') or \
|
|
|
+ zone_name == Name('dup-soa.example'):
|
|
|
+ return (isc.datasrc.DataSourceClient.SUCCESS, self)
|
|
|
+ elif zone_name == Name('no-such-zone.example'):
|
|
|
+ return (DataSourceClient.NOTFOUND, None)
|
|
|
+ elif zone_name == Name('partial-match-zone.example'):
|
|
|
+ return (DataSourceClient.PARTIALMATCH, self)
|
|
|
+ raise ValueError('Unexpected input to mock client: bug in test case?')
|
|
|
+
|
|
|
+ def find(self, name, rrtype, target, options):
|
|
|
+ '''Mock ZoneFinder.find().
|
|
|
+
|
|
|
+ It returns the predefined SOA RRset to queries for SOA of the common
|
|
|
+ test zone name. It also emulates some unusual cases for special
|
|
|
+ zone names.
|
|
|
+
|
|
|
+ '''
|
|
|
+ if name == TEST_ZONE_NAME and rrtype == RRType.SOA():
|
|
|
+ return (ZoneFinder.SUCCESS, begin_soa_rrset)
|
|
|
+ if name == Name('no-soa.example'):
|
|
|
+ return (ZoneFinder.NXDOMAIN, None)
|
|
|
+ if name == Name('dup-soa.example'):
|
|
|
+ dup_soa_rrset = RRset(name, TEST_RRCLASS, RRType.SOA(), RRTTL(0))
|
|
|
+ dup_soa_rrset.add_rdata(begin_soa_rdata)
|
|
|
+ dup_soa_rrset.add_rdata(soa_rdata)
|
|
|
+ return (ZoneFinder.SUCCESS, dup_soa_rrset)
|
|
|
+ raise ValueError('Unexpected input to mock finder: bug in test case?')
|
|
|
+
|
|
|
def _asyncore_loop(self):
|
|
|
if self.force_close:
|
|
|
self.handle_close()
|
|
@@ -450,7 +501,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
c.close()
|
|
|
|
|
|
def test_create_query(self):
|
|
|
- def check_query(expected_qtype, expected_authority):
|
|
|
+ def check_query(expected_qtype, expected_auth):
|
|
|
'''Helper method to repeat the same pattern of tests'''
|
|
|
self.assertEqual(Opcode.QUERY(), msg.get_opcode())
|
|
|
self.assertEqual(Rcode.NOERROR(), msg.get_rcode())
|
|
@@ -459,15 +510,24 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.assertEqual(expected_qtype, msg.get_question()[0].get_type())
|
|
|
self.assertEqual(0, msg.get_rr_count(Message.SECTION_ANSWER))
|
|
|
self.assertEqual(0, msg.get_rr_count(Message.SECTION_ADDITIONAL))
|
|
|
- if expected_authority is None:
|
|
|
+ if expected_auth is None:
|
|
|
self.assertEqual(0,
|
|
|
msg.get_rr_count(Message.SECTION_AUTHORITY))
|
|
|
else:
|
|
|
self.assertEqual(1,
|
|
|
msg.get_rr_count(Message.SECTION_AUTHORITY))
|
|
|
+ auth_rr = msg.get_section(Message.SECTION_AUTHORITY)[0]
|
|
|
+ self.assertEqual(expected_auth.get_name(), auth_rr.get_name())
|
|
|
+ self.assertEqual(expected_auth.get_type(), auth_rr.get_type())
|
|
|
+ self.assertEqual(expected_auth.get_class(),
|
|
|
+ auth_rr.get_class())
|
|
|
+ # In our test scenario RDATA must be 1
|
|
|
+ self.assertEqual(1, expected_auth.get_rdata_count())
|
|
|
+ self.assertEqual(1, auth_rr.get_rdata_count())
|
|
|
+ self.assertEqual(expected_auth.get_rdata()[0],
|
|
|
+ auth_rr.get_rdata()[0])
|
|
|
|
|
|
# Actual tests start here
|
|
|
-
|
|
|
# SOA query
|
|
|
msg = self.conn._create_query(RRType.SOA())
|
|
|
check_query(RRType.SOA(), None)
|
|
@@ -476,6 +536,31 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
msg = self.conn._create_query(RRType.AXFR())
|
|
|
check_query(RRType.AXFR(), None)
|
|
|
|
|
|
+ # IXFR query
|
|
|
+ msg = self.conn._create_query(RRType.IXFR())
|
|
|
+ check_query(RRType.IXFR(), begin_soa_rrset)
|
|
|
+ self.assertEqual(1230, self.conn._request_serial)
|
|
|
+
|
|
|
+ def test_create_ixfr_query_fail(self):
|
|
|
+ # In these cases _create_query() will fail to find a valid SOA RR to
|
|
|
+ # insert in the IXFR query, and should raise an exception.
|
|
|
+
|
|
|
+ self.conn._zone_name = Name('no-such-zone.example')
|
|
|
+ self.assertRaises(XfrinException, self.conn._create_query,
|
|
|
+ RRType.IXFR())
|
|
|
+
|
|
|
+ self.conn._zone_name = Name('partial-match-zone.example')
|
|
|
+ self.assertRaises(XfrinException, self.conn._create_query,
|
|
|
+ RRType.IXFR())
|
|
|
+
|
|
|
+ self.conn._zone_name = Name('no-soa.example')
|
|
|
+ self.assertRaises(XfrinException, self.conn._create_query,
|
|
|
+ RRType.IXFR())
|
|
|
+
|
|
|
+ self.conn._zone_name = Name('dup-soa.example')
|
|
|
+ self.assertRaises(XfrinException, self.conn._create_query,
|
|
|
+ RRType.IXFR())
|
|
|
+
|
|
|
def test_send_query(self):
|
|
|
def message_has_tsig(data):
|
|
|
# a simple check if the actual data contains a TSIG RR.
|