|
@@ -35,6 +35,8 @@ TEST_MASTER_IPV6_ADDRINFO = (socket.AF_INET6, socket.SOCK_STREAM,
|
|
# If some other process uses this port test will fail.
|
|
# If some other process uses this port test will fail.
|
|
TEST_MASTER_PORT = '53535'
|
|
TEST_MASTER_PORT = '53535'
|
|
|
|
|
|
|
|
+TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
+
|
|
soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
'master.example.com. admin.example.com ' +
|
|
'master.example.com. admin.example.com ' +
|
|
'1234 3600 1800 2419200 7200')
|
|
'1234 3600 1800 2419200 7200')
|
|
@@ -223,8 +225,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
self.assertEqual(self.conn.query_data, b'\x00\x1d\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
|
|
self.assertEqual(self.conn.query_data, b'\x00\x1d\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
|
|
|
|
|
|
# soa request with tsig
|
|
# soa request with tsig
|
|
- tsig_key = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
- self.conn._tsig_ctx = TSIGContext(tsig_key)
|
|
|
|
|
|
+ self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
|
|
self.conn._send_query(RRType.SOA())
|
|
self.conn._send_query(RRType.SOA())
|
|
tsig_soa_data = strip_mutable_tsig_data(self.conn.query_data)
|
|
tsig_soa_data = strip_mutable_tsig_data(self.conn.query_data)
|
|
self.assertEqual(tsig_soa_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\x06\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
|
|
self.assertEqual(tsig_soa_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\x06\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
|
|
@@ -238,6 +239,13 @@ class TestXfrinConnection(unittest.TestCase):
|
|
self.conn.reply_data = b'aaaxxxx'
|
|
self.conn.reply_data = b'aaaxxxx'
|
|
self.assertRaises(XfrinTestException, self._handle_xfrin_response)
|
|
self.assertRaises(XfrinTestException, self._handle_xfrin_response)
|
|
|
|
|
|
|
|
+ def test_response_with_tsig(self):
|
|
|
|
+ self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
|
|
|
|
+ # server tsig check fail, return with RCODE 9 (NOTAUTH)
|
|
|
|
+ self.conn._send_query(RRType.SOA())
|
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(rcode=Rcode.NOTAUTH())
|
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
+
|
|
def test_response_without_end_soa(self):
|
|
def test_response_without_end_soa(self):
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn.reply_data = self.conn.create_response_data()
|
|
self.conn.reply_data = self.conn.create_response_data()
|