|
@@ -21,6 +21,7 @@ import sqlite3
|
|
|
import sys
|
|
|
import io
|
|
|
from isc.testutils.tsigctx_mock import MockTSIGContext
|
|
|
+from isc.testutils.rrset_utils import *
|
|
|
from xfrin import *
|
|
|
import xfrin
|
|
|
from isc.xfrin.diff import Diff
|
|
@@ -601,6 +602,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
'response': True,
|
|
|
'auth': True,
|
|
|
'rcode': Rcode.NOERROR(),
|
|
|
+ 'answers': default_answers,
|
|
|
'tsig': False,
|
|
|
'axfr_after_soa': self._create_normal_response_data
|
|
|
}
|
|
@@ -659,6 +661,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
response=self.soa_response_params['response'],
|
|
|
auth=self.soa_response_params['auth'],
|
|
|
rcode=self.soa_response_params['rcode'],
|
|
|
+ answers=self.soa_response_params['answers'],
|
|
|
questions=self.soa_response_params['questions'],
|
|
|
tsig_ctx=verify_ctx)
|
|
|
if self.soa_response_params['axfr_after_soa'] != None:
|
|
@@ -948,6 +951,25 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ def test_soacheck_uptodate(self):
|
|
|
+ # Primary's SOA serial is identical the local serial
|
|
|
+ self.soa_response_params['answers'] = [begin_soa_rrset]
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinZoneUptodate, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_uptodate2(self):
|
|
|
+ # Primary's SOA serial is "smaller" than the local serial
|
|
|
+ self.soa_response_params['answers'] = [create_soa(1229)]
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinZoneUptodate, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_uptodate3(self):
|
|
|
+ # Similar to the previous case, but checking the comparison is based
|
|
|
+ # on the serial number arithmetic.
|
|
|
+ self.soa_response_params['answers'] = [create_soa(0xffffffff)]
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinZoneUptodate, self.conn._check_soa_serial)
|
|
|
+
|
|
|
def test_soacheck_with_tsig(self):
|
|
|
# Use a mock tsig context emulating a validly signed response
|
|
|
self.conn._tsig_key = TSIG_KEY
|
|
@@ -1256,6 +1278,11 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
|
|
|
+ def test_do_soacheck_uptodate(self):
|
|
|
+ self.soa_response_params['answers'] = [begin_soa_rrset]
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
+
|
|
|
def test_do_soacheck_and_xfrin_with_tsig(self):
|
|
|
# We are going to have a SOA query/response transaction, followed by
|
|
|
# AXFR, all TSIG signed. xfrin should use a new TSIG context for
|