|
@@ -564,6 +564,18 @@ class TestXfrinIXFRAdd(TestXfrinState):
|
|
|
self.assertEqual(type(XfrinIXFRDeleteSOA()),
|
|
|
type(self.conn.get_xfrstate()))
|
|
|
|
|
|
+ def test_handle_new_delete_missing_sig(self):
|
|
|
+ self.conn._end_serial = isc.dns.Serial(1234)
|
|
|
+ # SOA RR whose serial is the current one means we are going to a new
|
|
|
+ # difference, starting with removing that SOA.
|
|
|
+ self.conn._diff.add_data(self.ns_rrset) # put some dummy change
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_ctx.last_has_signature = lambda: False
|
|
|
+ # This would try to finish up. But the TSIG pretends not everything is
|
|
|
+ # signed, rejecting it.
|
|
|
+ self.assertRaises(xfrin.XfrinProtocolError, self.state.handle_rr,
|
|
|
+ self.conn, self.begin_soa)
|
|
|
+
|
|
|
def test_handle_out_of_sync(self):
|
|
|
# getting SOA with an inconsistent serial. This is an error.
|
|
|
self.conn._end_serial = isc.dns.Serial(1235)
|
|
@@ -792,12 +804,14 @@ class TestAXFR(TestXfrinConnection):
|
|
|
def tearDown(self):
|
|
|
time.time = self.orig_time_time
|
|
|
|
|
|
- def __create_mock_tsig(self, key, error):
|
|
|
+ def __create_mock_tsig(self, key, error, has_last_signature=True):
|
|
|
# This helper function creates a MockTSIGContext for a given key
|
|
|
# and TSIG error to be used as a result of verify (normally faked
|
|
|
# one)
|
|
|
mock_ctx = MockTSIGContext(key)
|
|
|
mock_ctx.error = error
|
|
|
+ if not has_last_signature:
|
|
|
+ mock_ctx.last_has_signature = lambda: False
|
|
|
return mock_ctx
|
|
|
|
|
|
def __match_exception(self, expected_exception, expected_msg, expression):
|
|
@@ -1379,6 +1393,16 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
self.assertEqual(1, self.conn._tsig_ctx.verify_called)
|
|
|
|
|
|
+ def test_do_xfrin_without_last_tsig(self):
|
|
|
+ # TSIG verify will succeed, but it will pretend the last message is
|
|
|
+ # not signed.
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.NOERROR, False)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
def test_do_xfrin_with_tsig_fail_for_second_message(self):
|
|
|
# Similar to the previous test, but first verify succeeds. There
|
|
|
# should be a second verify attempt, which will fail, which should
|