|
@@ -72,6 +72,26 @@ class MockCC():
|
|
|
if identifier == "zones/class":
|
|
|
return TEST_RRCLASS_STR
|
|
|
|
|
|
+class MockDataSourceClient():
|
|
|
+ '''A simple mock data source client.
|
|
|
+
|
|
|
+ This class provides a minimal set of wrappers related the data source
|
|
|
+ API that would be used by Diff objects. For our testing purposes they
|
|
|
+ can be alsmot no-op.
|
|
|
+
|
|
|
+ '''
|
|
|
+ def get_updater(self, zone_name, replace):
|
|
|
+ return self
|
|
|
+
|
|
|
+ def add_rrset(self, rrset):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def remove_rrset(self, rrset):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def commit(self):
|
|
|
+ pass
|
|
|
+
|
|
|
class MockXfrin(Xfrin):
|
|
|
# This is a class attribute of a callable object that specifies a non
|
|
|
# default behavior triggered in _cc_check_command(). Specific test methods
|
|
@@ -239,21 +259,7 @@ class TestXfrinState(unittest.TestCase):
|
|
|
self.ns_rrset.add_rdata(Rdata(RRType.NS(), TEST_RRCLASS,
|
|
|
'ns.example.com'))
|
|
|
self.__data_operations = []
|
|
|
- self.conn._diff = Diff(self, TEST_ZONE_NAME)
|
|
|
-
|
|
|
- # The following methods are to emulate a data source updater used by
|
|
|
- # the Diff object. For our testing purposes they can simply be no-op.
|
|
|
- def get_updater(self, zone_name, replace):
|
|
|
- return self
|
|
|
-
|
|
|
- def add_rrset(self, rrset):
|
|
|
- pass
|
|
|
-
|
|
|
- def remove_rrset(self, rrset):
|
|
|
- pass
|
|
|
-
|
|
|
- def commit(self):
|
|
|
- pass
|
|
|
+ self.conn._diff = Diff(MockDataSourceClient(), TEST_ZONE_NAME)
|
|
|
|
|
|
class TestXfrinInitialSOA(TestXfrinState):
|
|
|
def setUp(self):
|
|
@@ -425,6 +431,13 @@ class TestXfrinAXFR(TestXfrinState):
|
|
|
soa_rrset)
|
|
|
|
|
|
class TestXfrinConnection(unittest.TestCase):
|
|
|
+ '''Convenient parent class for XFR-protocol tests.
|
|
|
+
|
|
|
+ This class provides common setups and helper methods for protocol related
|
|
|
+ tests on AXFR and IXFR.
|
|
|
+
|
|
|
+ '''
|
|
|
+
|
|
|
def setUp(self):
|
|
|
if os.path.exists(TEST_DB_FILE):
|
|
|
os.remove(TEST_DB_FILE)
|
|
@@ -451,6 +464,64 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
if os.path.exists(TEST_DB_FILE):
|
|
|
os.remove(TEST_DB_FILE)
|
|
|
|
|
|
+ def _handle_xfrin_response(self):
|
|
|
+ # This helper methods iterates over all RRs (excluding the ending SOA)
|
|
|
+ # transferred, and simply returns the number of RRs. The return value
|
|
|
+ # may be used an assertion value for test cases.
|
|
|
+ rrs = 0
|
|
|
+ for rr in self.conn._handle_axfrin_response():
|
|
|
+ rrs += 1
|
|
|
+ return rrs
|
|
|
+
|
|
|
+ def _create_normal_response_data(self):
|
|
|
+ # This helper method creates a simple sequence of DNS messages that
|
|
|
+ # forms a valid XFR transaction. It consists of two messages, each
|
|
|
+ # containing just a single SOA RR.
|
|
|
+ tsig_1st = self.axfr_response_params['tsig_1st']
|
|
|
+ tsig_2nd = self.axfr_response_params['tsig_2nd']
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(tsig_ctx=tsig_1st)
|
|
|
+ self.conn.reply_data += \
|
|
|
+ self.conn.create_response_data(tsig_ctx=tsig_2nd)
|
|
|
+
|
|
|
+ def _create_soa_response_data(self):
|
|
|
+ # This helper method creates a DNS message that is supposed to be
|
|
|
+ # used a valid response to SOA queries prior to XFR.
|
|
|
+ # If tsig is True, it tries to verify the query with a locally
|
|
|
+ # created TSIG context (which may or may not succeed) so that the
|
|
|
+ # response will include a TSIG.
|
|
|
+ # If axfr_after_soa is True, it resets the response_generator so that
|
|
|
+ # a valid XFR messages will follow.
|
|
|
+
|
|
|
+ verify_ctx = None
|
|
|
+ if self.soa_response_params['tsig']:
|
|
|
+ # xfrin (curreently) always uses TCP. strip off the length field.
|
|
|
+ query_data = self.conn.query_data[2:]
|
|
|
+ query_message = Message(Message.PARSE)
|
|
|
+ query_message.from_wire(query_data)
|
|
|
+ verify_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ verify_ctx.verify(query_message.get_tsig_record(), query_data)
|
|
|
+
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
+ bad_qid=self.soa_response_params['bad_qid'],
|
|
|
+ response=self.soa_response_params['response'],
|
|
|
+ rcode=self.soa_response_params['rcode'],
|
|
|
+ questions=self.soa_response_params['questions'],
|
|
|
+ tsig_ctx=verify_ctx)
|
|
|
+ if self.soa_response_params['axfr_after_soa'] != None:
|
|
|
+ self.conn.response_generator = \
|
|
|
+ self.soa_response_params['axfr_after_soa']
|
|
|
+
|
|
|
+ def _create_broken_response_data(self):
|
|
|
+ # This helper method creates a bogus "DNS message" that only contains
|
|
|
+ # 4 octets of data. The DNS message parser will raise an exception.
|
|
|
+ bogus_data = b'xxxx'
|
|
|
+ self.conn.reply_data = struct.pack('H', socket.htons(len(bogus_data)))
|
|
|
+ self.conn.reply_data += bogus_data
|
|
|
+
|
|
|
+class TestAXFR(TestXfrinConnection):
|
|
|
+ def setUp(self):
|
|
|
+ super().setUp()
|
|
|
+
|
|
|
def __create_mock_tsig(self, key, error):
|
|
|
# This helper function creates a MockTSIGContext for a given key
|
|
|
# and TSIG error to be used as a result of verify (normally faked
|
|
@@ -760,7 +831,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn._send_query(RRType.AXFR())
|
|
|
self.assertRaises(Exception, self._handle_xfrin_response)
|
|
|
|
|
|
- def test_response(self):
|
|
|
+ def test_axfr_response(self):
|
|
|
# normal case.
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.conn._send_query(RRType.AXFR())
|
|
@@ -903,59 +974,26 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
|
|
|
|
|
|
- def _handle_xfrin_response(self):
|
|
|
- # This helper methods iterates over all RRs (excluding the ending SOA)
|
|
|
- # transferred, and simply returns the number of RRs. The return value
|
|
|
- # may be used an assertion value for test cases.
|
|
|
- rrs = 0
|
|
|
- for rr in self.conn._handle_axfrin_response():
|
|
|
- rrs += 1
|
|
|
- return rrs
|
|
|
-
|
|
|
- def _create_normal_response_data(self):
|
|
|
- # This helper method creates a simple sequence of DNS messages that
|
|
|
- # forms a valid XFR transaction. It consists of two messages, each
|
|
|
- # containing just a single SOA RR.
|
|
|
- tsig_1st = self.axfr_response_params['tsig_1st']
|
|
|
- tsig_2nd = self.axfr_response_params['tsig_2nd']
|
|
|
- self.conn.reply_data = self.conn.create_response_data(tsig_ctx=tsig_1st)
|
|
|
- self.conn.reply_data += \
|
|
|
- self.conn.create_response_data(tsig_ctx=tsig_2nd)
|
|
|
+class TestIXFR(TestXfrinConnection):
|
|
|
+ def setUp(self):
|
|
|
+ super().setUp()
|
|
|
+ self.conn._query_id = self.conn.qid = 1035
|
|
|
+ self.conn._request_serial = 1230
|
|
|
+ self.conn._request_type = RRType.IXFR()
|
|
|
+ self.conn._diff = Diff(MockDataSourceClient(), TEST_ZONE_NAME)
|
|
|
|
|
|
- def _create_soa_response_data(self):
|
|
|
- # This helper method creates a DNS message that is supposed to be
|
|
|
- # used a valid response to SOA queries prior to XFR.
|
|
|
- # If tsig is True, it tries to verify the query with a locally
|
|
|
- # created TSIG context (which may or may not succeed) so that the
|
|
|
- # response will include a TSIG.
|
|
|
- # If axfr_after_soa is True, it resets the response_generator so that
|
|
|
- # a valid XFR messages will follow.
|
|
|
+ def test_ixfr_response(self):
|
|
|
+ '''A simplest form of IXFR response.
|
|
|
|
|
|
- verify_ctx = None
|
|
|
- if self.soa_response_params['tsig']:
|
|
|
- # xfrin (curreently) always uses TCP. strip off the length field.
|
|
|
- query_data = self.conn.query_data[2:]
|
|
|
- query_message = Message(Message.PARSE)
|
|
|
- query_message.from_wire(query_data)
|
|
|
- verify_ctx = TSIGContext(TSIG_KEY)
|
|
|
- verify_ctx.verify(query_message.get_tsig_record(), query_data)
|
|
|
+ It simply updates the zone's SOA one time.
|
|
|
|
|
|
+ '''
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- bad_qid=self.soa_response_params['bad_qid'],
|
|
|
- response=self.soa_response_params['response'],
|
|
|
- rcode=self.soa_response_params['rcode'],
|
|
|
- questions=self.soa_response_params['questions'],
|
|
|
- tsig_ctx=verify_ctx)
|
|
|
- if self.soa_response_params['axfr_after_soa'] != None:
|
|
|
- self.conn.response_generator = \
|
|
|
- self.soa_response_params['axfr_after_soa']
|
|
|
-
|
|
|
- def _create_broken_response_data(self):
|
|
|
- # This helper method creates a bogus "DNS message" that only contains
|
|
|
- # 4 octets of data. The DNS message parser will raise an exception.
|
|
|
- bogus_data = b'xxxx'
|
|
|
- self.conn.reply_data = struct.pack('H', socket.htons(len(bogus_data)))
|
|
|
- self.conn.reply_data += bogus_data
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ answers=[soa_rrset, begin_soa_rrset, soa_rrset, soa_rrset])
|
|
|
+ XfrinInitialSOA().set_xfrstate(self.conn, XfrinInitialSOA())
|
|
|
+ self.conn._handle_xfrin_responses()
|
|
|
+ self.assertEqual(type(XfrinIXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
|
|
|
class TestXfrinRecorder(unittest.TestCase):
|
|
|
def setUp(self):
|