|
@@ -1,4 +1,4 @@
|
|
|
-# Copyright (C) 2009 Internet Systems Consortium.
|
|
|
+# Copyright (C) 2009-2011 Internet Systems Consortium.
|
|
|
#
|
|
|
# Permission to use, copy, modify, and distribute this software for any
|
|
|
# purpose with or without fee is hereby granted, provided that the above
|
|
@@ -15,13 +15,16 @@
|
|
|
|
|
|
import unittest
|
|
|
import socket
|
|
|
+from isc.testutils.tsigctx_mock import MockTSIGContext
|
|
|
from xfrin import *
|
|
|
|
|
|
#
|
|
|
# Commonly used (mostly constant) test parameters
|
|
|
#
|
|
|
-TEST_ZONE_NAME = "example.com"
|
|
|
+TEST_ZONE_NAME_STR = "example.com."
|
|
|
+TEST_ZONE_NAME = Name(TEST_ZONE_NAME_STR)
|
|
|
TEST_RRCLASS = RRClass.IN()
|
|
|
+TEST_RRCLASS_STR = 'IN'
|
|
|
TEST_DB_FILE = 'db_file'
|
|
|
TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
|
|
|
TEST_MASTER_IPV4_ADDRINFO = (socket.AF_INET, socket.SOCK_STREAM,
|
|
@@ -40,12 +43,12 @@ TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
'master.example.com. admin.example.com ' +
|
|
|
'1234 3600 1800 2419200 7200')
|
|
|
-soa_rrset = RRset(Name(TEST_ZONE_NAME), TEST_RRCLASS, RRType.SOA(),
|
|
|
+soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(),
|
|
|
RRTTL(3600))
|
|
|
soa_rrset.add_rdata(soa_rdata)
|
|
|
-example_axfr_question = Question(Name(TEST_ZONE_NAME), TEST_RRCLASS,
|
|
|
+example_axfr_question = Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
RRType.AXFR())
|
|
|
-example_soa_question = Question(Name(TEST_ZONE_NAME), TEST_RRCLASS,
|
|
|
+example_soa_question = Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
RRType.SOA())
|
|
|
default_questions = [example_axfr_question]
|
|
|
default_answers = [soa_rrset]
|
|
@@ -53,12 +56,12 @@ default_answers = [soa_rrset]
|
|
|
class XfrinTestException(Exception):
|
|
|
pass
|
|
|
|
|
|
-def strip_mutable_tsig_data(data):
|
|
|
- # Unfortunately we cannot easily compare TSIG RR because we can't tweak
|
|
|
- # current time. As a work around this helper function strips off the time
|
|
|
- # dependent part of TSIG RDATA, i.e., the MAC (assuming HMAC-MD5) and
|
|
|
- # Time Signed.
|
|
|
- return data[0:-32] + data[-26:-22] + data[-6:]
|
|
|
+class MockCC():
|
|
|
+ def get_default_value(self, identifier):
|
|
|
+ if identifier == "zones/master_port":
|
|
|
+ return TEST_MASTER_PORT
|
|
|
+ if identifier == "zones/class":
|
|
|
+ return TEST_RRCLASS_STR
|
|
|
|
|
|
class MockXfrin(Xfrin):
|
|
|
# This is a class attribute of a callable object that specifies a non
|
|
@@ -69,7 +72,8 @@ class MockXfrin(Xfrin):
|
|
|
check_command_hook = None
|
|
|
|
|
|
def _cc_setup(self):
|
|
|
- self._tsig_key_str = None
|
|
|
+ self._tsig_key = None
|
|
|
+ self._module_cc = MockCC()
|
|
|
pass
|
|
|
|
|
|
def _get_db_file(self):
|
|
@@ -80,6 +84,16 @@ class MockXfrin(Xfrin):
|
|
|
if MockXfrin.check_command_hook:
|
|
|
MockXfrin.check_command_hook()
|
|
|
|
|
|
+ def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo,
|
|
|
+ tsig_key, check_soa=True):
|
|
|
+ # store some of the arguments for verification, then call this
|
|
|
+ # method in the superclass
|
|
|
+ self.xfrin_started_master_addr = master_addrinfo[2][0]
|
|
|
+ self.xfrin_started_master_port = master_addrinfo[2][1]
|
|
|
+ return Xfrin.xfrin_start(self, zone_name, rrclass, db_file,
|
|
|
+ master_addrinfo, tsig_key,
|
|
|
+ check_soa)
|
|
|
+
|
|
|
class MockXfrinConnection(XfrinConnection):
|
|
|
def __init__(self, sock_map, zone_name, rrclass, db_file, shutdown_event,
|
|
|
master_addr):
|
|
@@ -131,10 +145,11 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
self.response_generator()
|
|
|
return len(data)
|
|
|
|
|
|
- def create_response_data(self, response = True, bad_qid = False,
|
|
|
- rcode = Rcode.NOERROR(),
|
|
|
- questions = default_questions,
|
|
|
- answers = default_answers):
|
|
|
+ def create_response_data(self, response=True, bad_qid=False,
|
|
|
+ rcode=Rcode.NOERROR(),
|
|
|
+ questions=default_questions,
|
|
|
+ answers=default_answers,
|
|
|
+ tsig_ctx=None):
|
|
|
resp = Message(Message.RENDER)
|
|
|
qid = self.qid
|
|
|
if bad_qid:
|
|
@@ -148,7 +163,10 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
[resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
- resp.to_wire(renderer)
|
|
|
+ if tsig_ctx is not None:
|
|
|
+ resp.to_wire(renderer, tsig_ctx)
|
|
|
+ else:
|
|
|
+ resp.to_wire(renderer)
|
|
|
reply_data = struct.pack('H', socket.htons(renderer.get_length()))
|
|
|
reply_data += renderer.get_data()
|
|
|
|
|
@@ -163,20 +181,32 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
TEST_RRCLASS, TEST_DB_FILE,
|
|
|
threading.Event(),
|
|
|
TEST_MASTER_IPV4_ADDRINFO)
|
|
|
- self.axfr_after_soa = False
|
|
|
self.soa_response_params = {
|
|
|
'questions': [example_soa_question],
|
|
|
'bad_qid': False,
|
|
|
'response': True,
|
|
|
'rcode': Rcode.NOERROR(),
|
|
|
+ 'tsig': False,
|
|
|
'axfr_after_soa': self._create_normal_response_data
|
|
|
}
|
|
|
+ self.axfr_response_params = {
|
|
|
+ 'tsig_1st': None,
|
|
|
+ 'tsig_2nd': None
|
|
|
+ }
|
|
|
|
|
|
def tearDown(self):
|
|
|
self.conn.close()
|
|
|
if os.path.exists(TEST_DB_FILE):
|
|
|
os.remove(TEST_DB_FILE)
|
|
|
|
|
|
+ def __create_mock_tsig(self, key, error):
|
|
|
+ # This helper function creates a MockTSIGContext for a given key
|
|
|
+ # and TSIG error to be used as a result of verify (normally faked
|
|
|
+ # one)
|
|
|
+ mock_ctx = MockTSIGContext(key)
|
|
|
+ mock_ctx.error = error
|
|
|
+ return mock_ctx
|
|
|
+
|
|
|
def test_close(self):
|
|
|
# we shouldn't be using the global asyncore map.
|
|
|
self.assertEqual(len(asyncore.socket_map), 0)
|
|
@@ -216,6 +246,15 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
query_question = Question(Name("example.com."), RRClass.IN(), query_type)
|
|
|
msg.add_question(query_question)
|
|
|
return msg
|
|
|
+
|
|
|
+ def message_has_tsig(data):
|
|
|
+ # a simple check if the actual data contains a TSIG RR.
|
|
|
+ # At our level this simple check should suffice; other detailed
|
|
|
+ # tests regarding the TSIG protocol are done in pydnspp.
|
|
|
+ msg = Message(Message.PARSE)
|
|
|
+ msg.from_wire(data)
|
|
|
+ return msg.get_tsig_record() is not None
|
|
|
+
|
|
|
self.conn._create_query = create_msg
|
|
|
# soa request
|
|
|
self.conn._send_query(RRType.SOA())
|
|
@@ -225,22 +264,20 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.assertEqual(self.conn.query_data, b'\x00\x1d\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
|
|
|
|
|
|
# soa request with tsig
|
|
|
- self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
self.conn._send_query(RRType.SOA())
|
|
|
- tsig_soa_data = strip_mutable_tsig_data(self.conn.query_data)
|
|
|
- self.assertEqual(tsig_soa_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\x06\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
|
|
|
+ self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
|
|
|
|
|
|
# axfr request with tsig
|
|
|
self.conn._send_query(RRType.AXFR())
|
|
|
- tsig_axfr_data = strip_mutable_tsig_data(self.conn.query_data)
|
|
|
- self.assertEqual(tsig_axfr_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\xfc\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
|
|
|
+ self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
|
|
|
|
|
|
def test_response_with_invalid_msg(self):
|
|
|
self.conn.reply_data = b'aaaxxxx'
|
|
|
self.assertRaises(XfrinTestException, self._handle_xfrin_response)
|
|
|
|
|
|
- def test_response_with_tsig(self):
|
|
|
- self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ def test_response_with_tsigfail(self):
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
# server tsig check fail, return with RCODE 9 (NOTAUTH)
|
|
|
self.conn._send_query(RRType.SOA())
|
|
|
self.conn.reply_data = self.conn.create_response_data(rcode=Rcode.NOTAUTH())
|
|
@@ -310,6 +347,54 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ def test_soacheck_with_tsig(self):
|
|
|
+ # Use a mock tsig context emulating a validly signed response
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.NOERROR)
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
|
|
|
+ self.assertEqual(self.conn._tsig_ctx.get_error(), TSIGError.NOERROR)
|
|
|
+
|
|
|
+ def test_soacheck_with_tsig_notauth(self):
|
|
|
+ # emulate a valid error response
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
+ self.soa_response_params['rcode'] = Rcode.NOTAUTH()
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_with_tsig_noerror_badsig(self):
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
+
|
|
|
+ # emulate a normal response bad verification failure due to BADSIG.
|
|
|
+ # According RFC2845, in this case we should ignore it and keep
|
|
|
+ # waiting for a valid response until a timeout. But we immediately
|
|
|
+ # treat this as a final failure (just as BIND 9 does).
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_with_tsig_unsigned_response(self):
|
|
|
+ # we can use a real TSIGContext for this. the response doesn't
|
|
|
+ # contain a TSIG while we sent a signed query. RFC2845 states
|
|
|
+ # we should wait for a valid response in this case, but we treat
|
|
|
+ # it as a fatal transaction failure, too.
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_with_unexpected_tsig_response(self):
|
|
|
+ # we reject unexpected TSIG in responses (following BIND 9's
|
|
|
+ # behavior)
|
|
|
+ self.soa_response_params['tsig'] = True
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
def test_response_shutdown(self):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.conn._shutdown_event.set()
|
|
@@ -343,6 +428,88 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
|
|
|
+ def test_do_xfrin_with_tsig(self):
|
|
|
+ # use TSIG with a mock context. we fake all verify results to
|
|
|
+ # emulate successful verification.
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.NOERROR)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
+ # We use two messages in the tests. The same context should have been
|
|
|
+ # usef for both.
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_tsig_fail(self):
|
|
|
+ # TSIG verify will fail for the first message. xfrin should fail
|
|
|
+ # immediately.
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(1, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_tsig_fail_for_second_message(self):
|
|
|
+ # Similar to the previous test, but first verify succeeds. There
|
|
|
+ # should be a second verify attempt, which will fail, which should
|
|
|
+ # make xfrin fail.
|
|
|
+ def fake_tsig_error(ctx):
|
|
|
+ if self.conn._tsig_ctx.verify_called == 1:
|
|
|
+ return TSIGError.NOERROR
|
|
|
+ return TSIGError.BAD_SIG
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, fake_tsig_error)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_missing_tsig(self):
|
|
|
+ # XFR request sent with TSIG, but the response doesn't have TSIG.
|
|
|
+ # xfr should fail.
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, None)
|
|
|
+ self.conn._tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(1, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_missing_tsig_for_second_message(self):
|
|
|
+ # Similar to the previous test, but firt one contains TSIG and verify
|
|
|
+ # succeeds (due to fake). The second message lacks TSIG.
|
|
|
+ #
|
|
|
+ # Note: this test case is actually not that trivial: Skipping
|
|
|
+ # intermediate TSIG is allowed. In this case, however, the second
|
|
|
+ # message is the last one, which must contain TSIG anyway, so the
|
|
|
+ # expected result is correct. If/when we support skipping
|
|
|
+ # intermediate TSIGs, we'll need additional test cases.
|
|
|
+ def fake_tsig_error(ctx):
|
|
|
+ if self.conn._tsig_ctx.verify_called == 1:
|
|
|
+ return TSIGError.NOERROR
|
|
|
+ return TSIGError.FORMERR
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, fake_tsig_error)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_unexpected_tsig(self):
|
|
|
+ # XFR request wasn't signed, but response includes TSIG. Like BIND 9,
|
|
|
+ # we reject that.
|
|
|
+ self.axfr_response_params['tsig_1st'] = TSIGContext(TSIG_KEY)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+
|
|
|
+ def test_do_xfrin_with_unexpected_tsig_for_second_message(self):
|
|
|
+ # similar to the previous test, but the first message is normal.
|
|
|
+ # the second one contains an unexpected TSIG. should be rejected.
|
|
|
+ self.axfr_response_params['tsig_2nd'] = TSIGContext(TSIG_KEY)
|
|
|
+ self.conn.response_generator = self._create_normal_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
+
|
|
|
def test_do_xfrin_empty_response(self):
|
|
|
# skipping the creation of response data, so the transfer will fail.
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
@@ -361,6 +528,23 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
|
|
|
+ def test_do_soacheck_and_xfrin_with_tsig(self):
|
|
|
+ # We are going to have a SOA query/response transaction, followed by
|
|
|
+ # AXFR, all TSIG signed. xfrin should use a new TSIG context for
|
|
|
+ # AXFR. We are not interested in whether verify works correctly in
|
|
|
+ # this test, so we simply fake the results (they need to succeed for
|
|
|
+ # this test)
|
|
|
+ self.conn._tsig_key = TSIG_KEY
|
|
|
+ self.conn._tsig_ctx_creator = \
|
|
|
+ lambda key: self.__create_mock_tsig(key, TSIGError.NOERROR)
|
|
|
+ self.soa_response_params['tsig'] = True
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
+ # We should've got 3 response messages: 1 SOA and two AXFR, but
|
|
|
+ # the context should be replaced for AXFR, so verify() should be
|
|
|
+ # called only twice for the latest context.
|
|
|
+ self.assertEqual(2, self.conn._tsig_ctx.verify_called)
|
|
|
+
|
|
|
def test_do_soacheck_broken_response(self):
|
|
|
self.conn.response_generator = self._create_broken_response_data
|
|
|
# XXX: TODO: this test failed here, should xfr not raise an
|
|
@@ -388,21 +572,39 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
# This helper method creates a simple sequence of DNS messages that
|
|
|
# forms a valid XFR transaction. It consists of two messages, each
|
|
|
# containing just a single SOA RR.
|
|
|
- self.conn.reply_data = self.conn.create_response_data()
|
|
|
- self.conn.reply_data += self.conn.create_response_data()
|
|
|
+ tsig_1st = self.axfr_response_params['tsig_1st']
|
|
|
+ tsig_2nd = self.axfr_response_params['tsig_2nd']
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(tsig_ctx=tsig_1st)
|
|
|
+ self.conn.reply_data += \
|
|
|
+ self.conn.create_response_data(tsig_ctx=tsig_2nd)
|
|
|
|
|
|
def _create_soa_response_data(self):
|
|
|
# This helper method creates a DNS message that is supposed to be
|
|
|
# used a valid response to SOA queries prior to XFR.
|
|
|
+ # If tsig is True, it tries to verify the query with a locally
|
|
|
+ # created TSIG context (which may or may not succeed) so that the
|
|
|
+ # response will include a TSIG.
|
|
|
# If axfr_after_soa is True, it resets the response_generator so that
|
|
|
# a valid XFR messages will follow.
|
|
|
+
|
|
|
+ verify_ctx = None
|
|
|
+ if self.soa_response_params['tsig']:
|
|
|
+ # xfrin (curreently) always uses TCP. strip off the length field.
|
|
|
+ query_data = self.conn.query_data[2:]
|
|
|
+ query_message = Message(Message.PARSE)
|
|
|
+ query_message.from_wire(query_data)
|
|
|
+ verify_ctx = TSIGContext(TSIG_KEY)
|
|
|
+ verify_ctx.verify(query_message.get_tsig_record(), query_data)
|
|
|
+
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
bad_qid=self.soa_response_params['bad_qid'],
|
|
|
response=self.soa_response_params['response'],
|
|
|
rcode=self.soa_response_params['rcode'],
|
|
|
- questions=self.soa_response_params['questions'])
|
|
|
+ questions=self.soa_response_params['questions'],
|
|
|
+ tsig_ctx=verify_ctx)
|
|
|
if self.soa_response_params['axfr_after_soa'] != None:
|
|
|
- self.conn.response_generator = self.soa_response_params['axfr_after_soa']
|
|
|
+ self.conn.response_generator = \
|
|
|
+ self.soa_response_params['axfr_after_soa']
|
|
|
|
|
|
def _create_broken_response_data(self):
|
|
|
# This helper method creates a bogus "DNS message" that only contains
|
|
@@ -450,7 +652,8 @@ class TestXfrin(unittest.TestCase):
|
|
|
sys.stderr = open(os.devnull, 'w')
|
|
|
self.xfr = MockXfrin()
|
|
|
self.args = {}
|
|
|
- self.args['zone_name'] = TEST_ZONE_NAME
|
|
|
+ self.args['zone_name'] = TEST_ZONE_NAME_STR
|
|
|
+ self.args['class'] = TEST_RRCLASS_STR
|
|
|
self.args['port'] = TEST_MASTER_PORT
|
|
|
self.args['master'] = TEST_MASTER_IPV4_ADDRESS
|
|
|
self.args['db_file'] = TEST_DB_FILE
|
|
@@ -464,7 +667,8 @@ class TestXfrin(unittest.TestCase):
|
|
|
return self.xfr._parse_zone_name_and_class(self.args)
|
|
|
|
|
|
def _do_parse_master_port(self):
|
|
|
- return self.xfr._parse_master_and_port(self.args)
|
|
|
+ name, rrclass = self._do_parse_zone_name_class()
|
|
|
+ return self.xfr._parse_master_and_port(self.args, name, rrclass)
|
|
|
|
|
|
def test_parse_cmd_params(self):
|
|
|
name, rrclass = self._do_parse_zone_name_class()
|
|
@@ -492,7 +696,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
|
|
|
def test_parse_cmd_params_bogusclass(self):
|
|
|
self.args['zone_class'] = 'XXX'
|
|
|
- self.assertRaises(XfrinException, self._do_parse_zone_name_class)
|
|
|
+ self.assertRaises(XfrinZoneInfoException, self._do_parse_zone_name_class)
|
|
|
|
|
|
def test_parse_cmd_params_nozone(self):
|
|
|
# zone name is mandatory.
|
|
@@ -502,8 +706,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
def test_parse_cmd_params_nomaster(self):
|
|
|
# master address is mandatory.
|
|
|
del self.args['master']
|
|
|
- master_addrinfo = self._do_parse_master_port()
|
|
|
- self.assertEqual(master_addrinfo[2][0], DEFAULT_MASTER)
|
|
|
+ self.assertRaises(XfrinException, self._do_parse_master_port)
|
|
|
|
|
|
def test_parse_cmd_params_bad_ip4(self):
|
|
|
self.args['master'] = '3.3.3.3.3'
|
|
@@ -533,6 +736,77 @@ class TestXfrin(unittest.TestCase):
|
|
|
def test_command_handler_retransfer(self):
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
self.args)['result'][0], 0)
|
|
|
+ self.assertEqual(self.args['master'], self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(self.args['port']), self.xfr.xfrin_started_master_port)
|
|
|
+
|
|
|
+ def test_command_handler_retransfer_short_command1(self):
|
|
|
+ # try it when only specifying the zone name (of unknown zone)
|
|
|
+ # this should fail because master address is not specified.
|
|
|
+ short_args = {}
|
|
|
+ short_args['zone_name'] = TEST_ZONE_NAME_STR
|
|
|
+ self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
+ short_args)['result'][0], 1)
|
|
|
+
|
|
|
+ def test_command_handler_retransfer_short_command2(self):
|
|
|
+ # try it when only specifying the zone name (of known zone)
|
|
|
+ short_args = {}
|
|
|
+ short_args['zone_name'] = TEST_ZONE_NAME_STR
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': TEST_ZONE_NAME_STR,
|
|
|
+ 'master_addr': TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ 'master_port': TEST_MASTER_PORT
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.xfr.config_handler(zones)
|
|
|
+ self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
+ short_args)['result'][0], 0)
|
|
|
+ self.assertEqual(TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
+ self.xfr.xfrin_started_master_port)
|
|
|
+
|
|
|
+ def test_command_handler_retransfer_short_command3(self):
|
|
|
+ # try it when only specifying the zone name (of known zone)
|
|
|
+ short_args = {}
|
|
|
+ # test it without the trailing root dot
|
|
|
+ short_args['zone_name'] = TEST_ZONE_NAME_STR[:-1]
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': TEST_ZONE_NAME_STR,
|
|
|
+ 'master_addr': TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ 'master_port': TEST_MASTER_PORT
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.xfr.config_handler(zones)
|
|
|
+ self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
+ short_args)['result'][0], 0)
|
|
|
+ self.assertEqual(TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
+ self.xfr.xfrin_started_master_port)
|
|
|
+
|
|
|
+ def test_command_handler_retransfer_short_command4(self):
|
|
|
+ # try it when only specifying the zone name (of known zone, with
|
|
|
+ # different case)
|
|
|
+ short_args = {}
|
|
|
+
|
|
|
+ # swap the case of the zone name in our command
|
|
|
+ short_args['zone_name'] = TEST_ZONE_NAME_STR.swapcase()
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': TEST_ZONE_NAME_STR,
|
|
|
+ 'master_addr': TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ 'master_port': TEST_MASTER_PORT
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.xfr.config_handler(zones)
|
|
|
+ self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
+ short_args)['result'][0], 0)
|
|
|
+ self.assertEqual(TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
+ self.xfr.xfrin_started_master_port)
|
|
|
|
|
|
def test_command_handler_retransfer_badcommand(self):
|
|
|
self.args['master'] = 'invalid'
|
|
@@ -540,13 +814,15 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.args)['result'][0], 1)
|
|
|
|
|
|
def test_command_handler_retransfer_quota(self):
|
|
|
+ self.args['master'] = TEST_MASTER_IPV4_ADDRESS
|
|
|
+
|
|
|
for i in range(self.xfr._max_transfers_in - 1):
|
|
|
- self.xfr.recorder.increment(str(i) + TEST_ZONE_NAME)
|
|
|
+ self.xfr.recorder.increment(Name(str(i) + TEST_ZONE_NAME_STR))
|
|
|
# there can be one more outstanding transfer.
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
self.args)['result'][0], 0)
|
|
|
# make sure the # xfrs would excceed the quota
|
|
|
- self.xfr.recorder.increment(str(self.xfr._max_transfers_in) + TEST_ZONE_NAME)
|
|
|
+ self.xfr.recorder.increment(Name(str(self.xfr._max_transfers_in) + TEST_ZONE_NAME_STR))
|
|
|
# this one should fail
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
self.args)['result'][0], 1)
|
|
@@ -570,14 +846,43 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.args['master'] = TEST_MASTER_IPV6_ADDRESS
|
|
|
self.assertEqual(self.xfr.command_handler("refresh",
|
|
|
self.args)['result'][0], 0)
|
|
|
+ self.assertEqual(TEST_MASTER_IPV6_ADDRESS,
|
|
|
+ self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
+ self.xfr.xfrin_started_master_port)
|
|
|
|
|
|
def test_command_handler_notify(self):
|
|
|
# at this level, refresh is no different than retransfer.
|
|
|
self.args['master'] = TEST_MASTER_IPV6_ADDRESS
|
|
|
- # ...but right now we disable the feature due to security concerns.
|
|
|
+ # ...but the zone is unknown so this would return an error
|
|
|
+ self.assertEqual(self.xfr.command_handler("notify",
|
|
|
+ self.args)['result'][0], 1)
|
|
|
+
|
|
|
+ def test_command_handler_notify_known_zone(self):
|
|
|
+ # try it with a known zone
|
|
|
+ self.args['master'] = TEST_MASTER_IPV6_ADDRESS
|
|
|
+
|
|
|
+ # but use a different address in the actual command
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': TEST_ZONE_NAME_STR,
|
|
|
+ 'master_addr': TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ 'master_port': TEST_MASTER_PORT
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.xfr.config_handler(zones)
|
|
|
self.assertEqual(self.xfr.command_handler("notify",
|
|
|
self.args)['result'][0], 0)
|
|
|
|
|
|
+ # and see if we used the address from the command, and not from
|
|
|
+ # the config
|
|
|
+ # This is actually NOT the address given in the command, which
|
|
|
+ # would at this point not make sense, see the TODO in
|
|
|
+ # xfrin.py.in Xfrin.command_handler())
|
|
|
+ self.assertEqual(TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
+ self.xfr.xfrin_started_master_port)
|
|
|
+
|
|
|
def test_command_handler_unknown(self):
|
|
|
self.assertEqual(self.xfr.command_handler("xxx", None)['result'][0], 1)
|
|
|
|
|
@@ -586,20 +891,145 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.assertEqual(self.xfr.config_handler({'transfers_in': 3})['result'][0], 0)
|
|
|
self.assertEqual(self.xfr._max_transfers_in, 3)
|
|
|
|
|
|
- def test_command_handler_masters(self):
|
|
|
- master_info = {'master_addr': '1.1.1.1', 'master_port':53}
|
|
|
- self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 0)
|
|
|
-
|
|
|
- master_info = {'master_addr': '1111.1.1.1', 'master_port':53 }
|
|
|
- self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 1)
|
|
|
-
|
|
|
- master_info = {'master_addr': '2.2.2.2', 'master_port':530000 }
|
|
|
- self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 1)
|
|
|
-
|
|
|
- master_info = {'master_addr': '2.2.2.2', 'master_port':53 }
|
|
|
- self.xfr.config_handler(master_info)
|
|
|
- self.assertEqual(self.xfr._master_addr, '2.2.2.2')
|
|
|
- self.assertEqual(self.xfr._master_port, 53)
|
|
|
+ def _check_zones_config(self, config_given):
|
|
|
+ if 'transfers_in' in config_given:
|
|
|
+ self.assertEqual(config_given['transfers_in'],
|
|
|
+ self.xfr._max_transfers_in)
|
|
|
+ for zone_config in config_given['zones']:
|
|
|
+ zone_name = zone_config['name']
|
|
|
+ zone_info = self.xfr._get_zone_info(Name(zone_name), RRClass.IN())
|
|
|
+ self.assertEqual(str(zone_info.master_addr), zone_config['master_addr'])
|
|
|
+ self.assertEqual(zone_info.master_port, zone_config['master_port'])
|
|
|
+ if 'tsig_key' in zone_config:
|
|
|
+ self.assertEqual(zone_info.tsig_key.to_text(), TSIGKey(zone_config['tsig_key']).to_text())
|
|
|
+ else:
|
|
|
+ self.assertIsNone(zone_info.tsig_key)
|
|
|
+
|
|
|
+ def test_command_handler_zones(self):
|
|
|
+ config1 = { 'transfers_in': 3,
|
|
|
+ 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.1',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(config1)['result'][0], 0)
|
|
|
+ self._check_zones_config(config1)
|
|
|
+
|
|
|
+ config2 = { 'transfers_in': 4,
|
|
|
+ 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.2',
|
|
|
+ 'master_port': 53,
|
|
|
+ 'tsig_key': "example.com:SFuWd/q99SzF8Yzd1QbB9g=="
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(config2)['result'][0], 0)
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ # test that configuring the zone multiple times fails
|
|
|
+ zones = { 'transfers_in': 5,
|
|
|
+ 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.1',
|
|
|
+ 'master_port': 53
|
|
|
+ },
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.2',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.3',
|
|
|
+ 'master_port': 53,
|
|
|
+ 'class': 'BADCLASS'
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'master_addr': '192.0.2.4',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'bad..zone.',
|
|
|
+ 'master_addr': '192.0.2.5',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': '',
|
|
|
+ 'master_addr': '192.0.2.6',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'test.example',
|
|
|
+ 'master_addr': 'badaddress',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'test.example',
|
|
|
+ 'master_addr': '192.0.2.7',
|
|
|
+ 'master_port': 'bad_port'
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'test.example',
|
|
|
+ 'master_addr': '192.0.2.7',
|
|
|
+ 'master_port': 53,
|
|
|
+ # using a bad TSIG key spec
|
|
|
+ 'tsig_key': "bad..example.com:SFuWd/q99SzF8Yzd1QbB9g=="
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(config2)
|
|
|
+
|
|
|
+ # let's also add a zone that is correct too, and make sure
|
|
|
+ # that the new config is not partially taken
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.8',
|
|
|
+ 'master_port': 53
|
|
|
+ },
|
|
|
+ { 'name': 'test2.example.',
|
|
|
+ 'master_addr': '192.0.2.9',
|
|
|
+ 'master_port': 53,
|
|
|
+ 'tsig_key': 'badkey'
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(config2)
|
|
|
|
|
|
|
|
|
def raise_interrupt():
|