|
@@ -125,10 +125,11 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
self.response_generator()
|
|
|
return len(data)
|
|
|
|
|
|
- def create_response_data(self, response = True, bad_qid = False,
|
|
|
- rcode = Rcode.NOERROR(),
|
|
|
- questions = default_questions,
|
|
|
- answers = default_answers):
|
|
|
+ def create_response_data(self, response=True, bad_qid=False,
|
|
|
+ rcode=Rcode.NOERROR(),
|
|
|
+ questions=default_questions,
|
|
|
+ answers=default_answers,
|
|
|
+ tsig=False):
|
|
|
resp = Message(Message.RENDER)
|
|
|
qid = self.qid
|
|
|
if bad_qid:
|
|
@@ -142,7 +143,13 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
[resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
- resp.to_wire(renderer)
|
|
|
+ if tsig:
|
|
|
+
|
|
|
+
|
|
|
+ tsig_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ resp.to_wire(renderer, tsig_ctx)
|
|
|
+ else:
|
|
|
+ resp.to_wire(renderer)
|
|
|
reply_data = struct.pack('H', socket.htons(renderer.get_length()))
|
|
|
reply_data += renderer.get_data()
|
|
|
|
|
@@ -157,14 +164,18 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
TEST_RRCLASS, TEST_DB_FILE,
|
|
|
threading.Event(),
|
|
|
TEST_MASTER_IPV4_ADDRINFO)
|
|
|
- self.axfr_after_soa = False
|
|
|
self.soa_response_params = {
|
|
|
'questions': [example_soa_question],
|
|
|
'bad_qid': False,
|
|
|
'response': True,
|
|
|
'rcode': Rcode.NOERROR(),
|
|
|
+ 'tsig': False,
|
|
|
'axfr_after_soa': self._create_normal_response_data
|
|
|
}
|
|
|
+ self.axfr_response_params = {
|
|
|
+ 'tsig_1st': False,
|
|
|
+ 'tsig_2nd': False
|
|
|
+ }
|
|
|
|
|
|
def tearDown(self):
|
|
|
self.conn.close()
|
|
@@ -240,7 +251,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.reply_data = b'aaaxxxx'
|
|
|
self.assertRaises(XfrinTestException, self._handle_xfrin_response)
|
|
|
|
|
|
- def test_response_with_tsig(self):
|
|
|
+ def test_response_with_tsigfail(self):
|
|
|
self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
|
|
|
self.conn._send_query(RRType.SOA())
|
|
@@ -311,6 +322,52 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ def test_soacheck_with_tsig(self):
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+
|
|
|
+ self.conn._tsig_ctx.error = TSIGError.NOERROR
|
|
|
+ self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
|
|
|
+ self.assertEqual(self.conn._tsig_ctx.get_error(), TSIGError.NOERROR)
|
|
|
+
|
|
|
+ def test_soacheck_with_tsig_notauth(self):
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+
|
|
|
+
|
|
|
+ self.soa_response_params['rcode'] = Rcode.NOTAUTH()
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.conn._tsig_ctx.error = TSIGError.BAD_SIG
|
|
|
+
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_with_tsig_noerror_badsig(self):
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.conn._tsig_ctx.error = TSIGError.BAD_SIG
|
|
|
+
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_with_tsig_unsigned_response(self):
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_with_unexpected_tsig_response(self):
|
|
|
+
|
|
|
+
|
|
|
+ self.soa_response_params['tsig'] = True
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
def test_response_shutdown(self):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.conn._shutdown_event.set()
|
|
@@ -344,6 +401,81 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
|
|
|
+ def test_do_xfrin_with_tsig(self):
|
|
|
+
|
|
|
+
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_ctx.error = TSIGError.NOERROR
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
+
|
|
|
+
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_tsig_fail(self):
|
|
|
+
|
|
|
+
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_ctx.error = TSIGError.BAD_SIG
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(1, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_tsig_fail_for_second_message(self):
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def fake_tsig_error(ctx):
|
|
|
+ if self.conn._tsig_ctx.verify_called == 1:
|
|
|
+ return TSIGError.NOERROR
|
|
|
+ return TSIGError.BAD_SIG
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_ctx.error = fake_tsig_error
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_missing_tsig(self):
|
|
|
+
|
|
|
+
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(1, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_missing_tsig_for_second_message(self):
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def fake_tsig_error(ctx):
|
|
|
+ if self.conn._tsig_ctx.verify_called == 1:
|
|
|
+ return TSIGError.NOERROR
|
|
|
+ return TSIGError.FORMERR
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_ctx.error = fake_tsig_error
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_unexpected_tsig(self):
|
|
|
+
|
|
|
+
|
|
|
+ self.axfr_response_params['tsig_1st'] = True
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_unexpected_tsig_for_second_message(self):
|
|
|
+
|
|
|
+
|
|
|
+ self.axfr_response_params['tsig_2nd'] = True
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+
|
|
|
def test_do_xfrin_empty_response(self):
|
|
|
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
@@ -389,8 +521,10 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
|
|
- self.conn.reply_data = self.conn.create_response_data()
|
|
|
- self.conn.reply_data += self.conn.create_response_data()
|
|
|
+ tsig_1st = self.axfr_response_params['tsig_1st']
|
|
|
+ tsig_2nd = self.axfr_response_params['tsig_2nd']
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(tsig=tsig_1st)
|
|
|
+ self.conn.reply_data += self.conn.create_response_data(tsig=tsig_2nd)
|
|
|
|
|
|
def _create_soa_response_data(self):
|
|
|
|
|
@@ -401,7 +535,8 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
bad_qid=self.soa_response_params['bad_qid'],
|
|
|
response=self.soa_response_params['response'],
|
|
|
rcode=self.soa_response_params['rcode'],
|
|
|
- questions=self.soa_response_params['questions'])
|
|
|
+ questions=self.soa_response_params['questions'],
|
|
|
+ tsig=self.soa_response_params['tsig'])
|
|
|
if self.soa_response_params['axfr_after_soa'] != None:
|
|
|
self.conn.response_generator = self.soa_response_params['axfr_after_soa']
|
|
|
|