Browse Source

[master] Merge branch 'trac914'
with fixing conflicts in src/bin/xfrin/tests/xfrin_test.py
Also made a couple of cleanups to prevent disruption:
- initialize s_Name::position in constructor
- add tsigctx_mock.py to EXTRA_DIST

JINMEI Tatuya 14 years ago
parent
commit
78502c0214

+ 159 - 23
src/bin/xfrin/tests/xfrin_test.py

@@ -15,6 +15,7 @@
 
 
 import unittest
 import unittest
 import socket
 import socket
+from isc.testutils.tsigctx_mock import MockTSIGContext
 from xfrin import *
 from xfrin import *
 
 
 #
 #
@@ -55,13 +56,6 @@ default_answers = [soa_rrset]
 class XfrinTestException(Exception):
 class XfrinTestException(Exception):
     pass
     pass
 
 
-def strip_mutable_tsig_data(data):
-    # Unfortunately we cannot easily compare TSIG RR because we can't tweak
-    # current time.  As a work around this helper function strips off the time
-    # dependent part of TSIG RDATA, i.e., the MAC (assuming HMAC-MD5) and
-    # Time Signed.
-    return data[0:-32] + data[-26:-22] + data[-6:]
-
 class MockCC():
 class MockCC():
     def get_default_value(self, identifier):
     def get_default_value(self, identifier):
         if identifier == "zones/master_port":
         if identifier == "zones/master_port":
@@ -151,10 +145,11 @@ class MockXfrinConnection(XfrinConnection):
                 self.response_generator()
                 self.response_generator()
         return len(data)
         return len(data)
 
 
-    def create_response_data(self, response = True, bad_qid = False,
-                             rcode = Rcode.NOERROR(),
-                             questions = default_questions,
-                             answers = default_answers):
+    def create_response_data(self, response=True, bad_qid=False,
+                             rcode=Rcode.NOERROR(),
+                             questions=default_questions,
+                             answers=default_answers,
+                             tsig=False):
         resp = Message(Message.RENDER)
         resp = Message(Message.RENDER)
         qid = self.qid
         qid = self.qid
         if bad_qid:
         if bad_qid:
@@ -168,7 +163,13 @@ class MockXfrinConnection(XfrinConnection):
         [resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
         [resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
 
 
         renderer = MessageRenderer()
         renderer = MessageRenderer()
-        resp.to_wire(renderer)
+        if tsig:
+            # for now, we don't need a valid SIG.  We only need to include
+            # TSIG RR.  So how to add it and which key is used don't matter.
+            tsig_ctx = TSIGContext(TSIG_KEY)
+            resp.to_wire(renderer, tsig_ctx)
+        else:
+            resp.to_wire(renderer)
         reply_data = struct.pack('H', socket.htons(renderer.get_length()))
         reply_data = struct.pack('H', socket.htons(renderer.get_length()))
         reply_data += renderer.get_data()
         reply_data += renderer.get_data()
 
 
@@ -183,14 +184,18 @@ class TestXfrinConnection(unittest.TestCase):
                                         TEST_RRCLASS, TEST_DB_FILE,
                                         TEST_RRCLASS, TEST_DB_FILE,
                                         threading.Event(),
                                         threading.Event(),
                                         TEST_MASTER_IPV4_ADDRINFO)
                                         TEST_MASTER_IPV4_ADDRINFO)
-        self.axfr_after_soa = False
         self.soa_response_params = {
         self.soa_response_params = {
             'questions': [example_soa_question],
             'questions': [example_soa_question],
             'bad_qid': False,
             'bad_qid': False,
             'response': True,
             'response': True,
             'rcode': Rcode.NOERROR(),
             'rcode': Rcode.NOERROR(),
+            'tsig': False,
             'axfr_after_soa': self._create_normal_response_data
             'axfr_after_soa': self._create_normal_response_data
             }
             }
+        self.axfr_response_params = {
+            'tsig_1st': False,
+            'tsig_2nd': False
+            }
 
 
     def tearDown(self):
     def tearDown(self):
         self.conn.close()
         self.conn.close()
@@ -236,6 +241,15 @@ class TestXfrinConnection(unittest.TestCase):
             query_question = Question(Name("example.com."), RRClass.IN(), query_type)
             query_question = Question(Name("example.com."), RRClass.IN(), query_type)
             msg.add_question(query_question)
             msg.add_question(query_question)
             return msg
             return msg
+
+        def message_has_tsig(data):
+            # a simple check if the actual data contains a TSIG RR.
+            # At our level this simple check should suffice; other detailed
+            # tests regarding the TSIG protocol are done in pydnspp.
+            msg = Message(Message.PARSE)
+            msg.from_wire(data)
+            return msg.get_tsig_record() is not None
+
         self.conn._create_query = create_msg
         self.conn._create_query = create_msg
         # soa request
         # soa request
         self.conn._send_query(RRType.SOA())
         self.conn._send_query(RRType.SOA())
@@ -245,22 +259,20 @@ class TestXfrinConnection(unittest.TestCase):
         self.assertEqual(self.conn.query_data, b'\x00\x1d\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
         self.assertEqual(self.conn.query_data, b'\x00\x1d\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
 
 
         # soa request with tsig
         # soa request with tsig
-        self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
         self.conn._send_query(RRType.SOA())
         self.conn._send_query(RRType.SOA())
-        tsig_soa_data = strip_mutable_tsig_data(self.conn.query_data)
-        self.assertEqual(tsig_soa_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\x06\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
+        self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
 
 
         # axfr request with tsig
         # axfr request with tsig
         self.conn._send_query(RRType.AXFR())
         self.conn._send_query(RRType.AXFR())
-        tsig_axfr_data = strip_mutable_tsig_data(self.conn.query_data)
-        self.assertEqual(tsig_axfr_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\xfc\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
+        self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
 
 
     def test_response_with_invalid_msg(self):
     def test_response_with_invalid_msg(self):
         self.conn.reply_data = b'aaaxxxx'
         self.conn.reply_data = b'aaaxxxx'
         self.assertRaises(XfrinTestException, self._handle_xfrin_response)
         self.assertRaises(XfrinTestException, self._handle_xfrin_response)
 
 
-    def test_response_with_tsig(self):
-        self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
+    def test_response_with_tsigfail(self):
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
         # server tsig check fail, return with RCODE 9 (NOTAUTH)
         # server tsig check fail, return with RCODE 9 (NOTAUTH)
         self.conn._send_query(RRType.SOA())
         self.conn._send_query(RRType.SOA())
         self.conn.reply_data = self.conn.create_response_data(rcode=Rcode.NOTAUTH())
         self.conn.reply_data = self.conn.create_response_data(rcode=Rcode.NOTAUTH())
@@ -330,6 +342,52 @@ class TestXfrinConnection(unittest.TestCase):
         self.conn.response_generator = self._create_soa_response_data
         self.conn.response_generator = self._create_soa_response_data
         self.assertRaises(XfrinException, self.conn._check_soa_serial)
         self.assertRaises(XfrinException, self.conn._check_soa_serial)
 
 
+    def test_soacheck_with_tsig(self):
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
+        self.conn.response_generator = self._create_soa_response_data
+        # emulate a validly signed response
+        self.conn._tsig_ctx.error = TSIGError.NOERROR
+        self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
+        self.assertEqual(self.conn._tsig_ctx.get_error(), TSIGError.NOERROR)
+
+    def test_soacheck_with_tsig_notauth(self):
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
+
+        # emulate a valid error response
+        self.soa_response_params['rcode'] = Rcode.NOTAUTH()
+        self.conn.response_generator = self._create_soa_response_data
+        self.conn._tsig_ctx.error = TSIGError.BAD_SIG
+
+        self.assertRaises(XfrinException, self.conn._check_soa_serial)
+
+    def test_soacheck_with_tsig_noerror_badsig(self):
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
+
+        # emulate a normal response bad verification failure due to BADSIG.
+        # According RFC2845, in this case we should ignore it and keep
+        # waiting for a valid response until a timeout.  But we immediately
+        # treat this as a final failure (just as BIND 9 does).
+        self.conn.response_generator = self._create_soa_response_data
+        self.conn._tsig_ctx.error = TSIGError.BAD_SIG
+
+        self.assertRaises(XfrinException, self.conn._check_soa_serial)
+
+    def test_soacheck_with_tsig_unsigned_response(self):
+        # we can use a real TSIGContext for this.  the response doesn't
+        # contain a TSIG while we sent a signed query.  RFC2845 states
+        # we should wait for a valid response in this case, but we treat
+        # it as a fatal transaction failure, too.
+        self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
+        self.conn.response_generator = self._create_soa_response_data
+        self.assertRaises(XfrinException, self.conn._check_soa_serial)
+
+    def test_soacheck_with_unexpected_tsig_response(self):
+        # we reject unexpected TSIG in responses (following BIND 9's
+        # behavior)
+        self.soa_response_params['tsig'] = True
+        self.conn.response_generator = self._create_soa_response_data
+        self.assertRaises(XfrinException, self.conn._check_soa_serial)
+
     def test_response_shutdown(self):
     def test_response_shutdown(self):
         self.conn.response_generator = self._create_normal_response_data
         self.conn.response_generator = self._create_normal_response_data
         self.conn._shutdown_event.set()
         self.conn._shutdown_event.set()
@@ -363,6 +421,81 @@ class TestXfrinConnection(unittest.TestCase):
         self.conn.response_generator = self._create_normal_response_data
         self.conn.response_generator = self._create_normal_response_data
         self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
         self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
 
 
+    def test_do_xfrin_with_tsig(self):
+        # use TSIG with a mock context.  we fake all verify results to
+        # emulate successful verification.
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
+        self.conn._tsig_ctx.error = TSIGError.NOERROR
+        self.conn.response_generator = self._create_normal_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+        # We use two messages in the tests.  The same context should have been
+        # usef for both.
+        self.assertEqual(2, self.conn._tsig_ctx.verify_called)
+
+    def test_do_xfrin_with_tsig_fail(self):
+        # TSIG verify will fail for the first message.  xfrin should fail
+        # immediately.
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
+        self.conn._tsig_ctx.error = TSIGError.BAD_SIG
+        self.conn.response_generator = self._create_normal_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
+        self.assertEqual(1, self.conn._tsig_ctx.verify_called)
+
+    def test_do_xfrin_with_tsig_fail_for_second_message(self):
+        # Similar to the previous test, but first verify succeeds.  There
+        # should be a second verify attempt, which will fail, which should
+        # make xfrin fail.
+        def fake_tsig_error(ctx):
+            if self.conn._tsig_ctx.verify_called == 1:
+                return TSIGError.NOERROR
+            return TSIGError.BAD_SIG
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
+        self.conn._tsig_ctx.error = fake_tsig_error
+        self.conn.response_generator = self._create_normal_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
+        self.assertEqual(2, self.conn._tsig_ctx.verify_called)
+
+    def test_do_xfrin_with_missing_tsig(self):
+        # XFR request sent with TSIG, but the response doesn't have TSIG.
+        # xfr should fail.
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
+        self.conn.response_generator = self._create_normal_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
+        self.assertEqual(1, self.conn._tsig_ctx.verify_called)
+
+    def test_do_xfrin_with_missing_tsig_for_second_message(self):
+        # Similar to the previous test, but firt one contains TSIG and verify
+        # succeeds (due to fake).  The second message lacks TSIG.
+        #
+        # Note: this test case is actually not that trivial:  Skipping
+        # intermediate TSIG is allowed.  In this case, however, the second
+        # message is the last one, which must contain TSIG anyway, so the
+        # expected result is correct.  If/when we support skipping
+        # intermediate TSIGs, we'll need additional test cases.
+        def fake_tsig_error(ctx):
+            if self.conn._tsig_ctx.verify_called == 1:
+                return TSIGError.NOERROR
+            return TSIGError.FORMERR
+        self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
+        self.conn._tsig_ctx.error = fake_tsig_error
+        self.conn.response_generator = self._create_normal_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
+        self.assertEqual(2, self.conn._tsig_ctx.verify_called)
+
+    def test_do_xfrin_with_unexpected_tsig(self):
+        # XFR request wasn't signed, but response includes TSIG.  Like BIND 9,
+        # we reject that.
+        self.axfr_response_params['tsig_1st'] = True
+        self.conn.response_generator = self._create_normal_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
+
+    def test_do_xfrin_with_unexpected_tsig_for_second_message(self):
+        # similar to the previous test, but the first message is normal.
+        # the second one contains an unexpected TSIG.  should be rejected.
+        self.axfr_response_params['tsig_2nd'] = True
+        self.conn.response_generator = self._create_normal_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
+
     def test_do_xfrin_empty_response(self):
     def test_do_xfrin_empty_response(self):
         # skipping the creation of response data, so the transfer will fail.
         # skipping the creation of response data, so the transfer will fail.
         self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
         self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
@@ -408,8 +541,10 @@ class TestXfrinConnection(unittest.TestCase):
         # This helper method creates a simple sequence of DNS messages that
         # This helper method creates a simple sequence of DNS messages that
         # forms a valid XFR transaction.  It consists of two messages, each
         # forms a valid XFR transaction.  It consists of two messages, each
         # containing just a single SOA RR.
         # containing just a single SOA RR.
-        self.conn.reply_data = self.conn.create_response_data()
-        self.conn.reply_data += self.conn.create_response_data()
+        tsig_1st = self.axfr_response_params['tsig_1st']
+        tsig_2nd = self.axfr_response_params['tsig_2nd']
+        self.conn.reply_data = self.conn.create_response_data(tsig=tsig_1st)
+        self.conn.reply_data += self.conn.create_response_data(tsig=tsig_2nd)
 
 
     def _create_soa_response_data(self):
     def _create_soa_response_data(self):
         # This helper method creates a DNS message that is supposed to be
         # This helper method creates a DNS message that is supposed to be
@@ -420,7 +555,8 @@ class TestXfrinConnection(unittest.TestCase):
             bad_qid=self.soa_response_params['bad_qid'],
             bad_qid=self.soa_response_params['bad_qid'],
             response=self.soa_response_params['response'],
             response=self.soa_response_params['response'],
             rcode=self.soa_response_params['rcode'],
             rcode=self.soa_response_params['rcode'],
-            questions=self.soa_response_params['questions'])
+            questions=self.soa_response_params['questions'],
+            tsig=self.soa_response_params['tsig'])
         if self.soa_response_params['axfr_after_soa'] != None:
         if self.soa_response_params['axfr_after_soa'] != None:
             self.conn.response_generator = self.soa_response_params['axfr_after_soa']
             self.conn.response_generator = self.soa_response_params['axfr_after_soa']
 
 

+ 25 - 4
src/bin/xfrin/xfrin.py.in

@@ -207,6 +207,22 @@ class XfrinConnection(asyncore.dispatcher):
 
 
         return data
         return data
 
 
+    def _check_response_tsig(self, msg, response_data):
+        tsig_record = msg.get_tsig_record()
+        if self._tsig_ctx is not None:
+            tsig_error = self._tsig_ctx.verify(tsig_record, response_data)
+            if tsig_error != TSIGError.NOERROR:
+                raise XfrinException('TSIG verify fail: %s' % str(tsig_error))
+        elif tsig_record is not None:
+            # If the response includes a TSIG while we didn't sign the query,
+            # we treat it as an error.  RFC doesn't say anything about this
+            # case, but it clearly states the server must not sign a response
+            # to an unsigned request.  Although we could be flexible, no sane
+            # implementation would return such a response, and since this is
+            # part of security mechanism, it's probably better to be more
+            # strict.
+            raise XfrinException('Unexpected TSIG in response')
+
     def _check_soa_serial(self):
     def _check_soa_serial(self):
         ''' Compare the soa serial, if soa serial in master is less than
         ''' Compare the soa serial, if soa serial in master is less than
         the soa serial in local, Finish xfrin.
         the soa serial in local, Finish xfrin.
@@ -214,7 +230,7 @@ class XfrinConnection(asyncore.dispatcher):
         True: soa serial in master is bigger
         True: soa serial in master is bigger
         '''
         '''
 
 
-        self._send_query(RRType("SOA"))
+        self._send_query(RRType.SOA())
         data_len = self._get_request_response(2)
         data_len = self._get_request_response(2)
         msg_len = socket.htons(struct.unpack('H', data_len)[0])
         msg_len = socket.htons(struct.unpack('H', data_len)[0])
         soa_response = self._get_request_response(msg_len)
         soa_response = self._get_request_response(msg_len)
@@ -225,6 +241,9 @@ class XfrinConnection(asyncore.dispatcher):
         # strict we should be (see the comment in _check_response_header())
         # strict we should be (see the comment in _check_response_header())
         self._check_response_header(msg)
         self._check_response_header(msg)
 
 
+        # TSIG related checks, including an unexpected signed response
+        self._check_response_tsig(msg, soa_response)
+
         # TODO, need select soa record from data source then compare the two
         # TODO, need select soa record from data source then compare the two
         # serial, current just return OK, since this function hasn't been used
         # serial, current just return OK, since this function hasn't been used
         # now.
         # now.
@@ -242,8 +261,7 @@ class XfrinConnection(asyncore.dispatcher):
             logstr = 'transfer of \'%s\': AXFR ' % self._zone_name
             logstr = 'transfer of \'%s\': AXFR ' % self._zone_name
             if ret == XFRIN_OK:
             if ret == XFRIN_OK:
                 self.log_msg(logstr + 'started')
                 self.log_msg(logstr + 'started')
-                # TODO: .AXFR() RRType.AXFR()
-                self._send_query(RRType(252))
+                self._send_query(RRType.AXFR())
                 isc.datasrc.sqlite3_ds.load(self._db_file, self._zone_name,
                 isc.datasrc.sqlite3_ds.load(self._db_file, self._zone_name,
                                             self._handle_xfrin_response)
                                             self._handle_xfrin_response)
 
 
@@ -314,7 +332,7 @@ class XfrinConnection(asyncore.dispatcher):
 
 
             for rdata in rrset.get_rdata():
             for rdata in rrset.get_rdata():
                 # Count the soa record count
                 # Count the soa record count
-                if rrset.get_type() == RRType("SOA"):
+                if rrset.get_type() == RRType.SOA():
                     self._soa_rr_count += 1
                     self._soa_rr_count += 1
 
 
                     # XXX: the current DNS message parser can't preserve the
                     # XXX: the current DNS message parser can't preserve the
@@ -340,6 +358,9 @@ class XfrinConnection(asyncore.dispatcher):
             msg.from_wire(recvdata)
             msg.from_wire(recvdata)
             self._check_response_status(msg)
             self._check_response_status(msg)
 
 
+            # TSIG related checks, including an unexpected signed response
+            self._check_response_tsig(msg, recvdata)
+
             answer_section = msg.get_section(Message.SECTION_ANSWER)
             answer_section = msg.get_section(Message.SECTION_ANSWER)
             for rr in self._handle_answer_section(answer_section):
             for rr in self._handle_answer_section(answer_section):
                 yield rr
                 yield rr

+ 1 - 1
src/lib/dns/python/name_python.h

@@ -56,7 +56,7 @@ public:
 
 
 class s_Name : public PyObject {
 class s_Name : public PyObject {
 public:
 public:
-    s_Name() : cppobj(NULL) {}
+    s_Name() : cppobj(NULL), position(0) {}
     Name* cppobj;
     Name* cppobj;
     size_t position;
     size_t position;
 };
 };

+ 8 - 1
src/lib/dns/python/tsig_python.cc

@@ -270,7 +270,14 @@ PyTypeObject tsigcontext_type = {
     NULL,                               // tp_getattro
     NULL,                               // tp_getattro
     NULL,                               // tp_setattro
     NULL,                               // tp_setattro
     NULL,                               // tp_as_buffer
     NULL,                               // tp_as_buffer
-    Py_TPFLAGS_DEFAULT,                 // tp_flags
+
+    // We allow the python version of TSIGContext to act as a base class.
+    // From pure design point of view, this is wrong because it's not intended
+    // to be inherited.  However, cryptographic operations are generally
+    // difficult to test, so it would be very advantageous if we can define
+    // a mock context class.
+    Py_TPFLAGS_DEFAULT|Py_TPFLAGS_BASETYPE, // tp_flags
+
     "The TSIGContext class objects is...(COMPLETE THIS)",
     "The TSIGContext class objects is...(COMPLETE THIS)",
     NULL,                               // tp_traverse
     NULL,                               // tp_traverse
     NULL,                               // tp_clear
     NULL,                               // tp_clear

+ 1 - 1
src/lib/python/isc/testutils/Makefile.am

@@ -1 +1 @@
-EXTRA_DIST = __init__.py parse_args.py
+EXTRA_DIST = __init__.py parse_args.py tsigctx_mock.py

+ 53 - 0
src/lib/python/isc/testutils/tsigctx_mock.py

@@ -0,0 +1,53 @@
+# Copyright (C) 2011  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from pydnspp import *
+
+class MockTSIGContext(TSIGContext):
+    """Tthis is a mock of TSIGContext class for testing.
+    Via its "error" attribute, you can fake the result of verify(), thereby
+    you can test many of TSIG related tests without requiring actual crypto
+    setups.  "error" should be either a TSIGError type value or a callable
+    object (typically a function).  In the latter case, the callable object
+    will take the context as a parameter, and is expected to return a
+    TSIGError object.
+    """
+
+    def __init__(self, tsig_key):
+        super().__init__(tsig_key)
+        self.error = None
+        self.verify_called = 0  # number of verify() called
+
+    def sign(self, qid, data):
+        """Transparently delegate the processing to the super class.
+        It doesn't matter much anyway because normal applications that would
+        be implemented in Python normally won't call TSIGContext.sign()
+        directly.
+        """
+        return super().sign(qid, data)
+
+    def verify(self, tsig_record, data):
+        self.verify_called += 1
+        # call real "verify" so that we can notice any misue (which would
+        # result in exception.
+        super().verify(tsig_record, data)
+        return self.get_error()
+
+    def get_error(self):
+        if self.error is None:
+            return super().get_error()
+        if hasattr(self.error, '__call__'):
+            return self.error(self)
+        return self.error