|
@@ -352,6 +352,37 @@ class TestXfrinInitialSOA(TestXfrinState):
|
|
self.assertRaises(XfrinProtocolError, self.state.handle_rr, self.conn,
|
|
self.assertRaises(XfrinProtocolError, self.state.handle_rr, self.conn,
|
|
self.ns_rrset)
|
|
self.ns_rrset)
|
|
|
|
|
|
|
|
+ def test_handle_ixfr_uptodate(self):
|
|
|
|
+ self.conn._request_type = RRType.IXFR()
|
|
|
|
+ self.conn._request_serial = isc.dns.Serial(1234) # same as soa_rrset
|
|
|
|
+ self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
|
|
|
|
+ self.assertEqual(type(XfrinIXFRUptodate()),
|
|
|
|
+ type(self.conn.get_xfrstate()))
|
|
|
|
+
|
|
|
|
+ def test_handle_ixfr_uptodate2(self):
|
|
|
|
+ self.conn._request_type = RRType.IXFR()
|
|
|
|
+ self.conn._request_serial = isc.dns.Serial(1235) # > soa_rrset
|
|
|
|
+ self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
|
|
|
|
+ self.assertEqual(type(XfrinIXFRUptodate()),
|
|
|
|
+ type(self.conn.get_xfrstate()))
|
|
|
|
+
|
|
|
|
+ def test_handle_ixfr_uptodate3(self):
|
|
|
|
+ # Similar to the previous case, but checking serial number arithmetic
|
|
|
|
+ # comparison
|
|
|
|
+ self.conn._request_type = RRType.IXFR()
|
|
|
|
+ self.conn._request_serial = isc.dns.Serial(0xffffffff)
|
|
|
|
+ self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
|
|
|
|
+ self.assertEqual(type(XfrinFirstData()),
|
|
|
|
+ type(self.conn.get_xfrstate()))
|
|
|
|
+
|
|
|
|
+ def test_handle_axfr_uptodate(self):
|
|
|
|
+ # "request serial" should matter only for IXFR
|
|
|
|
+ self.conn._request_type = RRType.AXFR()
|
|
|
|
+ self.conn._request_serial = isc.dns.Serial(1234) # same as soa_rrset
|
|
|
|
+ self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
|
|
|
|
+ self.assertEqual(type(XfrinFirstData()),
|
|
|
|
+ type(self.conn.get_xfrstate()))
|
|
|
|
+
|
|
def test_finish_message(self):
|
|
def test_finish_message(self):
|
|
self.assertTrue(self.state.finish_message(self.conn))
|
|
self.assertTrue(self.state.finish_message(self.conn))
|
|
|
|
|
|
@@ -523,6 +554,19 @@ class TestXfrinIXFREnd(TestXfrinState):
|
|
def test_finish_message(self):
|
|
def test_finish_message(self):
|
|
self.assertFalse(self.state.finish_message(self.conn))
|
|
self.assertFalse(self.state.finish_message(self.conn))
|
|
|
|
|
|
|
|
+class TestXfrinIXFREnd(TestXfrinState):
|
|
|
|
+ def setUp(self):
|
|
|
|
+ super().setUp()
|
|
|
|
+ self.state = XfrinIXFRUptodate()
|
|
|
|
+
|
|
|
|
+ def test_handle_rr(self):
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.state.handle_rr, self.conn,
|
|
|
|
+ self.ns_rrset)
|
|
|
|
+
|
|
|
|
+ def test_finish_message(self):
|
|
|
|
+ self.assertRaises(XfrinZoneUptodate, self.state.finish_message,
|
|
|
|
+ self.conn)
|
|
|
|
+
|
|
class TestXfrinAXFR(TestXfrinState):
|
|
class TestXfrinAXFR(TestXfrinState):
|
|
def setUp(self):
|
|
def setUp(self):
|
|
super().setUp()
|
|
super().setUp()
|
|
@@ -1488,6 +1532,16 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
[[('delete', begin_soa_rrset), ('add', soa_rrset)]],
|
|
[[('delete', begin_soa_rrset), ('add', soa_rrset)]],
|
|
self.conn._datasrc_client.committed_diffs)
|
|
self.conn._datasrc_client.committed_diffs)
|
|
|
|
|
|
|
|
+ def test_ixfr_response_uptodate(self):
|
|
|
|
+ '''IXFR response indicates the zone is new enough'''
|
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
|
+ answers=[begin_soa_rrset])
|
|
|
|
+ self.assertRaises(XfrinZoneUptodate, self.conn._handle_xfrin_responses)
|
|
|
|
+ # no diffs should have been committed
|
|
|
|
+ check_diffs(self.assertEqual,
|
|
|
|
+ [], self.conn._datasrc_client.committed_diffs)
|
|
|
|
+
|
|
def test_ixfr_response_broken(self):
|
|
def test_ixfr_response_broken(self):
|
|
'''Test with a broken response.
|
|
'''Test with a broken response.
|
|
|
|
|
|
@@ -1520,6 +1574,22 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
[[('delete', begin_soa_rrset), ('add', soa_rrset)]],
|
|
[[('delete', begin_soa_rrset), ('add', soa_rrset)]],
|
|
self.conn._datasrc_client.committed_diffs)
|
|
self.conn._datasrc_client.committed_diffs)
|
|
|
|
|
|
|
|
+ def test_ixfr_response_uptodate_extra(self):
|
|
|
|
+ '''Similar to 'uptodate' test, but with extra bogus data.
|
|
|
|
+
|
|
|
|
+ In either case an exception will be raised, but in this case it's
|
|
|
|
+ considered an error.
|
|
|
|
+
|
|
|
|
+ '''
|
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
|
+ answers=[begin_soa_rrset, soa_rrset])
|
|
|
|
+ self.assertRaises(XfrinProtocolError,
|
|
|
|
+ self.conn._handle_xfrin_responses)
|
|
|
|
+ # no diffs should have been committed
|
|
|
|
+ check_diffs(self.assertEqual,
|
|
|
|
+ [], self.conn._datasrc_client.committed_diffs)
|
|
|
|
+
|
|
def test_ixfr_to_axfr_response(self):
|
|
def test_ixfr_to_axfr_response(self):
|
|
'''AXFR-style IXFR response.
|
|
'''AXFR-style IXFR response.
|
|
|
|
|
|
@@ -1623,13 +1693,25 @@ class TestIXFRSession(TestXfrinConnection):
|
|
self.conn.response_generator = create_ixfr_response
|
|
self.conn.response_generator = create_ixfr_response
|
|
self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
|
|
|
- def test_do_xfrin_fail(self):
|
|
|
|
|
|
+ def test_do_xfrin_fail2(self):
|
|
'''IXFR fails due to a bogus DNS message.
|
|
'''IXFR fails due to a bogus DNS message.
|
|
|
|
|
|
'''
|
|
'''
|
|
self._create_broken_response_data()
|
|
self._create_broken_response_data()
|
|
self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
|
|
|
|
|
+ def test_do_xfrin_uptodate(self):
|
|
|
|
+ '''IXFR is (gracefully) aborted because serial is not new
|
|
|
|
+
|
|
|
|
+ '''
|
|
|
|
+ def create_response():
|
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
|
+ RRType.IXFR())],
|
|
|
|
+ answers=[begin_soa_rrset])
|
|
|
|
+ self.conn.response_generator = create_response
|
|
|
|
+ self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
|
+
|
|
class TestXFRSessionWithSQLite3(TestXfrinConnection):
|
|
class TestXFRSessionWithSQLite3(TestXfrinConnection):
|
|
'''Tests for XFR sessions using an SQLite3 DB.
|
|
'''Tests for XFR sessions using an SQLite3 DB.
|
|
|
|
|