|
@@ -15,28 +15,71 @@
|
|
|
|
|
|
'''Tests for the DDNS module'''
|
|
|
|
|
|
-import unittest
|
|
|
-import isc
|
|
|
+from isc.ddns.session import *
|
|
|
+from isc.dns import *
|
|
|
+from isc.acl.acl import ACCEPT
|
|
|
+import isc.util.cio.socketsession
|
|
|
+from isc.datasrc import DataSourceClient
|
|
|
import ddns
|
|
|
-import isc.config
|
|
|
-import select
|
|
|
import errno
|
|
|
-import isc.util.cio.socketsession
|
|
|
+import os
|
|
|
+import select
|
|
|
+import shutil
|
|
|
import socket
|
|
|
-import os.path
|
|
|
+import unittest
|
|
|
+
|
|
|
+# Some common test parameters
|
|
|
+TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
|
|
|
+READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
|
|
|
+TEST_ZONE_NAME = Name('example.org')
|
|
|
+TEST_ZONE_NAME_STR = TEST_ZONE_NAME.to_text()
|
|
|
+UPDATE_RRTYPE = RRType.SOA()
|
|
|
+TEST_QID = 5353 # arbitrary chosen
|
|
|
+TEST_RRCLASS = RRClass.IN()
|
|
|
+TEST_RRCLASS_STR = TEST_RRCLASS.to_text()
|
|
|
+TEST_SERVER6 = ('2001:db8::53', 53, 0, 0)
|
|
|
+TEST_CLIENT6 = ('2001:db8::1', 53000, 0, 0)
|
|
|
+TEST_SERVER4 = ('192.0.2.53', 53)
|
|
|
+TEST_CLIENT4 = ('192.0.2.1', 53534)
|
|
|
+TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
|
|
|
+TEST_ACL_CONTEXT = isc.acl.dns.RequestContext(
|
|
|
+ socket.getaddrinfo("192.0.2.1", 1234, 0, socket.SOCK_DGRAM,
|
|
|
+ socket.IPPROTO_UDP, socket.AI_NUMERICHOST)[0][4])
|
|
|
+# TSIG key for tests when needed. The key name is TEST_ZONE_NAME.
|
|
|
+TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
+# TSIG keyring that contanins the test key
|
|
|
+TEST_TSIG_KEYRING = TSIGKeyRing()
|
|
|
+TEST_TSIG_KEYRING.add(TEST_TSIG_KEY)
|
|
|
+# Another TSIG key not in the keyring, making verification fail
|
|
|
+BAD_TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
|
|
class FakeSocket:
|
|
|
"""
|
|
|
A fake socket. It only provides a file number, peer name and accept method.
|
|
|
"""
|
|
|
def __init__(self, fileno):
|
|
|
+ self.proto = socket.IPPROTO_UDP
|
|
|
self.__fileno = fileno
|
|
|
+ self._sent_data = None
|
|
|
+ self._sent_addr = None
|
|
|
+ # customizable by tests; if set to True, sendto() will throw after
|
|
|
+ # recording the parameters.
|
|
|
+ self._raise_on_send = False
|
|
|
def fileno(self):
|
|
|
return self.__fileno
|
|
|
def getpeername(self):
|
|
|
return "fake_unix_socket"
|
|
|
def accept(self):
|
|
|
- return FakeSocket(self.__fileno + 1)
|
|
|
+ return FakeSocket(self.__fileno + 1), '/dummy/path'
|
|
|
+ def sendto(self, data, addr):
|
|
|
+ self._sent_data = data
|
|
|
+ self._sent_addr = addr
|
|
|
+ if self._raise_on_send:
|
|
|
+ raise socket.error('test socket failure')
|
|
|
+ def clear(self):
|
|
|
+ '''Clear internal instrumental data.'''
|
|
|
+ self._sent_data = None
|
|
|
+ self._sent_addr = None
|
|
|
|
|
|
class FakeSessionReceiver:
|
|
|
"""
|
|
@@ -51,14 +94,67 @@ class FakeSessionReceiver:
|
|
|
"""
|
|
|
return self._socket
|
|
|
|
|
|
+class FakeUpdateSession:
|
|
|
+ '''A fake update session, emulating isc.ddns.session.UpdateSession.
|
|
|
+
|
|
|
+ It provides the same interfaces as UpdateSession with skipping complicated
|
|
|
+ internal protocol processing and returning given faked results. This
|
|
|
+ will help simplify test setups.
|
|
|
+
|
|
|
+ '''
|
|
|
+ def __init__(self, msg, client_addr, zone_config, faked_result):
|
|
|
+ '''Faked constructor.
|
|
|
+
|
|
|
+ It takes an additional faked_result parameter. It will be used
|
|
|
+ as the result value of handle(). If its value is UPDATE_ERROR,
|
|
|
+ get_message() will create a response message whose Rcode is
|
|
|
+ REFUSED.
|
|
|
+
|
|
|
+ '''
|
|
|
+ self.__msg = msg
|
|
|
+ self.__faked_result = faked_result
|
|
|
+
|
|
|
+ def handle(self):
|
|
|
+ if self.__faked_result == UPDATE_SUCCESS:
|
|
|
+ return self.__faked_result, TEST_ZONE_NAME, TEST_RRCLASS
|
|
|
+ return self.__faked_result, None, None
|
|
|
+
|
|
|
+ def get_message(self):
|
|
|
+ self.__msg.make_response()
|
|
|
+ self.__msg.clear_section(SECTION_ZONE)
|
|
|
+ if self.__faked_result == UPDATE_SUCCESS:
|
|
|
+ self.__msg.set_rcode(Rcode.NOERROR())
|
|
|
+ else:
|
|
|
+ self.__msg.set_rcode(Rcode.REFUSED())
|
|
|
+ return self.__msg
|
|
|
+
|
|
|
+class FakeKeyringModule:
|
|
|
+ '''Fake the entire isc.server_common.tsig_keyring module.'''
|
|
|
+
|
|
|
+ def init_keyring(self, cc):
|
|
|
+ '''Set the instrumental attribute to True when called.
|
|
|
+
|
|
|
+ It can be used for a test that confirms TSIG key initialization is
|
|
|
+ surely performed. This class doesn't use any CC session, so the
|
|
|
+ cc parameter will be ignored.
|
|
|
+
|
|
|
+ '''
|
|
|
+ self.initialized = True
|
|
|
+
|
|
|
+ def get_keyring(self):
|
|
|
+ '''Simply return the predefined TSIG keyring unconditionally.'''
|
|
|
+ return TEST_TSIG_KEYRING
|
|
|
+
|
|
|
class MyCCSession(isc.config.ConfigData):
|
|
|
- '''Fake session with minimal interface compliance'''
|
|
|
+ '''Fake session with minimal interface compliance.'''
|
|
|
def __init__(self):
|
|
|
module_spec = isc.config.module_spec_from_file(
|
|
|
ddns.SPECFILE_LOCATION)
|
|
|
isc.config.ConfigData.__init__(self, module_spec)
|
|
|
self._started = False
|
|
|
self._stopped = False
|
|
|
+ # Used as the return value of get_remote_config_value. Customizable.
|
|
|
+ self.auth_db_file = READ_ZONE_DB_FILE
|
|
|
|
|
|
def start(self):
|
|
|
'''Called by DDNSServer initialization, but not used in tests'''
|
|
@@ -74,6 +170,13 @@ class MyCCSession(isc.config.ConfigData):
|
|
|
"""
|
|
|
return FakeSocket(1)
|
|
|
|
|
|
+ def add_remote_config(self, spec_file_name):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def get_remote_config_value(self, module_name, item):
|
|
|
+ if module_name == "Auth" and item == "database_file":
|
|
|
+ return self.auth_db_file, False
|
|
|
+
|
|
|
class MyDDNSServer():
|
|
|
'''Fake DDNS server used to test the main() function'''
|
|
|
def __init__(self):
|
|
@@ -104,6 +207,8 @@ class TestDDNSServer(unittest.TestCase):
|
|
|
def setUp(self):
|
|
|
cc_session = MyCCSession()
|
|
|
self.assertFalse(cc_session._started)
|
|
|
+ self.orig_tsig_keyring = isc.server_common.tsig_keyring
|
|
|
+ isc.server_common.tsig_keyring = FakeKeyringModule()
|
|
|
self.ddns_server = ddns.DDNSServer(cc_session)
|
|
|
self.__cc_session = cc_session
|
|
|
self.assertTrue(cc_session._started)
|
|
@@ -118,6 +223,7 @@ class TestDDNSServer(unittest.TestCase):
|
|
|
ddns.select.select = select.select
|
|
|
ddns.isc.util.cio.socketsession.SocketSessionReceiver = \
|
|
|
isc.util.cio.socketsession.SocketSessionReceiver
|
|
|
+ isc.server_common.tsig_keyring = self.orig_tsig_keyring
|
|
|
|
|
|
def test_listen(self):
|
|
|
'''
|
|
@@ -141,12 +247,92 @@ class TestDDNSServer(unittest.TestCase):
|
|
|
ddns.clear_socket()
|
|
|
self.assertFalse(os.path.exists(ddns.SOCKET_FILE))
|
|
|
|
|
|
+ def test_initial_config(self):
|
|
|
+ # right now, the only configuration is the zone configuration, whose
|
|
|
+ # default should be an empty map.
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
def test_config_handler(self):
|
|
|
- # Config handler does not do anything yet, but should at least
|
|
|
- # return 'ok' for now.
|
|
|
- new_config = {}
|
|
|
+ # Update with a simple zone configuration: including an accept-all ACL
|
|
|
+ new_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
answer = self.ddns_server.config_handler(new_config)
|
|
|
self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+ acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+
|
|
|
+ # Slightly more complicated one: containing multiple ACLs
|
|
|
+ new_config = { 'zones': [ { 'origin': 'example.com',
|
|
|
+ 'class': 'CH',
|
|
|
+ 'update_acl': [{'action': 'REJECT',
|
|
|
+ 'from': '2001:db8::1'}] },
|
|
|
+ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] },
|
|
|
+ { 'origin': 'example.org',
|
|
|
+ 'class': 'CH',
|
|
|
+ 'update_acl': [{'action': 'DROP'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(new_config)
|
|
|
+ self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+ self.assertEqual(3, len(self.ddns_server._zone_config))
|
|
|
+ acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+
|
|
|
+ # empty zone config
|
|
|
+ new_config = { 'zones': [] }
|
|
|
+ answer = self.ddns_server.config_handler(new_config)
|
|
|
+ self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # bad zone config data: bad name. The previous config shouls be kept.
|
|
|
+ bad_config = { 'zones': [ { 'origin': 'bad..example',
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(bad_config)
|
|
|
+ self.assertEqual(1, isc.config.parse_answer(answer)[0])
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # bad zone config data: bad class.
|
|
|
+ bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': 'badclass',
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(bad_config)
|
|
|
+ self.assertEqual(1, isc.config.parse_answer(answer)[0])
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # bad zone config data: bad ACL.
|
|
|
+ bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'badaction'}]}]}
|
|
|
+ answer = self.ddns_server.config_handler(bad_config)
|
|
|
+ self.assertEqual(1, isc.config.parse_answer(answer)[0])
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # the first zone cofig is valid, but not the second. the first one
|
|
|
+ # shouldn't be installed.
|
|
|
+ bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] },
|
|
|
+ { 'origin': 'bad..example',
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(bad_config)
|
|
|
+ self.assertEqual(1, isc.config.parse_answer(answer)[0])
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # Half-broken case: 'origin, class' pair is duplicate. For now we
|
|
|
+ # we accept it (the latter one will win)
|
|
|
+ dup_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'REJECT'}] },
|
|
|
+ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(dup_config)
|
|
|
+ self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+ acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
|
|
|
def test_shutdown_command(self):
|
|
|
'''Test whether the shutdown command works'''
|
|
@@ -361,6 +547,186 @@ class TestDDNSServer(unittest.TestCase):
|
|
|
self.__select_expected = ([1, 2], [], [], None)
|
|
|
self.assertRaises(select.error, self.ddns_server.run)
|
|
|
|
|
|
+def create_msg(opcode=Opcode.UPDATE(), zones=[TEST_ZONE_RECORD], prereq=[],
|
|
|
+ tsigctx=None):
|
|
|
+ msg = Message(Message.RENDER)
|
|
|
+ msg.set_qid(TEST_QID)
|
|
|
+ msg.set_opcode(opcode)
|
|
|
+ msg.set_rcode(Rcode.NOERROR())
|
|
|
+ for z in zones:
|
|
|
+ msg.add_question(z)
|
|
|
+ for p in prereq:
|
|
|
+ msg.add_rrset(SECTION_PREREQUISITE, p)
|
|
|
+
|
|
|
+ renderer = MessageRenderer()
|
|
|
+ if tsigctx is not None:
|
|
|
+ msg.to_wire(renderer, tsigctx)
|
|
|
+ else:
|
|
|
+ msg.to_wire(renderer)
|
|
|
+
|
|
|
+ # re-read the created data in the parse mode
|
|
|
+ msg.clear(Message.PARSE)
|
|
|
+ msg.from_wire(renderer.get_data())
|
|
|
+
|
|
|
+ return renderer.get_data()
|
|
|
+
|
|
|
+
|
|
|
+class TestDDNSession(unittest.TestCase):
|
|
|
+ def setUp(self):
|
|
|
+ cc_session = MyCCSession()
|
|
|
+ self.assertFalse(cc_session._started)
|
|
|
+ self.orig_tsig_keyring = isc.server_common.tsig_keyring
|
|
|
+ isc.server_common.tsig_keyring = FakeKeyringModule()
|
|
|
+ self.server = ddns.DDNSServer(cc_session)
|
|
|
+ self.server._UpdateSessionClass = self.__fake_session_creator
|
|
|
+ self.__faked_result = UPDATE_SUCCESS # will be returned by fake session
|
|
|
+ self.__sock = FakeSocket(-1)
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ self.assertTrue(isc.server_common.tsig_keyring.initialized)
|
|
|
+ isc.server_common.tsig_keyring = self.orig_tsig_keyring
|
|
|
+
|
|
|
+ def __fake_session_creator(self, req_message, client_addr, zone_config):
|
|
|
+ # remember the passed message for possible inspection later.
|
|
|
+ self.__req_message = req_message
|
|
|
+ return FakeUpdateSession(req_message, client_addr, zone_config,
|
|
|
+ self.__faked_result)
|
|
|
+
|
|
|
+ def check_update_response(self, resp_wire, expected_rcode=Rcode.NOERROR(),
|
|
|
+ tsig_ctx=None):
|
|
|
+ '''Check if given wire data are valid form of update response.
|
|
|
+
|
|
|
+ In this implementation, zone/prerequisite/update sections should be
|
|
|
+ empty in responses.
|
|
|
+
|
|
|
+ If tsig_ctx (isc.dns.TSIGContext) is not None, the response should
|
|
|
+ be TSIG signed and the signature should be verifiable with the context
|
|
|
+ that has signed the corresponding request.
|
|
|
+
|
|
|
+ '''
|
|
|
+ msg = Message(Message.PARSE)
|
|
|
+ msg.from_wire(resp_wire)
|
|
|
+ if tsig_ctx is not None:
|
|
|
+ tsig_record = msg.get_tsig_record()
|
|
|
+ self.assertNotEqual(None, tsig_record)
|
|
|
+ self.assertEqual(TSIGError.NOERROR,
|
|
|
+ tsig_ctx.verify(tsig_record, resp_wire))
|
|
|
+ self.assertEqual(Opcode.UPDATE(), msg.get_opcode())
|
|
|
+ self.assertEqual(expected_rcode, msg.get_rcode())
|
|
|
+ self.assertEqual(TEST_QID, msg.get_qid())
|
|
|
+ for section in [SECTION_ZONE, SECTION_PREREQUISITE, SECTION_UPDATE]:
|
|
|
+ self.assertEqual(0, msg.get_rr_count(section))
|
|
|
+
|
|
|
+ def check_session(self, result=UPDATE_SUCCESS, ipv6=True, tsig_key=None):
|
|
|
+ # reset test parameters
|
|
|
+ self.__sock.clear()
|
|
|
+ self.__faked_result = result
|
|
|
+
|
|
|
+ server_addr = TEST_SERVER6 if ipv6 else TEST_SERVER4
|
|
|
+ client_addr = TEST_CLIENT6 if ipv6 else TEST_CLIENT4
|
|
|
+ tsig = TSIGContext(tsig_key) if tsig_key is not None else None
|
|
|
+ rcode = Rcode.NOERROR() if result == UPDATE_SUCCESS else Rcode.REFUSED()
|
|
|
+ has_response = (result != UPDATE_DROP)
|
|
|
+
|
|
|
+ self.assertEqual(has_response,
|
|
|
+ self.server.handle_request((self.__sock,
|
|
|
+ server_addr, client_addr,
|
|
|
+ create_msg(tsigctx=tsig))))
|
|
|
+ if has_response:
|
|
|
+ self.assertEqual(client_addr, self.__sock._sent_addr)
|
|
|
+ self.check_update_response(self.__sock._sent_data, rcode)
|
|
|
+ else:
|
|
|
+ self.assertEqual((None, None), (self.__sock._sent_addr,
|
|
|
+ self.__sock._sent_data))
|
|
|
+
|
|
|
+ def test_handle_request(self):
|
|
|
+ '''Basic request handling without any unexpected errors.'''
|
|
|
+ # Success, without TSIG
|
|
|
+ self.check_session()
|
|
|
+ # Update will be refused with a response.
|
|
|
+ self.check_session(UPDATE_ERROR, ipv6=False)
|
|
|
+ # Update will be refused and dropped
|
|
|
+ self.check_session(UPDATE_DROP)
|
|
|
+ # Success, with TSIG
|
|
|
+ self.check_session(ipv6=False, tsig_key=TEST_TSIG_KEY)
|
|
|
+ # Update will be refused with a response, with TSIG.
|
|
|
+ self.check_session(UPDATE_ERROR, tsig_key=TEST_TSIG_KEY)
|
|
|
+ # Update will be refused and dropped, with TSIG (doesn't matter though)
|
|
|
+ self.check_session(UPDATE_DROP, ipv6=False, tsig_key=TEST_TSIG_KEY)
|
|
|
+
|
|
|
+ def test_broken_request(self):
|
|
|
+ # Message data too short
|
|
|
+ s = self.__sock
|
|
|
+ self.assertFalse(self.server.handle_request((self.__sock, None,
|
|
|
+ None, b'x' * 11)))
|
|
|
+ self.assertEqual((None, None), (s._sent_data, s._sent_addr))
|
|
|
+
|
|
|
+ # Opcode is not UPDATE
|
|
|
+ self.assertFalse(self.server.handle_request(
|
|
|
+ (self.__sock, None, None, create_msg(opcode=Opcode.QUERY()))))
|
|
|
+ self.assertEqual((None, None), (s._sent_data, s._sent_addr))
|
|
|
+
|
|
|
+ # TSIG verification error. We use UPDATE_DROP to signal check_session
|
|
|
+ # that no response should be given.
|
|
|
+ self.check_session(result=UPDATE_DROP, ipv6=False,
|
|
|
+ tsig_key=BAD_TSIG_KEY)
|
|
|
+
|
|
|
+ def test_socket_error(self):
|
|
|
+ # Have the faked socket raise an exception on sendto()
|
|
|
+ self.__sock._raise_on_send = True
|
|
|
+ # handle_request indicates the failure
|
|
|
+ self.assertFalse(self.server.handle_request((self.__sock, TEST_SERVER6,
|
|
|
+ TEST_SERVER4,
|
|
|
+ create_msg())))
|
|
|
+ # this check ensures sendto() was really attempted.
|
|
|
+ self.check_update_response(self.__sock._sent_data, Rcode.NOERROR())
|
|
|
+
|
|
|
+ def test_tcp_request(self):
|
|
|
+ # Right now TCP request is not supported.
|
|
|
+ s = self.__sock
|
|
|
+ s.proto = socket.IPPROTO_TCP
|
|
|
+ self.assertFalse(self.server.handle_request((s, TEST_SERVER6,
|
|
|
+ TEST_SERVER4,
|
|
|
+ create_msg())))
|
|
|
+ self.assertEqual((None, None), (s._sent_data, s._sent_addr))
|
|
|
+
|
|
|
+ def test_request_message(self):
|
|
|
+ '''Test if the request message stores RRs separately.'''
|
|
|
+ # Specify 'drop' so the passed message won't be modified.
|
|
|
+ self.__faked_result = UPDATE_DROP
|
|
|
+ # Put the same RR twice in the prerequisite section. We should see
|
|
|
+ # them as separate RRs.
|
|
|
+ dummy_record = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS(),
|
|
|
+ RRTTL(0))
|
|
|
+ dummy_record.add_rdata(Rdata(RRType.NS(), TEST_RRCLASS, "ns.example"))
|
|
|
+ self.server.handle_request((self.__sock, TEST_SERVER6, TEST_CLIENT6,
|
|
|
+ create_msg(prereq=[dummy_record,
|
|
|
+ dummy_record])))
|
|
|
+ num_rrsets = len(self.__req_message.get_section(SECTION_PREREQUISITE))
|
|
|
+ self.assertEqual(2, num_rrsets)
|
|
|
+
|
|
|
+ def test_session_with_config(self):
|
|
|
+ '''Check a session with more relistic config setups
|
|
|
+
|
|
|
+ We don't have to explore various cases in detail in this test.
|
|
|
+ We're just checking if the expected configured objects are passed
|
|
|
+ to the session object.
|
|
|
+
|
|
|
+ '''
|
|
|
+
|
|
|
+ # reset the session class to the real one
|
|
|
+ self.server._UpdateSessionClass = isc.ddns.session.UpdateSession
|
|
|
+
|
|
|
+ # install all-drop ACL
|
|
|
+ new_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'DROP'}] } ] }
|
|
|
+ answer = self.server.config_handler(new_config)
|
|
|
+ self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+
|
|
|
+ # check the result
|
|
|
+ self.check_session(UPDATE_DROP)
|
|
|
+
|
|
|
class TestMain(unittest.TestCase):
|
|
|
def setUp(self):
|
|
|
self._server = MyDDNSServer()
|
|
@@ -416,6 +782,38 @@ class TestMain(unittest.TestCase):
|
|
|
self.assertRaises(BaseException, ddns.main, self._server)
|
|
|
self.assertTrue(self._server.exception_raised)
|
|
|
|
|
|
+class TestConfig(unittest.TestCase):
|
|
|
+ '''Test some simple config related things that don't need server. '''
|
|
|
+ def setUp(self):
|
|
|
+ self.__ccsession = MyCCSession()
|
|
|
+
|
|
|
+ def test_file_path(self):
|
|
|
+ # Check some common paths
|
|
|
+ self.assertEqual(os.environ["B10_FROM_BUILD"] + "/ddns_socket",
|
|
|
+ ddns.SOCKET_FILE)
|
|
|
+ self.assertEqual(os.environ["B10_FROM_SOURCE"] +
|
|
|
+ "/src/bin/ddns/ddns.spec", ddns.SPECFILE_LOCATION)
|
|
|
+ self.assertEqual(os.environ["B10_FROM_BUILD"] +
|
|
|
+ "/src/bin/auth/auth.spec",
|
|
|
+ ddns.AUTH_SPECFILE_LOCATION)
|
|
|
+
|
|
|
+ def test_get_datasrc_client(self):
|
|
|
+ # The test sqlite DB should contain the example.org zone.
|
|
|
+ rrclass, datasrc_client = ddns.get_datasrc_client(self.__ccsession)
|
|
|
+ self.assertEqual(RRClass.IN(), rrclass)
|
|
|
+ self.assertEqual(DataSourceClient.SUCCESS,
|
|
|
+ datasrc_client.find_zone(Name('example.org'))[0])
|
|
|
+
|
|
|
+ def test_get_datasrc_client_fail(self):
|
|
|
+ # DB file is in a non existent directory, and creatng the client
|
|
|
+ # will fail. get_datasrc_client will return a dummy client, which
|
|
|
+ # will subsequently make find_zone() fail.
|
|
|
+ self.__ccsession.auth_db_file = './notexistentdir/somedb.sqlite3'
|
|
|
+ rrclass, datasrc_client = ddns.get_datasrc_client(self.__ccsession)
|
|
|
+ self.assertEqual(RRClass.IN(), rrclass)
|
|
|
+ self.assertRaises(isc.datasrc.Error,
|
|
|
+ datasrc_client.find_zone, Name('example.org'))
|
|
|
+
|
|
|
if __name__== "__main__":
|
|
|
isc.log.resetUnitTestRootLogger()
|
|
|
unittest.main()
|