|
@@ -636,6 +636,11 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
rrset.add_rdata(Rdata(RRType.SOA(), TEST_RRCLASS, rdata_str))
|
|
|
return rrset
|
|
|
|
|
|
+ def _create_ns(self, nsname='ns.'+TEST_ZONE_NAME_STR):
|
|
|
+ rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS(), RRTTL(3600))
|
|
|
+ rrset.add_rdata(Rdata(RRType.NS(), TEST_RRCLASS, nsname))
|
|
|
+ return rrset
|
|
|
+
|
|
|
class TestAXFR(TestXfrinConnection):
|
|
|
def setUp(self):
|
|
|
super().setUp()
|
|
@@ -1201,6 +1206,61 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
[[('remove', begin_soa_rrset), ('add', soa_rrset)]],
|
|
|
self.conn._datasrc_client.committed_diffs)
|
|
|
|
|
|
+ def test_ixfr_to_axfr_response(self):
|
|
|
+ '''AXFR-style IXFR response.
|
|
|
+
|
|
|
+ It simply updates the zone's SOA one time.
|
|
|
+
|
|
|
+ '''
|
|
|
+ ns_rr = self._create_ns()
|
|
|
+ a_rr = self._create_a('192.0.2.1')
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ answers=[soa_rrset, ns_rr, a_rr, soa_rrset])
|
|
|
+ self.conn._handle_xfrin_responses()
|
|
|
+ self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
+ self.assertEqual([], self.conn._datasrc_client.diffs)
|
|
|
+
|
|
|
+
|
|
|
+ check_diffs(self.assertEqual,
|
|
|
+ [[('add', ns_rr), ('add', a_rr), ('add', soa_rrset)]],
|
|
|
+ self.conn._datasrc_client.committed_diffs)
|
|
|
+
|
|
|
+ def test_ixfr_to_axfr_response_mismatch_soa(self):
|
|
|
+ '''AXFR-style IXFR response, but the two SOA are not the same.
|
|
|
+
|
|
|
+ In the current implementation, we accept it and use the second SOA.
|
|
|
+
|
|
|
+ '''
|
|
|
+ ns_rr = self._create_ns()
|
|
|
+ a_rr = self._create_a('192.0.2.1')
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ answers=[soa_rrset, ns_rr, a_rr, begin_soa_rrset])
|
|
|
+ self.conn._handle_xfrin_responses()
|
|
|
+ self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
+ self.assertEqual([], self.conn._datasrc_client.diffs)
|
|
|
+ check_diffs(self.assertEqual,
|
|
|
+ [[('add', ns_rr), ('add', a_rr),
|
|
|
+ ('add', begin_soa_rrset)]],
|
|
|
+ self.conn._datasrc_client.committed_diffs)
|
|
|
+
|
|
|
+ def test_ixfr_to_axfr_response_extra(self):
|
|
|
+ '''Test with an extra RR after the end of AXFR-style IXFR session.
|
|
|
+
|
|
|
+ The session should be rejected, and nothing should be committed.
|
|
|
+
|
|
|
+ '''
|
|
|
+ ns_rr = self._create_ns()
|
|
|
+ a_rr = self._create_a('192.0.2.1')
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ answers=[soa_rrset, ns_rr, a_rr, soa_rrset, a_rr])
|
|
|
+ self.assertRaises(XfrinProtocolError,
|
|
|
+ self.conn._handle_xfrin_responses)
|
|
|
+ self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
+ self.assertEqual([], self.conn._datasrc_client.committed_diffs)
|
|
|
+
|
|
|
class TestIXFRSession(TestXfrinConnection):
|
|
|
'''Tests for a full IXFR session (query and response).
|
|
|
|
|
@@ -1285,6 +1345,12 @@ class TestIXFRSessionWithSQLite3(TestXfrinConnection):
|
|
|
self.assertEqual(1, soa.get_rdata_count())
|
|
|
return get_soa_serial(soa.get_rdata()[0])
|
|
|
|
|
|
+ def record_exist(self, name, type):
|
|
|
+ result, finder = self.conn._datasrc_client.find_zone(TEST_ZONE_NAME)
|
|
|
+ self.assertEqual(DataSourceClient.SUCCESS, result)
|
|
|
+ result, soa = finder.find(name, type, None, ZoneFinder.FIND_DEFAULT)
|
|
|
+ return result == ZoneFinder.SUCCESS
|
|
|
+
|
|
|
def test_do_xfrin_sqlite3(self):
|
|
|
def create_ixfr_response():
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
@@ -1316,6 +1382,47 @@ class TestIXFRSessionWithSQLite3(TestXfrinConnection):
|
|
|
self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
self.assertEqual(1230, self.get_zone_serial())
|
|
|
|
|
|
+ def test_do_xfrin_axfr_sqlite3(self):
|
|
|
+ '''AXFR-style IXFR.
|
|
|
+
|
|
|
+ '''
|
|
|
+ def create_ixfr_response():
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
+ RRType.IXFR())],
|
|
|
+ answers=[soa_rrset, self._create_ns(), soa_rrset])
|
|
|
+ self.conn.response_generator = create_ixfr_response
|
|
|
+
|
|
|
+
|
|
|
+ self.assertEqual(1230, self.get_zone_serial())
|
|
|
+ self.assertTrue(self.record_exist(Name('dns01.example.com'),
|
|
|
+ RRType.A()))
|
|
|
+ self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(1234, self.get_zone_serial())
|
|
|
+ self.assertFalse(self.record_exist(Name('dns01.example.com'),
|
|
|
+ RRType.A()))
|
|
|
+
|
|
|
+ def test_do_xfrin_axfr_sqlite3_fail(self):
|
|
|
+ '''Similar to the previous test, but xfrin fails due to error.
|
|
|
+
|
|
|
+ Check the DB is not changed.
|
|
|
+
|
|
|
+ '''
|
|
|
+ def create_ixfr_response():
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
+ RRType.IXFR())],
|
|
|
+ answers=[soa_rrset, self._create_ns(), soa_rrset, soa_rrset])
|
|
|
+ self.conn.response_generator = create_ixfr_response
|
|
|
+
|
|
|
+ self.assertEqual(1230, self.get_zone_serial())
|
|
|
+ self.assertTrue(self.record_exist(Name('dns01.example.com'),
|
|
|
+ RRType.A()))
|
|
|
+ self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(1230, self.get_zone_serial())
|
|
|
+ self.assertTrue(self.record_exist(Name('dns01.example.com'),
|
|
|
+ RRType.A()))
|
|
|
+
|
|
|
class TestXfrinRecorder(unittest.TestCase):
|
|
|
def setUp(self):
|
|
|
self.recorder = XfrinRecorder()
|