|
@@ -34,7 +34,16 @@ TEST_QID = 5353 # arbitrary chosen
|
|
|
TEST_RRCLASS = RRClass.IN()
|
|
|
TEST_SERVER6 = ('2001:db8::53', 53, 0, 0)
|
|
|
TEST_CLIENT6 = ('2001:db8::1', 53000, 0, 0)
|
|
|
+TEST_SERVER4 = ('192.0.2.53', 53)
|
|
|
+TEST_CLIENT4 = ('192.0.2.1', 53534)
|
|
|
TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
|
|
|
+# TSIG key for tests when needed. The key name is TEST_ZONE_NAME.
|
|
|
+TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
+# TSIG keyring that contanins the test key
|
|
|
+TEST_TSIG_KEYRING = TSIGKeyRing()
|
|
|
+TEST_TSIG_KEYRING.add(TEST_TSIG_KEY)
|
|
|
+# Another TSIG key not in the keyring, making verification fail
|
|
|
+BAD_TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
|
|
class FakeSocket:
|
|
|
"""
|
|
@@ -53,6 +62,10 @@ class FakeSocket:
|
|
|
def sendto(self, data, addr):
|
|
|
self._sent_data = data
|
|
|
self._sent_addr = addr
|
|
|
+ def clear(self):
|
|
|
+ '''Clear internal instrumental data.'''
|
|
|
+ self._sent_data = None
|
|
|
+ self._sent_addr = None
|
|
|
|
|
|
class FakeSessionReceiver:
|
|
|
"""
|
|
@@ -68,16 +81,37 @@ class FakeSessionReceiver:
|
|
|
return self._socket
|
|
|
|
|
|
class FakeUpdateSession:
|
|
|
- def __init__(self, msg, client_addr, zone_config):
|
|
|
+ '''A fake update session, emulating isc.ddns.session.UpdateSession.
|
|
|
+
|
|
|
+ It provides the same interfaces as UpdateSession with skipping complicated
|
|
|
+ internal protocol processing and returning given faked results. This
|
|
|
+ will help simplify test setups.
|
|
|
+
|
|
|
+ '''
|
|
|
+ def __init__(self, msg, client_addr, zone_config, faked_result):
|
|
|
+ '''Faked constructor.
|
|
|
+
|
|
|
+ It takes an additional faked_result parameter. It will be used
|
|
|
+ as the result value of handle(). If its value is UPDATE_ERROR,
|
|
|
+ get_message() will create a response message whose Rcode is
|
|
|
+ REFUSED.
|
|
|
+
|
|
|
+ '''
|
|
|
self.__msg = msg
|
|
|
+ self.__faked_result = faked_result
|
|
|
|
|
|
def handle(self):
|
|
|
- return UPDATE_SUCCESS, TEST_ZONE_NAME, TEST_RRCLASS
|
|
|
+ if self.__faked_result == UPDATE_SUCCESS:
|
|
|
+ return self.__faked_result, TEST_ZONE_NAME, TEST_RRCLASS
|
|
|
+ return self.__faked_result, None, None
|
|
|
|
|
|
def get_message(self):
|
|
|
self.__msg.make_response()
|
|
|
self.__msg.clear_section(SECTION_ZONE)
|
|
|
- self.__msg.set_rcode(Rcode.NOERROR())
|
|
|
+ if self.__faked_result == UPDATE_SUCCESS:
|
|
|
+ self.__msg.set_rcode(Rcode.NOERROR())
|
|
|
+ else:
|
|
|
+ self.__msg.set_rcode(Rcode.REFUSED())
|
|
|
return self.__msg
|
|
|
|
|
|
class MyCCSession(isc.config.ConfigData):
|
|
@@ -390,7 +424,7 @@ class TestDDNSServer(unittest.TestCase):
|
|
|
self.__select_expected = ([1, 2], [], [], None)
|
|
|
self.assertRaises(select.error, self.ddns_server.run)
|
|
|
|
|
|
-def create_msg(opcode=Opcode.UPDATE(), zones=[TEST_ZONE_RECORD], tsig_key=None):
|
|
|
+def create_msg(opcode=Opcode.UPDATE(), zones=[TEST_ZONE_RECORD], tsigctx=None):
|
|
|
msg = Message(Message.RENDER)
|
|
|
msg.set_qid(TEST_QID)
|
|
|
msg.set_opcode(opcode)
|
|
@@ -399,8 +433,8 @@ def create_msg(opcode=Opcode.UPDATE(), zones=[TEST_ZONE_RECORD], tsig_key=None):
|
|
|
msg.add_question(z)
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
- if tsig_key is not None:
|
|
|
- msg.to_wire(renderer, TSIGContext(tsig_key))
|
|
|
+ if tsigctx is not None:
|
|
|
+ msg.to_wire(renderer, tsigctx)
|
|
|
else:
|
|
|
msg.to_wire(renderer)
|
|
|
|
|
@@ -416,30 +450,79 @@ class TestDDNSession(unittest.TestCase):
|
|
|
cc_session = MyCCSession()
|
|
|
self.assertFalse(cc_session._started)
|
|
|
self.server = ddns.DDNSServer(cc_session)
|
|
|
- self.server._UpdateSessionClass = FakeUpdateSession
|
|
|
+ self.server._UpdateSessionClass = self.__fake_session_creator
|
|
|
+ self.server._get_tsig_keyring = self.__fake_keyring_getter
|
|
|
+ self.__faked_result = UPDATE_SUCCESS # will be returned by fake session
|
|
|
self.__sock = FakeSocket(-1)
|
|
|
|
|
|
- def check_update_response(self, resp_wire, expected_rcode=Rcode.NOERROR()):
|
|
|
+ def __fake_session_creator(self, req_message, client_addr, zone_config):
|
|
|
+ return FakeUpdateSession(req_message, client_addr, zone_config,
|
|
|
+ self.__faked_result)
|
|
|
+
|
|
|
+ def __fake_keyring_getter(self):
|
|
|
+ return TEST_TSIG_KEYRING
|
|
|
+
|
|
|
+ def check_update_response(self, resp_wire, expected_rcode=Rcode.NOERROR(),
|
|
|
+ tsig_ctx=None):
|
|
|
'''Check if given wire data are valid form of update response.
|
|
|
|
|
|
In this implementation, zone/prerequisite/update sections should be
|
|
|
empty in responses.
|
|
|
|
|
|
+ If tsig_ctx (isc.dns.TSIGContext) is not None, the response should
|
|
|
+ be TSIG signed and the signature should be verifiable with the context
|
|
|
+ that has signed the corresponding request.
|
|
|
+
|
|
|
'''
|
|
|
msg = Message(Message.PARSE)
|
|
|
msg.from_wire(resp_wire)
|
|
|
+ if tsig_ctx is not None:
|
|
|
+ tsig_record = msg.get_tsig_record()
|
|
|
+ self.assertNotEqual(None, tsig_record)
|
|
|
+ self.assertEqual(TSIGError.NOERROR,
|
|
|
+ tsig_ctx.verify(tsig_record, resp_wire))
|
|
|
self.assertEqual(Opcode.UPDATE(), msg.get_opcode())
|
|
|
self.assertEqual(expected_rcode, msg.get_rcode())
|
|
|
self.assertEqual(TEST_QID, msg.get_qid())
|
|
|
for section in [SECTION_ZONE, SECTION_PREREQUISITE, SECTION_UPDATE]:
|
|
|
self.assertEqual(0, msg.get_rr_count(section))
|
|
|
|
|
|
+ def check_session(self, result=UPDATE_SUCCESS, ipv6=True, tsig_key=None):
|
|
|
+ # reset test parameters
|
|
|
+ self.__sock.clear()
|
|
|
+ self.__faked_result = result
|
|
|
+
|
|
|
+ server_addr = TEST_SERVER6 if ipv6 else TEST_SERVER4
|
|
|
+ client_addr = TEST_CLIENT6 if ipv6 else TEST_CLIENT4
|
|
|
+ tsig = TSIGContext(tsig_key) if tsig_key is not None else None
|
|
|
+ rcode = Rcode.NOERROR() if result == UPDATE_SUCCESS else Rcode.REFUSED()
|
|
|
+ has_response = False if result == UPDATE_DROP else True
|
|
|
+
|
|
|
+ self.assertEqual(has_response,
|
|
|
+ self.server.handle_request((self.__sock,
|
|
|
+ server_addr, client_addr,
|
|
|
+ create_msg(tsigctx=tsig))))
|
|
|
+ if has_response:
|
|
|
+ self.assertEqual(client_addr, self.__sock._sent_addr)
|
|
|
+ self.check_update_response(self.__sock._sent_data, rcode)
|
|
|
+ else:
|
|
|
+ self.assertEqual((None, None), (self.__sock._sent_addr,
|
|
|
+ self.__sock._sent_data))
|
|
|
+
|
|
|
def test_handle_request(self):
|
|
|
- self.assertTrue(self.server.handle_request((self.__sock,
|
|
|
- TEST_SERVER6, TEST_CLIENT6,
|
|
|
- create_msg())))
|
|
|
- self.assertEqual(TEST_CLIENT6, self.__sock._sent_addr)
|
|
|
- self.check_update_response(self.__sock._sent_data)
|
|
|
+ '''Basic request handling without any unexpected errors.'''
|
|
|
+ # Success, without TSIG
|
|
|
+ self.check_session()
|
|
|
+ # Update will be refused with a response.
|
|
|
+ self.check_session(UPDATE_ERROR, ipv6=False)
|
|
|
+ # Update will be refused and dropped
|
|
|
+ self.check_session(UPDATE_DROP)
|
|
|
+ # Success, with TSIG
|
|
|
+ self.check_session(ipv6=False, tsig_key=TEST_TSIG_KEY)
|
|
|
+ # Update will be refused with a response, with TSIG.
|
|
|
+ self.check_session(UPDATE_ERROR, tsig_key=TEST_TSIG_KEY)
|
|
|
+ # Update will be refused and dropped, with TSIG (doesn't matter though)
|
|
|
+ self.check_session(UPDATE_DROP, ipv6=False, tsig_key=TEST_TSIG_KEY)
|
|
|
|
|
|
def test_broken_request(self):
|
|
|
# Message data too short
|
|
@@ -453,6 +536,11 @@ class TestDDNSession(unittest.TestCase):
|
|
|
(self.__sock, None, None, create_msg(opcode=Opcode.QUERY()))))
|
|
|
self.assertEqual((None, None), (s._sent_data, s._sent_addr))
|
|
|
|
|
|
+ # TSIG verification error. We use UPDATE_DROP to signal check_session
|
|
|
+ # that no response should be given.
|
|
|
+ self.check_session(result=UPDATE_DROP, ipv6=False,
|
|
|
+ tsig_key=BAD_TSIG_KEY)
|
|
|
+
|
|
|
class TestMain(unittest.TestCase):
|
|
|
def setUp(self):
|
|
|
self._server = MyDDNSServer()
|