|
@@ -15,6 +15,7 @@
|
|
|
|
|
|
import unittest
|
|
|
import socket
|
|
|
+import io
|
|
|
from isc.testutils.tsigctx_mock import MockTSIGContext
|
|
|
from xfrin import *
|
|
|
|
|
@@ -293,6 +294,37 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.reply_data = self.conn.create_response_data(bad_qid = True)
|
|
|
self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
|
|
+ def test_response_error_code_bad_sig(self):
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
+ self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
+ rcode=Rcode.SERVFAIL())
|
|
|
+ # xfrin should check TSIG before other part of incoming message
|
|
|
+ # validate log message for XfrinException
|
|
|
+ self.conn._verbose = True
|
|
|
+ err_output = io.StringIO()
|
|
|
+ sys.stdout = err_output
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
+ self.assertEqual("[b10-xfrin] TSIG verify fail: BADSIG\n", err_output.getvalue())
|
|
|
+ err_output.close()
|
|
|
+
|
|
|
+ def test_response_bad_qid_bad_key(self):
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_KEY)
|
|
|
+ self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(bad_qid = True)
|
|
|
+ # xfrin should check TSIG before other part of incoming message
|
|
|
+ # validate log message for XfrinException
|
|
|
+ self.conn._verbose = True
|
|
|
+ err_output = io.StringIO()
|
|
|
+ sys.stdout = err_output
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
+ self.assertEqual("[b10-xfrin] TSIG verify fail: BADKEY\n", err_output.getvalue())
|
|
|
+ err_output.close()
|
|
|
+
|
|
|
def test_response_non_response(self):
|
|
|
self.conn._send_query(RRType.AXFR())
|
|
|
self.conn.reply_data = self.conn.create_response_data(response = False)
|
|
@@ -337,6 +369,21 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ def test_soacheck_bad_qid_bad_sig(self):
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
+ self.soa_response_params['bad_qid'] = True
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ # xfrin should check TSIG before other part of incoming message
|
|
|
+ # validate log message for XfrinException
|
|
|
+ self.conn._verbose = True
|
|
|
+ err_output = io.StringIO()
|
|
|
+ sys.stdout = err_output
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+ self.assertEqual("[b10-xfrin] TSIG verify fail: BADSIG\n", err_output.getvalue())
|
|
|
+ err_output.close()
|
|
|
+
|
|
|
def test_soacheck_non_response(self):
|
|
|
self.soa_response_params['response'] = False
|
|
|
self.conn.response_generator = self._create_soa_response_data
|