|
@@ -149,7 +149,7 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
rcode=Rcode.NOERROR(),
|
|
|
questions=default_questions,
|
|
|
answers=default_answers,
|
|
|
- tsig=False):
|
|
|
+ tsig_ctx=None):
|
|
|
resp = Message(Message.RENDER)
|
|
|
qid = self.qid
|
|
|
if bad_qid:
|
|
@@ -163,10 +163,7 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
[resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
- if tsig:
|
|
|
- # for now, we don't need a valid SIG. We only need to include
|
|
|
- # TSIG RR. So how to add it and which key is used don't matter.
|
|
|
- tsig_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ if tsig_ctx is not None:
|
|
|
resp.to_wire(renderer, tsig_ctx)
|
|
|
else:
|
|
|
resp.to_wire(renderer)
|
|
@@ -193,8 +190,8 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
'axfr_after_soa': self._create_normal_response_data
|
|
|
}
|
|
|
self.axfr_response_params = {
|
|
|
- 'tsig_1st': False,
|
|
|
- 'tsig_2nd': False
|
|
|
+ 'tsig_1st': None,
|
|
|
+ 'tsig_2nd': None
|
|
|
}
|
|
|
|
|
|
def tearDown(self):
|
|
@@ -202,6 +199,14 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
if os.path.exists(TEST_DB_FILE):
|
|
|
os.remove(TEST_DB_FILE)
|
|
|
|
|
|
+ def __create_mock_tsig(self, key, error):
|
|
|
+ # This helper function creates a MockTSIGContext for a given key
|
|
|
+ # and TSIG error to be used as a result of verify (normally faked
|
|
|
+ # one)
|
|
|
+ mock_ctx = MockTSIGContext(key)
|
|
|
+ mock_ctx.error = error
|
|
|
+ return mock_ctx
|
|
|
+
|
|
|
def test_close(self):
|
|
|
# we shouldn't be using the global asyncore map.
|
|
|
self.assertEqual(len(asyncore.socket_map), 0)
|
|
@@ -259,7 +264,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.assertEqual(self.conn.query_data, b'\x00\x1d\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
|
|
|
|
|
|
# soa request with tsig
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
self.conn._send_query(RRType.SOA())
|
|
|
self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
|
|
|
|
|
@@ -272,7 +277,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.assertRaises(XfrinTestException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_with_tsigfail(self):
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
# server tsig check fail, return with RCODE 9 (NOTAUTH)
|
|
|
self.conn._send_query(RRType.SOA())
|
|
|
self.conn.reply_data = self.conn.create_response_data(rcode=Rcode.NOTAUTH())
|
|
@@ -343,32 +348,34 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_with_tsig(self):
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ # Use a mock tsig context emulating a validly signed response
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.NOERROR)
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
- # emulate a validly signed response
|
|
|
- self.conn._tsig_ctx.error = TSIGError.NOERROR
|
|
|
self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
|
|
|
self.assertEqual(self.conn._tsig_ctx.get_error(), TSIGError.NOERROR)
|
|
|
|
|
|
def test_soacheck_with_tsig_notauth(self):
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
-
|
|
|
# emulate a valid error response
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
self.soa_response_params['rcode'] = Rcode.NOTAUTH()
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
- self.conn._tsig_ctx.error = TSIGError.BAD_SIG
|
|
|
|
|
|
self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_with_tsig_noerror_badsig(self):
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
|
|
|
# emulate a normal response bad verification failure due to BADSIG.
|
|
|
# According RFC2845, in this case we should ignore it and keep
|
|
|
# waiting for a valid response until a timeout. But we immediately
|
|
|
# treat this as a final failure (just as BIND 9 does).
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
- self.conn._tsig_ctx.error = TSIGError.BAD_SIG
|
|
|
|
|
|
self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
@@ -377,7 +384,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
# contain a TSIG while we sent a signed query. RFC2845 states
|
|
|
# we should wait for a valid response in this case, but we treat
|
|
|
# it as a fatal transaction failure, too.
|
|
|
- self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
@@ -424,8 +431,9 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
def test_do_xfrin_with_tsig(self):
|
|
|
# use TSIG with a mock context. we fake all verify results to
|
|
|
# emulate successful verification.
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
- self.conn._tsig_ctx.error = TSIGError.NOERROR
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.NOERROR)
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
# We use two messages in the tests. The same context should have been
|
|
@@ -435,8 +443,9 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
def test_do_xfrin_with_tsig_fail(self):
|
|
|
# TSIG verify will fail for the first message. xfrin should fail
|
|
|
# immediately.
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
- self.conn._tsig_ctx.error = TSIGError.BAD_SIG
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
self.assertEqual(1, self.conn._tsig_ctx.verify_called)
|
|
@@ -449,8 +458,9 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
if self.conn._tsig_ctx.verify_called == 1:
|
|
|
return TSIGError.NOERROR
|
|
|
return TSIGError.BAD_SIG
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
- self.conn._tsig_ctx.error = fake_tsig_error
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, fake_tsig_error)
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
@@ -458,6 +468,9 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
def test_do_xfrin_with_missing_tsig(self):
|
|
|
# XFR request sent with TSIG, but the response doesn't have TSIG.
|
|
|
# xfr should fail.
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, None)
|
|
|
self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
@@ -476,8 +489,9 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
if self.conn._tsig_ctx.verify_called == 1:
|
|
|
return TSIGError.NOERROR
|
|
|
return TSIGError.FORMERR
|
|
|
- self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
- self.conn._tsig_ctx.error = fake_tsig_error
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, fake_tsig_error)
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
@@ -485,14 +499,14 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
def test_do_xfrin_with_unexpected_tsig(self):
|
|
|
# XFR request wasn't signed, but response includes TSIG. Like BIND 9,
|
|
|
# we reject that.
|
|
|
- self.axfr_response_params['tsig_1st'] = True
|
|
|
+ self.axfr_response_params['tsig_1st'] = TSIGContext(TSIG_KEY)
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
|
|
|
def test_do_xfrin_with_unexpected_tsig_for_second_message(self):
|
|
|
# similar to the previous test, but the first message is normal.
|
|
|
# the second one contains an unexpected TSIG. should be rejected.
|
|
|
- self.axfr_response_params['tsig_2nd'] = True
|
|
|
+ self.axfr_response_params['tsig_2nd'] = TSIGContext(TSIG_KEY)
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
|
|
@@ -514,6 +528,23 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
|
|
|
+ def test_do_soacheck_and_xfrin_with_tsig(self):
|
|
|
+ # We are going to have a SOA query/response transaction, followed by
|
|
|
+ # AXFR, all TSIG signed. xfrin should use a new TSIG context for
|
|
|
+ # AXFR. We are not interested in whether verify works correctly in
|
|
|
+ # this tests, so we simply fake the results (they need to success for
|
|
|
+ # this test)
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.NOERROR)
|
|
|
+ self.soa_response_params['tsig'] = True
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
+ # We should've got 3 response messages: 1 SOA and two AXFR, but
|
|
|
+ # the context should be replaced for AXFR, so verify() should be
|
|
|
+ # called only twice for the latest context.
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
def test_do_soacheck_broken_response(self):
|
|
|
self.conn.response_generator = self._create_broken_response_data
|
|
|
# XXX: TODO: this test failed here, should xfr not raise an
|
|
@@ -543,22 +574,37 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
# containing just a single SOA RR.
|
|
|
tsig_1st = self.axfr_response_params['tsig_1st']
|
|
|
tsig_2nd = self.axfr_response_params['tsig_2nd']
|
|
|
- self.conn.reply_data = self.conn.create_response_data(tsig=tsig_1st)
|
|
|
- self.conn.reply_data += self.conn.create_response_data(tsig=tsig_2nd)
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(tsig_ctx=tsig_1st)
|
|
|
+ self.conn.reply_data += \
|
|
|
+ self.conn.create_response_data(tsig_ctx=tsig_2nd)
|
|
|
|
|
|
def _create_soa_response_data(self):
|
|
|
# This helper method creates a DNS message that is supposed to be
|
|
|
# used a valid response to SOA queries prior to XFR.
|
|
|
+ # If tsig is True, it tries to verify the query with a locally
|
|
|
+ # created TSIG context (which may or may not succeed) so that the
|
|
|
+ # response will include a TSIG.
|
|
|
# If axfr_after_soa is True, it resets the response_generator so that
|
|
|
# a valid XFR messages will follow.
|
|
|
+
|
|
|
+ verify_ctx = None
|
|
|
+ if self.soa_response_params['tsig']:
|
|
|
+ # xfrin (curreently) always uses TCP. strip off the length field.
|
|
|
+ query_data = self.conn.query_data[2:]
|
|
|
+ query_message = Message(Message.PARSE)
|
|
|
+ query_message.from_wire(query_data)
|
|
|
+ verify_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ verify_ctx.verify(query_message.get_tsig_record(), query_data)
|
|
|
+
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
bad_qid=self.soa_response_params['bad_qid'],
|
|
|
response=self.soa_response_params['response'],
|
|
|
rcode=self.soa_response_params['rcode'],
|
|
|
questions=self.soa_response_params['questions'],
|
|
|
- tsig=self.soa_response_params['tsig'])
|
|
|
+ tsig_ctx=verify_ctx)
|
|
|
if self.soa_response_params['axfr_after_soa'] != None:
|
|
|
- self.conn.response_generator = self.soa_response_params['axfr_after_soa']
|
|
|
+ self.conn.response_generator = \
|
|
|
+ self.soa_response_params['axfr_after_soa']
|
|
|
|
|
|
def _create_broken_response_data(self):
|
|
|
# This helper method creates a bogus "DNS message" that only contains
|