|
@@ -54,13 +54,6 @@ default_answers = [soa_rrset]
|
|
|
class XfrinTestException(Exception):
|
|
|
pass
|
|
|
|
|
|
-def strip_mutable_tsig_data(data):
|
|
|
- # Unfortunately we cannot easily compare TSIG RR because we can't tweak
|
|
|
- # current time. As a work around this helper function strips off the time
|
|
|
- # dependent part of TSIG RDATA, i.e., the MAC (assuming HMAC-MD5) and
|
|
|
- # Time Signed.
|
|
|
- return data[0:-32] + data[-26:-22] + data[-6:]
|
|
|
-
|
|
|
class MockXfrin(Xfrin):
|
|
|
# This is a class attribute of a callable object that specifies a non
|
|
|
# default behavior triggered in _cc_check_command(). Specific test methods
|
|
@@ -217,6 +210,15 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
query_question = Question(Name("example.com."), RRClass.IN(), query_type)
|
|
|
msg.add_question(query_question)
|
|
|
return msg
|
|
|
+
|
|
|
+ def message_has_tsig(data):
|
|
|
+ # a simple check if the actual data contains a TSIG RR.
|
|
|
+ # At our level this simple check should suffice; other detailed
|
|
|
+ # tests regarding the TSIG protocol are done in pydnspp.
|
|
|
+ msg = Message(Message.PARSE)
|
|
|
+ msg.from_wire(data)
|
|
|
+ return msg.get_tsig_record() is not None
|
|
|
+
|
|
|
self.conn._create_query = create_msg
|
|
|
# soa request
|
|
|
self.conn._send_query(RRType.SOA())
|
|
@@ -228,13 +230,11 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
# soa request with tsig
|
|
|
self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
self.conn._send_query(RRType.SOA())
|
|
|
- tsig_soa_data = strip_mutable_tsig_data(self.conn.query_data)
|
|
|
- self.assertEqual(tsig_soa_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\x06\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
|
|
|
+ self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
|
|
|
|
|
|
# axfr request with tsig
|
|
|
self.conn._send_query(RRType.AXFR())
|
|
|
- tsig_axfr_data = strip_mutable_tsig_data(self.conn.query_data)
|
|
|
- self.assertEqual(tsig_axfr_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\xfc\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
|
|
|
+ self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
|
|
|
|
|
|
def test_response_with_invalid_msg(self):
|
|
|
self.conn.reply_data = b'aaaxxxx'
|