123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398 |
- # Copyright (C) 2011 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- '''Tests for the DDNS module'''
- from isc.ddns.session import *
- from isc.dns import *
- from isc.acl.acl import ACCEPT
- import isc.util.cio.socketsession
- from isc.cc.session import SessionTimeout, SessionError, ProtocolError
- from isc.datasrc import DataSourceClient
- from isc.config import module_spec_from_file
- from isc.config.config_data import ConfigData
- from isc.config.ccsession import create_answer, ModuleCCSessionError
- from isc.config.module_spec import ModuleSpecError
- from isc.server_common.dns_tcp import DNSTCPContext
- import ddns
- import errno
- import os
- import select
- import shutil
- import socket
- import unittest
- # Some common test parameters
- TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
- READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
- TEST_ZONE_NAME = Name('example.org')
- TEST_ZONE_NAME_STR = TEST_ZONE_NAME.to_text()
- UPDATE_RRTYPE = RRType.SOA
- TEST_QID = 5353 # arbitrarily chosen
- TEST_RRCLASS = RRClass.IN
- TEST_RRCLASS_STR = TEST_RRCLASS.to_text()
- TEST_SERVER6 = ('2001:db8::53', 53, 0, 0)
- TEST_CLIENT6 = ('2001:db8::1', 53000, 0, 0)
- TEST_SERVER4 = ('192.0.2.53', 53)
- TEST_CLIENT4 = ('192.0.2.1', 53534)
- TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
- TEST_ACL_CONTEXT = isc.acl.dns.RequestContext(
- socket.getaddrinfo("192.0.2.1", 1234, 0, socket.SOCK_DGRAM,
- socket.IPPROTO_UDP, socket.AI_NUMERICHOST)[0][4])
- # TSIG key for tests when needed. The key name is TEST_ZONE_NAME.
- TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")
- # TSIG keyring that contains the test key
- TEST_TSIG_KEYRING = TSIGKeyRing()
- TEST_TSIG_KEYRING.add(TEST_TSIG_KEY)
- # Another TSIG key not in the keyring, making verification fail
- BAD_TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
- # Incorporate it so we can use the real default values of zonemgr config
- # in the tests.
- ZONEMGR_MODULE_SPEC = module_spec_from_file(
- os.environ["B10_FROM_BUILD"] + "/src/bin/zonemgr/zonemgr.spec")
- class FakeSocket:
- """
- A fake socket. It only provides a file number, peer name and accept method.
- """
- def __init__(self, fileno, proto=socket.IPPROTO_UDP):
- self.proto = proto
- self.__fileno = fileno
- self._sent_data = None
- self._sent_addr = None
- self._close_called = 0 # number of calls to close()
- self.__send_cc = 0 # waterline of the send buffer (can be reset)
- # customizable by tests; if set to True, sendto() will throw after
- # recording the parameters.
- self._raise_on_send = False
- self._send_buflen = None # imaginary send buffer for partial send
- def fileno(self):
- return self.__fileno
- def getpeername(self):
- if self.proto == socket.IPPROTO_UDP or \
- self.proto == socket.IPPROTO_TCP:
- return TEST_CLIENT4
- return "fake_unix_socket"
- def accept(self):
- return FakeSocket(self.__fileno + 1), '/dummy/path'
- def sendto(self, data, addr):
- self._sent_data = data
- self._sent_addr = addr
- if self._raise_on_send:
- raise socket.error('test socket failure')
- def send(self, data):
- if self._raise_on_send:
- raise socket.error(errno.EPIPE, 'faked connection disruption')
- elif self._send_buflen is None:
- available_space = len(data)
- else:
- available_space = self._send_buflen - self.__send_cc
- if available_space == 0:
- # if there's no space, (assuming it's nonblocking mode) raise
- # EAGAIN.
- raise socket.error(errno.EAGAIN,
- "Resource temporarily unavailable")
- # determine the sendable part of the data, record it, update "buffer".
- cc = min(available_space, len(data))
- if self._sent_data is None:
- self._sent_data = data[:cc]
- else:
- self._sent_data += data[:cc]
- self.__send_cc += cc
- return cc
- def setblocking(self, on):
- # We only need a faked NO-OP implementation.
- pass
- def close(self):
- self._close_called += 1
- def clear(self):
- '''Clear internal instrumental data.'''
- self._sent_data = None
- self._sent_addr = None
- def make_send_ready(self):
- # pretend that the accrued data has been cleared, making room in
- # the send buffer.
- self.__send_cc = 0
- class FakeSessionReceiver:
- """
- A fake socket session receiver, for our tests.
- """
- def __init__(self, socket):
- self._socket = socket
- def socket(self):
- """
- This method is not present in the real receiver, but we use it to
- inspect the socket passed to the constructor.
- """
- return self._socket
- class FakeUpdateSession:
- '''A fake update session, emulating isc.ddns.session.UpdateSession.
- It provides the same interfaces as UpdateSession with skipping complicated
- internal protocol processing and returning given faked results. This
- will help simplify test setups.
- '''
- def __init__(self, msg, client_addr, zone_config, faked_result):
- '''Faked constructor.
- It takes an additional faked_result parameter. It will be used
- as the result value of handle(). If its value is UPDATE_ERROR,
- get_message() will create a response message whose Rcode is
- REFUSED.
- '''
- self.__msg = msg
- self.__faked_result = faked_result
- def handle(self):
- if self.__faked_result == UPDATE_SUCCESS:
- return self.__faked_result, TEST_ZONE_NAME, TEST_RRCLASS
- return self.__faked_result, None, None
- def get_message(self):
- self.__msg.make_response()
- self.__msg.clear_section(SECTION_ZONE)
- if self.__faked_result == UPDATE_SUCCESS:
- self.__msg.set_rcode(Rcode.NOERROR)
- else:
- self.__msg.set_rcode(Rcode.REFUSED)
- return self.__msg
- class FakeKeyringModule:
- '''Fake the entire isc.server_common.tsig_keyring module.'''
- def init_keyring(self, cc):
- '''Set the instrumental attribute to True when called.
- It can be used for a test that confirms TSIG key initialization is
- surely performed. This class doesn't use any CC session, so the
- cc parameter will be ignored.
- '''
- self.initialized = True
- def get_keyring(self):
- '''Simply return the predefined TSIG keyring unconditionally.'''
- return TEST_TSIG_KEYRING
- class MyCCSession(isc.config.ModuleCCSession):
- '''Fake session with minimal interface compliance.'''
- # faked CC sequence used in group_send/recvmsg
- FAKE_SEQUENCE = 53
- def __init__(self):
- module_spec = isc.config.module_spec_from_file(
- ddns.SPECFILE_LOCATION)
- isc.config.ConfigData.__init__(self, module_spec)
- self._started = False
- self._stopped = False
- # Used as the return value of get_remote_config_value. Customizable.
- self.auth_db_file = READ_ZONE_DB_FILE
- # Used as the return value of get_remote_config_value. Customizable.
- self.auth_datasources = None
- # faked cc channel, providing group_send/recvmsg itself. The following
- # attributes are for inspection/customization in tests.
- self._session = self
- self._sent_msg = []
- self._recvmsg_called = 0
- self._answer_code = 0 # code used in answer returned via recvmsg
- self._sendmsg_exception = None # will be raised from sendmsg if !None
- self._recvmsg_exception = None # will be raised from recvmsg if !None
- # Attributes to handle (faked) remote configurations
- self.__callbacks = {} # record callbacks for updates to remote confs
- self._raise_mods = {} # map of module to exceptions to be triggered
- # on add_remote. settable by tests.
- self._auth_config = {} # faked auth cfg, settable by tests
- self._zonemgr_config = {} # faked zonemgr cfg, settable by tests
- def start(self):
- '''Called by DDNSServer initialization, but not used in tests'''
- self._started = True
- def send_stopping(self):
- '''Called by shutdown code'''
- self._stopped = True
- def get_socket(self):
- """
- Used to get the file number for select.
- """
- return FakeSocket(1)
- def add_remote_config_by_name(self, module_name, update_callback=None):
- # If a list of exceptions is given for the module, raise the front one,
- # removing that exception from the list (so the list length controls
- # how many (and which) exceptions should be raised on add_remote).
- if module_name in self._raise_mods.keys() and \
- len(self._raise_mods[module_name]) != 0:
- ex = self._raise_mods[module_name][0]
- self._raise_mods[module_name] = self._raise_mods[module_name][1:]
- raise ex('Failure requesting remote config data')
- if update_callback is not None:
- self.__callbacks[module_name] = update_callback
- if module_name is 'Auth':
- if module_name in self.__callbacks:
- # ddns implementation doesn't use the 2nd element, so just
- # setting it to None
- self.__callbacks[module_name](self._auth_config, None)
- if module_name is 'Zonemgr':
- if module_name in self.__callbacks:
- self.__callbacks[module_name](self._zonemgr_config,
- ConfigData(ZONEMGR_MODULE_SPEC))
- def get_remote_config_value(self, module_name, item):
- if module_name == "Auth" and item == "database_file":
- return self.auth_db_file, False
- if module_name == "Auth" and item == "datasources":
- if self.auth_datasources is None:
- return [], True # default
- else:
- return self.auth_datasources, False
- if module_name == 'Zonemgr' and item == 'secondary_zones':
- if item in self._zonemgr_config:
- return self._zonemgr_config[item], False
- else:
- seczone_default = \
- ConfigData(ZONEMGR_MODULE_SPEC).get_default_value(
- 'secondary_zones')
- return seczone_default, True
- def group_sendmsg(self, msg, group, instance='*', to='*',
- want_answer=False):
- # remember the passed parameter, and return dummy sequence
- self._sent_msg.append((msg, group))
- if self._sendmsg_exception is not None:
- raise self._sendmsg_exception
- return self.FAKE_SEQUENCE
- def group_recvmsg(self, nonblock, seq):
- self._recvmsg_called += 1
- if seq != self.FAKE_SEQUENCE:
- raise RuntimeError('unexpected CC sequence: ' + str(seq))
- if self._recvmsg_exception is not None:
- raise self._recvmsg_exception
- if self._answer_code is 0:
- return create_answer(0), None
- else:
- return create_answer(self._answer_code, "dummy error value"), None
- def clear_msg(self):
- '''Clear instrumental attributes related session messages.'''
- self._sent_msg = []
- self._recvmsg_called = 0
- self._answer_code = 0
- self._sendmsg_exception = None
- self._recvmsg_exception = None
- class MyDDNSServer():
- '''Fake DDNS server used to test the main() function'''
- def __init__(self):
- self.reset()
- def run(self):
- '''
- Fake the run() method of the DDNS server. This will set
- self._run_called to True.
- If self._exception is not None, this is raised as an exception
- '''
- self.run_called = True
- if self._exception is not None:
- self.exception_raised = True
- raise self._exception
- def set_exception(self, exception):
- '''Set an exception to be raised when run() is called'''
- self._exception = exception
- def reset(self):
- '''(Re)set to initial values'''
- self.run_called = False
- self.exception_raised = False
- self._exception = None
- class TestDDNSServer(unittest.TestCase):
- def setUp(self):
- cc_session = MyCCSession()
- self.assertFalse(cc_session._started)
- self.orig_tsig_keyring = isc.server_common.tsig_keyring
- isc.server_common.tsig_keyring = FakeKeyringModule()
- self.ddns_server = ddns.DDNSServer(cc_session)
- self.__cc_session = cc_session
- self.assertTrue(cc_session._started)
- self.__select_expected = None
- self.__select_answer = None
- self.__select_exception = None
- self.__hook_called = False
- # Because we overwrite the _listen_socket, close any existing
- # socket object.
- if self.ddns_server._listen_socket is not None:
- self.ddns_server._listen_socket.close()
- self.ddns_server._listen_socket = FakeSocket(2)
- ddns.select.select = self.__select
- # common private attributes for TCP response tests
- self.__tcp_sock = FakeSocket(10, socket.IPPROTO_TCP)
- self.__tcp_ctx = DNSTCPContext(self.__tcp_sock)
- self.__tcp_data = b'A' * 12 # dummy, just the same size as DNS header
- # some tests will override this, which will be restored in tearDown:
- self.__orig_add_pause = ddns.add_pause
- def tearDown(self):
- ddns.select.select = select.select
- ddns.isc.util.cio.socketsession.SocketSessionReceiver = \
- isc.util.cio.socketsession.SocketSessionReceiver
- isc.server_common.tsig_keyring = self.orig_tsig_keyring
- ddns.add_pause = self.__orig_add_pause
- def test_listen(self):
- '''
- Test the old socket file is removed (if any) and a new socket
- is created when the ddns server is created.
- '''
- # Make sure the socket does not exist now
- ddns.clear_socket()
- # Hook the call for clearing the socket
- orig_clear = ddns.clear_socket
- ddns.clear_socket = self.__hook
- # Create the server
- ddnss = ddns.DDNSServer(MyCCSession())
- ddns.clear_socket = orig_clear
- # The socket is created
- self.assertTrue(os.path.exists(ddns.SOCKET_FILE))
- self.assertTrue(isinstance(ddnss._listen_socket, socket.socket))
- # And deletion of the socket was requested
- self.assertIsNone(self.__hook_called)
- # Now make sure the clear_socket really works
- ddns.clear_socket()
- self.assertFalse(os.path.exists(ddns.SOCKET_FILE))
- # Let ddns object complete any necessary cleanup (not part of the test,
- # but for suppressing any warnings from the Python interpreter)
- ddnss.shutdown_cleanup()
- def test_initial_config(self):
- # right now, the only configuration is the zone configuration, whose
- # default should be an empty map.
- self.assertEqual({}, self.ddns_server._zone_config)
- def test_config_handler(self):
- # Update with a simple zone configuration: including an accept-all ACL
- new_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'ACCEPT'}] } ] }
- answer = self.ddns_server.config_handler(new_config)
- self.assertEqual((0, None), isc.config.parse_answer(answer))
- acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
- self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
- # Slightly more complicated one: containing multiple ACLs
- new_config = { 'zones': [ { 'origin': 'example.com',
- 'class': 'CH',
- 'update_acl': [{'action': 'REJECT',
- 'from': '2001:db8::1'}] },
- { 'origin': TEST_ZONE_NAME_STR,
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'ACCEPT'}] },
- { 'origin': 'example.org',
- 'class': 'CH',
- 'update_acl': [{'action': 'DROP'}] } ] }
- answer = self.ddns_server.config_handler(new_config)
- self.assertEqual((0, None), isc.config.parse_answer(answer))
- self.assertEqual(3, len(self.ddns_server._zone_config))
- acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
- self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
- # empty zone config
- new_config = { 'zones': [] }
- answer = self.ddns_server.config_handler(new_config)
- self.assertEqual((0, None), isc.config.parse_answer(answer))
- self.assertEqual({}, self.ddns_server._zone_config)
- # bad zone config data: bad name. The previous config shouls be kept.
- bad_config = { 'zones': [ { 'origin': 'bad..example',
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'ACCEPT'}] } ] }
- answer = self.ddns_server.config_handler(bad_config)
- self.assertEqual(1, isc.config.parse_answer(answer)[0])
- self.assertEqual({}, self.ddns_server._zone_config)
- # bad zone config data: bad class.
- bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
- 'class': 'badclass',
- 'update_acl': [{'action': 'ACCEPT'}] } ] }
- answer = self.ddns_server.config_handler(bad_config)
- self.assertEqual(1, isc.config.parse_answer(answer)[0])
- self.assertEqual({}, self.ddns_server._zone_config)
- # bad zone config data: bad ACL.
- bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'badaction'}]}]}
- answer = self.ddns_server.config_handler(bad_config)
- self.assertEqual(1, isc.config.parse_answer(answer)[0])
- self.assertEqual({}, self.ddns_server._zone_config)
- # the first zone config is valid, but not the second. the first one
- # shouldn't be installed.
- bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'ACCEPT'}] },
- { 'origin': 'bad..example',
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'ACCEPT'}] } ] }
- answer = self.ddns_server.config_handler(bad_config)
- self.assertEqual(1, isc.config.parse_answer(answer)[0])
- self.assertEqual({}, self.ddns_server._zone_config)
- # Half-broken case: 'origin, class' pair is duplicate. For now we
- # we accept it (the latter one will win)
- dup_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'REJECT'}] },
- { 'origin': TEST_ZONE_NAME_STR,
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'ACCEPT'}] } ] }
- answer = self.ddns_server.config_handler(dup_config)
- self.assertEqual((0, None), isc.config.parse_answer(answer))
- acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
- self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
- def test_datasrc_config(self):
- # By default (in our faked config) it should be derived from the
- # test data source
- rrclass, datasrc_client = self.ddns_server._datasrc_info
- self.assertEqual(RRClass.IN, rrclass)
- self.assertEqual(DataSourceClient.SUCCESS,
- datasrc_client.find_zone(Name('example.org'))[0])
- # emulating an update. calling add_remote_config_by_name is a
- # convenient faked way to invoke the callback. We set the db file
- # to a bogus one; the current implementation will create an unusable
- # data source client.
- self.__cc_session.auth_db_file = './notexistentdir/somedb.sqlite3'
- self.__cc_session._auth_config = \
- {'database_file': './notexistentdir/somedb.sqlite3'}
- self.__cc_session.add_remote_config_by_name('Auth')
- rrclass, datasrc_client = self.ddns_server._datasrc_info
- self.assertEqual(RRClass.IN, rrclass)
- self.assertRaises(isc.datasrc.Error,
- datasrc_client.find_zone, Name('example.org'))
- # Check the current info isn't changed if the new config doesn't
- # update it.
- info_orig = self.ddns_server._datasrc_info
- self.ddns_server._datasrc_info = 42 # dummy value, should be kept.
- self.__cc_session._auth_config = {'other_config': 'value'}
- self.__cc_session.add_remote_config_by_name('Auth')
- self.assertEqual(42, self.ddns_server._datasrc_info)
- self.ddns_server._datasrc_info = info_orig
- def test_secondary_zones_config(self):
- # By default it should be an empty list
- self.assertEqual(set(), self.ddns_server._secondary_zones)
- # emulating an update.
- self.__cc_session._zonemgr_config = {'secondary_zones': [
- {'name': TEST_ZONE_NAME_STR, 'class': TEST_RRCLASS_STR}]}
- self.__cc_session.add_remote_config_by_name('Zonemgr')
- # The new set of secondary zones should be stored.
- self.assertEqual({(TEST_ZONE_NAME, TEST_RRCLASS)},
- self.ddns_server._secondary_zones)
- # Similar to the above, but 'class' is unspecified. The default value
- # should be used.
- self.__cc_session._zonemgr_config = {'secondary_zones': [
- {'name': TEST_ZONE_NAME_STR}]}
- self.__cc_session.add_remote_config_by_name('Zonemgr')
- self.assertEqual({(TEST_ZONE_NAME, TEST_RRCLASS)},
- self.ddns_server._secondary_zones)
- # The given list has a duplicate. The resulting set should unify them.
- self.__cc_session._zonemgr_config = {'secondary_zones': [
- {'name': TEST_ZONE_NAME_STR, 'class': TEST_RRCLASS_STR},
- {'name': TEST_ZONE_NAME_STR, 'class': TEST_RRCLASS_STR}]}
- self.__cc_session.add_remote_config_by_name('Zonemgr')
- self.assertEqual({(TEST_ZONE_NAME, TEST_RRCLASS)},
- self.ddns_server._secondary_zones)
- # Check the 2ndary zones aren't changed if the new config doesn't
- # update it.
- seczones_orig = self.ddns_server._secondary_zones
- self.ddns_server._secondary_zones = 42 # dummy value, should be kept.
- self.__cc_session._zonemgr_config = {}
- self.__cc_session.add_remote_config_by_name('Zonemgr')
- self.assertEqual(42, self.ddns_server._secondary_zones)
- self.ddns_server._secondary_zones = seczones_orig
- # If the update config is broken, the existing set should be intact.
- self.__cc_session._zonemgr_config = {'secondary_zones': [
- {'name': 'good.example', 'class': TEST_RRCLASS_STR},
- {'name': 'badd..example', 'class': TEST_RRCLASS_STR}]}
- self.__cc_session.add_remote_config_by_name('Zonemgr')
- self.assertEqual({(TEST_ZONE_NAME, TEST_RRCLASS)},
- self.ddns_server._secondary_zones)
- def __check_remote_config_fail(self, mod_name, num_ex, expected_ex):
- '''Subroutine for remote_config_fail test.'''
- # fake pause function for inspection and to avoid having timeouts
- added_pause = []
- ddns.add_pause = lambda sec: added_pause.append(sec)
- # In our current implementation, there will be up to 3 tries of
- # adding the module, each separated by a 1-sec pause. If all attempts
- # fail the exception will be propagated.
- exceptions = [expected_ex for i in range(0, num_ex)]
- self.__cc_session._raise_mods = {mod_name: exceptions}
- if num_ex >= 3:
- self.assertRaises(expected_ex, ddns.DDNSServer, self.__cc_session)
- else:
- ddns.DDNSServer(self.__cc_session)
- self.assertEqual([1 for i in range(0, num_ex)], added_pause)
- def test_remote_config_fail(self):
- # If getting config of Auth or Zonemgr fails on construction of
- # DDNServer, it should result in an exception and a few times
- # of retries. We test all possible cases, changing the number of
- # raised exceptions and the type of exceptions that can happen,
- # which should also cover the fatal error case.
- for i in range(0, 4):
- self.__check_remote_config_fail('Auth', i, ModuleCCSessionError)
- self.__check_remote_config_fail('Auth', i, ModuleSpecError)
- self.__check_remote_config_fail('Zonemgr', i, ModuleCCSessionError)
- self.__check_remote_config_fail('Zonemgr', i, ModuleSpecError)
- def test_shutdown_command(self):
- '''Test whether the shutdown command works'''
- self.assertFalse(self.ddns_server._shutdown)
- answer = self.ddns_server.command_handler('shutdown', None)
- self.assertEqual((0, None), isc.config.parse_answer(answer))
- self.assertTrue(self.ddns_server._shutdown)
- def test_command_handler(self):
- '''Test some commands.'''
- # this command should not exist
- answer = self.ddns_server.command_handler('bad_command', None)
- self.assertEqual((1, 'Unknown command: bad_command'),
- isc.config.parse_answer(answer))
- def test_signal_handler(self):
- '''Test whether signal_handler calls shutdown()'''
- signal_handler = ddns.create_signal_handler(self.ddns_server)
- self.assertFalse(self.ddns_server._shutdown)
- signal_handler(None, None)
- self.assertTrue(self.ddns_server._shutdown)
- def __select(self, reads, writes, exceptions, timeout=None):
- """
- A fake select. It checks it was called with the correct parameters and
- returns a preset answer.
- If there's an exception stored in __select_exception, it is raised
- instead and the exception is cleared.
- """
- self.assertEqual(self.__select_expected, (reads, writes, exceptions,
- timeout))
- if self.__select_exception is not None:
- (self.__select_exception, exception) = (None,
- self.__select_exception)
- raise exception
- answer = self.__select_answer
- self.__select_answer = None
- self.ddns_server._shutdown = True
- return answer
- def __hook(self, param=None):
- """
- A hook that can be installed to any nullary or unary function and see
- if it was really called.
- """
- self.__hook_called = param
- def test_accept_called(self):
- """
- Test we call the accept function when a new connection comes.
- """
- self.ddns_server.accept = self.__hook
- self.__select_expected = ([1, 2], [], [], None)
- self.__select_answer = ([2], [], [])
- self.__hook_called = "Not called"
- self.ddns_server.run()
- self.assertTrue(self.ddns_server._shutdown)
- # The answer got used
- self.assertIsNone(self.__select_answer)
- # Reset, when called without parameter
- self.assertIsNone(self.__hook_called)
- def test_check_command_called(self):
- """
- Test the check_command is called when there's something on the
- socket.
- """
- self.__cc_session.check_command = self.__hook
- self.__select_expected = ([1, 2], [], [], None)
- self.__select_answer = ([1], [], [])
- self.ddns_server.run()
- self.assertTrue(self.ddns_server._shutdown)
- # The answer got used
- self.assertIsNone(self.__select_answer)
- # And the check_command was called with true parameter (eg.
- # non-blocking)
- self.assertTrue(self.__hook_called)
- def test_accept(self):
- """
- Test that we can accept a new connection.
- """
- # There's nothing before the accept
- ddns.isc.util.cio.socketsession.SocketSessionReceiver = \
- FakeSessionReceiver
- self.assertEqual({}, self.ddns_server._socksession_receivers)
- self.ddns_server.accept()
- # Now the new socket session receiver is stored in the dict
- # The 3 comes from _listen_socket.accept() - _listen_socket has
- # fileno 2 and accept returns socket with fileno increased by one.
- self.assertEqual([3],
- list(self.ddns_server._socksession_receivers.keys()))
- (socket, receiver) = self.ddns_server._socksession_receivers[3]
- self.assertTrue(isinstance(socket, FakeSocket))
- self.assertEqual(3, socket.fileno())
- self.assertTrue(isinstance(receiver, FakeSessionReceiver))
- self.assertEqual(socket, receiver.socket())
- def test_accept_fail(self):
- """
- Test we don't crash if an accept fails and that we don't modify the
- internals.
- """
- # Make the accept fail
- def accept_failure():
- raise socket.error(errno.ECONNABORTED)
- orig = self.ddns_server._listen_socket.accept
- self.ddns_server._listen_socket.accept = accept_failure
- self.assertEqual({}, self.ddns_server._socksession_receivers)
- # Doesn't raise the exception
- self.ddns_server.accept()
- # And nothing is stored
- self.assertEqual({}, self.ddns_server._socksession_receivers)
- # Now make the socket receiver fail
- self.ddns_server._listen_socket.accept = orig
- def receiver_failure(sock):
- raise isc.util.cio.socketsession.SocketSessionError('Test error')
- ddns.isc.util.cio.socketsession.SocketSessionReceiver = \
- receiver_failure
- # Doesn't raise the exception
- self.ddns_server.accept()
- # And nothing is stored
- self.assertEqual({}, self.ddns_server._socksession_receivers)
- # Check we don't catch everything, so raise just an exception
- def unexpected_failure(sock):
- raise Exception('Test error')
- ddns.isc.util.cio.socketsession.SocketSessionReceiver = \
- unexpected_failure
- # This one gets through
- self.assertRaises(Exception, self.ddns_server.accept)
- # Nothing is stored as well
- self.assertEqual({}, self.ddns_server._socksession_receivers)
- def test_session_called(self):
- """
- Test the run calls handle_session when there's something on the
- socket.
- """
- socket = FakeSocket(3)
- self.ddns_server._socksession_receivers = \
- {3: (socket, FakeSessionReceiver(socket))}
- self.ddns_server.handle_session = self.__hook
- self.__select_expected = ([1, 2, 3], [], [], None)
- self.__select_answer = ([3], [], [])
- self.ddns_server.run()
- self.assertTrue(self.ddns_server._shutdown)
- self.assertTrue(self.__cc_session._stopped)
- self.assertIsNone(self.__select_answer)
- self.assertEqual(3, self.__hook_called)
- def test_handle_session_ok(self):
- """
- Test the handle_session pops the receiver and calls handle_request
- when everything is OK.
- """
- socket = FakeSocket(3)
- receiver = FakeSessionReceiver(socket)
- # It doesn't really matter what data we use here, it is only passed
- # through the code
- param = (FakeSocket(4), ('127.0.0.1', 1234), ('127.0.0.1', 1235),
- 'Some data')
- def pop():
- return param
- # Prepare data into the receiver
- receiver.pop = pop
- self.ddns_server._socksession_receivers = {3: (socket, receiver)}
- self.ddns_server.handle_request = self.__hook
- # Call it
- self.ddns_server.handle_session(3)
- # The popped data are passed into the handle_request
- self.assertEqual(param, self.__hook_called)
- # The receivers are kept the same
- self.assertEqual({3: (socket, receiver)},
- self.ddns_server._socksession_receivers)
- def test_handle_session_fail(self):
- """
- Test the handle_session removes (and closes) the socket and receiver
- when the receiver complains.
- """
- socket = FakeSocket(3)
- receiver = FakeSessionReceiver(socket)
- def pop():
- raise isc.util.cio.socketsession.SocketSessionError('Test error')
- receiver.pop = pop
- socket.close = self.__hook
- self.__hook_called = False
- self.ddns_server._socksession_receivers = {3: (socket, receiver)}
- self.ddns_server.handle_session(3)
- # The "dead" receiver is removed
- self.assertEqual({}, self.ddns_server._socksession_receivers)
- # Close is called with no parameter, so the default None
- self.assertIsNone(self.__hook_called)
- def test_select_exception_ignored(self):
- """
- Test that the EINTR is ignored in select.
- """
- # Prepare the EINTR exception
- self.__select_exception = select.error(errno.EINTR)
- # We reuse the test here, as it should act the same. The exception
- # should just get ignored.
- self.test_check_command_called()
- def test_select_exception_fatal(self):
- """
- Test that other exceptions are fatal to the run.
- """
- # Prepare a different exception
- self.__select_exception = select.error(errno.EBADF)
- self.__select_expected = ([1, 2], [], [], None)
- self.assertRaises(select.error, self.ddns_server.run)
- def __send_select_tcp(self, buflen, raise_after_select=False):
- '''Common subroutine for some TCP related tests below.'''
- fileno = self.__tcp_sock.fileno()
- self.ddns_server._tcp_ctxs = {fileno: (self.__tcp_ctx, TEST_CLIENT6)}
- # make an initial, incomplete send via the test context
- self.__tcp_sock._send_buflen = buflen
- self.assertEqual(DNSTCPContext.SENDING,
- self.__tcp_ctx.send(self.__tcp_data))
- self.assertEqual(buflen, len(self.__tcp_sock._sent_data))
- # clear the socket "send buffer"
- self.__tcp_sock.make_send_ready()
- # if requested, set up exception
- self.__tcp_sock._raise_on_send = raise_after_select
- # Run select
- self.__select_expected = ([1, 2], [fileno], [], None)
- self.__select_answer = ([], [fileno], [])
- self.ddns_server.run()
- def test_select_send_continued(self):
- '''Test continuation of sending a TCP response.'''
- # Common setup, with the bufsize that would make it complete after a
- # single select call.
- self.__send_select_tcp(7)
- # Now the send should be completed. socket should be closed,
- # and the context should be removed from the server.
- self.assertEqual(14, len(self.__tcp_sock._sent_data))
- self.assertEqual(1, self.__tcp_sock._close_called)
- self.assertEqual(0, len(self.ddns_server._tcp_ctxs))
- def test_select_send_continued_twice(self):
- '''Test continuation of sending a TCP response, still continuing.'''
- # This is similar to the send_continued test, but the continued
- # operation still won't complete the send.
- self.__send_select_tcp(5)
- # Only 10 bytes should have been transmitted, socket is still open,
- # and the context is still in the server (that would require select
- # watch it again).
- self.assertEqual(10, len(self.__tcp_sock._sent_data))
- self.assertEqual(0, self.__tcp_sock._close_called)
- fileno = self.__tcp_sock.fileno()
- self.assertEqual(self.__tcp_ctx,
- self.ddns_server._tcp_ctxs[fileno][0])
- def test_select_send_continued_failed(self):
- '''Test continuation of sending a TCP response, which fails.'''
- # Let the socket raise an exception in the second call to send().
- self.__send_select_tcp(5, raise_after_select=True)
- # Only the data before select() have been transmitted, socket is
- # closed due to the failure, and the context is removed from the
- # server.
- self.assertEqual(5, len(self.__tcp_sock._sent_data))
- self.assertEqual(1, self.__tcp_sock._close_called)
- self.assertEqual(0, len(self.ddns_server._tcp_ctxs))
- def test_select_multi_tcp(self):
- '''Test continuation of sending a TCP response, multiple sockets.'''
- # Check if the implementation still works with multiple outstanding
- # TCP contexts. We use three (arbitrary choice), of which two will be
- # writable after select and complete the send.
- tcp_socks = []
- for i in range(0, 3):
- # Use faked FD of 100, 101, 102 (again, arbitrary choice)
- s = FakeSocket(100 + i, proto=socket.IPPROTO_TCP)
- ctx = DNSTCPContext(s)
- self.ddns_server._tcp_ctxs[s.fileno()] = (ctx, TEST_CLIENT6)
- s._send_buflen = 7 # make sure it requires two send's
- self.assertEqual(DNSTCPContext.SENDING, ctx.send(self.__tcp_data))
- s.make_send_ready()
- tcp_socks.append(s)
- self.__select_expected = ([1, 2], [100, 101, 102], [], None)
- self.__select_answer = ([], [100, 102], [])
- self.ddns_server.run()
- for i in [0, 2]:
- self.assertEqual(14, len(tcp_socks[i]._sent_data))
- self.assertEqual(1, tcp_socks[i]._close_called)
- self.assertEqual(1, len(self.ddns_server._tcp_ctxs))
- def test_select_bad_writefd(self):
- # There's no outstanding TCP context, but select somehow returns
- # writable FD. It should result in an uncaught exception, killing
- # the server. This is okay, because it shouldn't happen and should be
- # an internal bug.
- self.__select_expected = ([1, 2], [], [], None)
- self.__select_answer = ([], [10], [])
- self.assertRaises(KeyError, self.ddns_server.run)
- def create_msg(opcode=Opcode.UPDATE, zones=[TEST_ZONE_RECORD], prereq=[],
- tsigctx=None):
- msg = Message(Message.RENDER)
- msg.set_qid(TEST_QID)
- msg.set_opcode(opcode)
- msg.set_rcode(Rcode.NOERROR)
- for z in zones:
- msg.add_question(z)
- for p in prereq:
- msg.add_rrset(SECTION_PREREQUISITE, p)
- renderer = MessageRenderer()
- msg.to_wire(renderer, tsigctx)
- # re-read the created data in the parse mode
- msg.clear(Message.PARSE)
- msg.from_wire(renderer.get_data())
- return renderer.get_data()
- class TestDDNSSession(unittest.TestCase):
- def setUp(self):
- self.__cc_session = MyCCSession()
- self.assertFalse(self.__cc_session._started)
- self.orig_tsig_keyring = isc.server_common.tsig_keyring
- isc.server_common.tsig_keyring = FakeKeyringModule()
- self.server = ddns.DDNSServer(self.__cc_session)
- # Check that start_ddns_forwarder has been called upon
- # initialization (before we do anything else that might
- # cause messages to be sent)
- self.check_session_start_forwarder_called()
- self.server._UpdateSessionClass = self.__fake_session_creator
- self.__faked_result = UPDATE_SUCCESS # will be returned by fake session
- self.__sock = FakeSocket(-1)
- def tearDown(self):
- self.assertTrue(isc.server_common.tsig_keyring.initialized)
- isc.server_common.tsig_keyring = self.orig_tsig_keyring
- def __fake_session_creator(self, req_message, client_addr, zone_config):
- # remember the passed message for possible inspection later.
- self.__req_message = req_message
- return FakeUpdateSession(req_message, client_addr, zone_config,
- self.__faked_result)
- def check_update_response(self, resp_wire, expected_rcode=Rcode.NOERROR,
- tsig_ctx=None, tcp=False):
- '''Check if given wire data are valid form of update response.
- In this implementation, zone/prerequisite/update sections should be
- empty in responses.
- If tsig_ctx (isc.dns.TSIGContext) is not None, the response should
- be TSIG signed and the signature should be verifiable with the context
- that has signed the corresponding request.
- if tcp is True, the wire data are expected to be prepended with
- a 2-byte length field.
- '''
- if tcp:
- data_len = resp_wire[0] * 256 + resp_wire[1]
- resp_wire = resp_wire[2:]
- self.assertEqual(len(resp_wire), data_len)
- msg = Message(Message.PARSE)
- msg.from_wire(resp_wire)
- if tsig_ctx is not None:
- tsig_record = msg.get_tsig_record()
- self.assertNotEqual(None, tsig_record)
- self.assertEqual(TSIGError.NOERROR,
- tsig_ctx.verify(tsig_record, resp_wire))
- self.assertEqual(Opcode.UPDATE, msg.get_opcode())
- self.assertEqual(expected_rcode, msg.get_rcode())
- self.assertEqual(TEST_QID, msg.get_qid())
- for section in [SECTION_ZONE, SECTION_PREREQUISITE, SECTION_UPDATE]:
- self.assertEqual(0, msg.get_rr_count(section))
- def check_session(self, result=UPDATE_SUCCESS, ipv6=True, tsig_key=None):
- # reset test parameters
- self.__sock.clear()
- self.__faked_result = result
- server_addr = TEST_SERVER6 if ipv6 else TEST_SERVER4
- client_addr = TEST_CLIENT6 if ipv6 else TEST_CLIENT4
- tsig = TSIGContext(tsig_key) if tsig_key is not None else None
- rcode = Rcode.NOERROR if result == UPDATE_SUCCESS else Rcode.REFUSED
- has_response = (result != UPDATE_DROP)
- self.assertEqual(has_response,
- self.server.handle_request((self.__sock,
- server_addr, client_addr,
- create_msg(tsigctx=tsig))))
- if has_response:
- self.assertEqual(client_addr, self.__sock._sent_addr)
- self.check_update_response(self.__sock._sent_data, rcode)
- else:
- self.assertEqual((None, None), (self.__sock._sent_addr,
- self.__sock._sent_data))
- def test_handle_request(self):
- '''Basic request handling without any unexpected errors.'''
- # Success, without TSIG
- self.check_session()
- # Update will be refused with a response.
- self.check_session(UPDATE_ERROR, ipv6=False)
- # Update will be refused and dropped
- self.check_session(UPDATE_DROP)
- # Success, with TSIG
- self.check_session(ipv6=False, tsig_key=TEST_TSIG_KEY)
- # Update will be refused with a response, with TSIG.
- self.check_session(UPDATE_ERROR, tsig_key=TEST_TSIG_KEY)
- # Update will be refused and dropped, with TSIG (doesn't matter though)
- self.check_session(UPDATE_DROP, ipv6=False, tsig_key=TEST_TSIG_KEY)
- def test_broken_request(self):
- # Message data too short
- s = self.__sock
- self.assertFalse(self.server.handle_request((self.__sock, None,
- None, b'x' * 11)))
- self.assertEqual((None, None), (s._sent_data, s._sent_addr))
- # Opcode is not UPDATE
- self.assertFalse(self.server.handle_request(
- (self.__sock, None, None, create_msg(opcode=Opcode.QUERY))))
- self.assertEqual((None, None), (s._sent_data, s._sent_addr))
- # TSIG verification error. We use UPDATE_DROP to signal check_session
- # that no response should be given.
- self.check_session(result=UPDATE_DROP, ipv6=False,
- tsig_key=BAD_TSIG_KEY)
- def test_socket_error(self):
- # Have the faked socket raise an exception on sendto()
- self.__sock._raise_on_send = True
- # handle_request indicates the failure
- self.assertFalse(self.server.handle_request((self.__sock, TEST_SERVER6,
- TEST_CLIENT6,
- create_msg())))
- # this check ensures sendto() was really attempted.
- self.check_update_response(self.__sock._sent_data, Rcode.NOERROR)
- def test_tcp_request(self):
- # A simple case using TCP: all resopnse data are sent out at once.
- s = self.__sock
- s.proto = socket.IPPROTO_TCP
- self.assertTrue(self.server.handle_request((s, TEST_SERVER6,
- TEST_CLIENT6,
- create_msg())))
- self.check_update_response(s._sent_data, Rcode.NOERROR, tcp=True)
- # In the current implementation, the socket should be closed
- # immedidately after a successful send.
- self.assertEqual(1, s._close_called)
- # TCP context shouldn't be held in the server.
- self.assertEqual(0, len(self.server._tcp_ctxs))
- def test_tcp_request_incomplete(self):
- # set the size of the socket "send buffer" to a small value, which
- # should cause partial send.
- s = self.__sock
- s.proto = socket.IPPROTO_TCP
- s._send_buflen = 7
- # before this request there should be no outstanding TCP context.
- self.assertEqual(0, len(self.server._tcp_ctxs))
- self.assertTrue(self.server.handle_request((s, TEST_SERVER6,
- TEST_CLIENT6,
- create_msg())))
- # Only the part of data that fit the send buffer should be transmitted.
- self.assertEqual(s._send_buflen, len(s._sent_data))
- # the socket is not yet closed.
- self.assertEqual(0, s._close_called)
- # and a new context is stored in the server.
- self.assertEqual(1, len(self.server._tcp_ctxs))
- # clear the "send buffer" of the fake socket, and continue the send
- # by hand. The next attempt should complete the send, and the combined
- # data should be the expected response.
- s.make_send_ready()
- self.assertEqual(DNSTCPContext.SEND_DONE,
- self.server._tcp_ctxs[s.fileno()][0].send_ready())
- self.check_update_response(s._sent_data, Rcode.NOERROR, tcp=True)
- def test_tcp_request_error(self):
- # initial send() on the TCP socket will fail. The request handling
- # will be considered failure.
- s = self.__sock
- s.proto = socket.IPPROTO_TCP
- s._raise_on_send = True
- self.assertFalse(self.server.handle_request((s, TEST_SERVER6,
- TEST_CLIENT6,
- create_msg())))
- # the socket should have been closed.
- self.assertEqual(1, s._close_called)
- def test_tcp_request_quota(self):
- '''Test'''
- # Originally the TCP context map should be empty.
- self.assertEqual(0, len(self.server._tcp_ctxs))
- class FakeReceiver:
- '''Faked SessionReceiver, just returning given param by pop()'''
- def __init__(self, param):
- self.__param = param
- def pop(self):
- return self.__param
- def check_tcp_ok(fd, expect_grant):
- '''Supplemental checker to see if TCP request is handled.'''
- s = FakeSocket(fd, proto=socket.IPPROTO_TCP)
- s._send_buflen = 7
- self.server._socksession_receivers[s.fileno()] = \
- (None, FakeReceiver((s, TEST_SERVER6, TEST_CLIENT6,
- create_msg())))
- self.assertEqual(expect_grant,
- self.server.handle_session(s.fileno()))
- self.assertEqual(0 if expect_grant else 1, s._close_called)
- # By default up to 10 TCP clients can coexist (use hardcode
- # intentionally so we can test the default value itself)
- for i in range(0, 10):
- check_tcp_ok(i, True)
- self.assertEqual(10, len(self.server._tcp_ctxs))
- # Beyond that, it should be rejected (by reset)
- check_tcp_ok(11, False)
- # If we remove one context from the server, new client can go in again.
- self.server._tcp_ctxs.pop(5)
- check_tcp_ok(12, True)
- def test_request_message(self):
- '''Test if the request message stores RRs separately.'''
- # Specify 'drop' so the passed message won't be modified.
- self.__faked_result = UPDATE_DROP
- # Put the same RR twice in the prerequisite section. We should see
- # them as separate RRs.
- dummy_record = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS,
- RRTTL(0))
- dummy_record.add_rdata(Rdata(RRType.NS, TEST_RRCLASS, "ns.example."))
- self.server.handle_request((self.__sock, TEST_SERVER6, TEST_CLIENT6,
- create_msg(prereq=[dummy_record,
- dummy_record])))
- num_rrsets = len(self.__req_message.get_section(SECTION_PREREQUISITE))
- self.assertEqual(2, num_rrsets)
- def check_session_msg(self, result, expect_recv=2):
- '''Check post update communication with other modules.'''
- # iff the update succeeds, b10-ddns should tell interested other
- # modules the information about the update zone. Possible modules
- # are xfrout and auth: for xfrout, the message format should be:
- # {'command': ['notify', {'zone_name': <updated_zone_name>,
- # 'zone_class', <updated_zone_class>}]}
- # for auth, it should be:
- # {'command': ['loadzone', {'origin': <updated_zone_name>,
- # 'class', <updated_zone_class>}]}
- # and expect an answer by calling group_recvmsg().
- #
- # expect_recv indicates the expected number of calls to
- # group_recvmsg(), which is normally 2, but can be 0 if send fails;
- # if the message is to be sent
- if result == UPDATE_SUCCESS:
- expected_sentmsg = 2
- self.assertEqual(expected_sentmsg,
- len(self.__cc_session._sent_msg))
- self.assertEqual(expect_recv, self.__cc_session._recvmsg_called)
- msg_cnt = 0
- sent_msg, sent_group = self.__cc_session._sent_msg[msg_cnt]
- sent_cmd = sent_msg['command']
- self.assertEqual('Auth', sent_group)
- self.assertEqual('loadzone', sent_cmd[0])
- self.assertEqual(2, len(sent_cmd[1]))
- self.assertEqual(TEST_ZONE_NAME.to_text(),
- sent_cmd[1]['origin'])
- self.assertEqual(TEST_RRCLASS.to_text(),
- sent_cmd[1]['class'])
- msg_cnt += 1
- sent_msg, sent_group = self.__cc_session._sent_msg[msg_cnt]
- sent_cmd = sent_msg['command']
- self.assertEqual('Xfrout', sent_group)
- self.assertEqual('notify', sent_cmd[0])
- self.assertEqual(2, len(sent_cmd[1]))
- self.assertEqual(TEST_ZONE_NAME.to_text(), sent_cmd[1]['zone_name'])
- self.assertEqual(TEST_RRCLASS.to_text(), sent_cmd[1]['zone_class'])
- else:
- # for other result cases neither send nor recvmsg should be called.
- self.assertEqual([], self.__cc_session._sent_msg)
- self.assertEqual(0, self.__cc_session._recvmsg_called)
- def check_session_start_forwarder_called(self):
- '''Check that the command 'start_ddns_forwarder' has been called
- This test removes said message from the sent message queue.
- '''
- sent_msg, sent_group = self.__cc_session._sent_msg.pop(0)
- sent_cmd = sent_msg['command']
- self.assertEqual('Auth', sent_group)
- self.assertEqual('start_ddns_forwarder', sent_cmd[0])
- self.assertEqual(1, len(sent_cmd))
- self.assertEqual(1, self.__cc_session._recvmsg_called)
- # reset it for other tests
- self.__cc_session._recvmsg_called = 0
- def check_session_stop_forwarder_called(self):
- '''Check that the command 'stop_ddns_forwarder' has been called
- This test removes said message from the sent message queue.
- '''
- # check the last message sent
- sent_msg, sent_group = self.__cc_session._sent_msg.pop()
- sent_cmd = sent_msg['command']
- self.assertEqual('Auth', sent_group)
- self.assertEqual('stop_ddns_forwarder', sent_cmd[0])
- self.assertEqual(1, len(sent_cmd))
- def test_session_msg(self):
- '''Test post update communication with other modules.'''
- # Normal cases, confirming communication takes place iff update
- # succeeds
- for r in [UPDATE_SUCCESS, UPDATE_ERROR, UPDATE_DROP]:
- self.__cc_session.clear_msg()
- self.check_session(result=r)
- self.check_session_msg(r)
- # Return an error from the remote module, which should be just ignored.
- self.__cc_session.clear_msg()
- self.__cc_session._answer_code = 1
- self.check_session()
- self.check_session_msg(UPDATE_SUCCESS)
- # raise some exceptions from the faked session. Expected ones are
- # simply (logged and) ignored
- self.__cc_session.clear_msg()
- self.__cc_session._recvmsg_exception = SessionTimeout('dummy timeout')
- self.check_session()
- self.check_session_msg(UPDATE_SUCCESS)
- self.__cc_session.clear_msg()
- self.__cc_session._recvmsg_exception = SessionError('dummy error')
- self.check_session()
- self.check_session_msg(UPDATE_SUCCESS)
- self.__cc_session.clear_msg()
- self.__cc_session._recvmsg_exception = ProtocolError('dummy perror')
- self.check_session()
- self.check_session_msg(UPDATE_SUCCESS)
- # Similar to the previous cases, but sendmsg() raises, so there should
- # be no call to recvmsg().
- self.__cc_session.clear_msg()
- self.__cc_session._sendmsg_exception = SessionError('send error')
- self.check_session()
- self.check_session_msg(UPDATE_SUCCESS, expect_recv=0)
- # Unexpected exception will be propagated (and will terminate the
- # server)
- self.__cc_session.clear_msg()
- self.__cc_session._sendmsg_exception = RuntimeError('unexpected')
- self.assertRaises(RuntimeError, self.check_session)
- def test_session_shutdown_cleanup(self):
- '''Test that the stop forwarding message is sent'''
- self.server.shutdown_cleanup()
- self.check_session_stop_forwarder_called()
- def test_session_msg_for_auth(self):
- '''Test post update communication with other modules including Auth.'''
- # Let the CC session return in-memory config with sqlite3 backend.
- # (The default case was covered by other tests.)
- self.__cc_session.auth_datasources = \
- [{'type': 'memory', 'class': 'IN', 'zones': [
- {'origin': TEST_ZONE_NAME_STR, 'filetype': 'sqlite3'}]}]
- self.check_session()
- self.check_session_msg(UPDATE_SUCCESS)
- # Let sendmsg() raise an exception. The first exception shouldn't
- # stop sending the second message. There's just no recv calls.
- self.__cc_session.clear_msg()
- self.__cc_session._sendmsg_exception = SessionError('send error')
- self.check_session()
- self.check_session_msg(UPDATE_SUCCESS, expect_recv=0)
- # Likewise, in the case recvmsg() raises (and there should be recv
- # calls in this case)
- self.__cc_session.clear_msg()
- self.__cc_session._recvmsg_exception = SessionError('recv error')
- self.check_session()
- self.check_session_msg(UPDATE_SUCCESS)
- def test_session_with_config(self):
- '''Check a session with more realistic config setups.
- We don't have to explore various cases in detail in this test.
- We're just checking if the expected configured objects are passed
- to the session object.
- '''
- # reset the session class to the real one
- self.server._UpdateSessionClass = isc.ddns.session.UpdateSession
- # install all-drop ACL
- new_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
- 'class': TEST_RRCLASS_STR,
- 'update_acl': [{'action': 'DROP'}] } ] }
- answer = self.server.config_handler(new_config)
- self.assertEqual((0, None), isc.config.parse_answer(answer))
- # check the result
- self.check_session(UPDATE_DROP)
- def test_session_start_stop_forwarder_failures(self):
- '''Check that we don't crash if the server reports an error
- setting up or closing down the DDNS UPDATE message forwarder,
- or if there is an exception from the message queue.'''
- self.__cc_session._answer_code = 1
- self.server._DDNSServer__notify_start_forwarder()
- self.server._DDNSServer__notify_stop_forwarder()
- for exc in [ SessionError("sessionerror"),
- SessionTimeout("sessiontimeout"),
- ProtocolError("protocolerror") ]:
- self.__cc_session._recvmsg_exception = exc
- self.server._DDNSServer__notify_start_forwarder()
- self.server._DDNSServer__notify_stop_forwarder()
- self.__cc_session._recvmsg_exception = None
- self.__cc_session._sendmsg_exception = exc
- self.server._DDNSServer__notify_start_forwarder()
- self.server._DDNSServer__notify_stop_forwarder()
- self.__cc_session._recvmsg_exception = None
- def test_session_auth_started(self):
- '''Check that 'start_ddns_forwarder' is sent (again) when the
- notification 'auth_started' is received'''
- # auth_started message should trigger it again
- answer = self.server.command_handler('auth_started', None)
- self.check_session_start_forwarder_called()
- class TestMain(unittest.TestCase):
- def setUp(self):
- self._server = MyDDNSServer()
- self.__orig_clear = ddns.clear_socket
- ddns.clear_socket = self.__clear_socket
- self.__clear_called = False
- def tearDown(self):
- ddns.clear_socket = self.__orig_clear
- def test_main(self):
- self.assertFalse(self._server.run_called)
- ddns.main(self._server)
- self.assertTrue(self._server.run_called)
- self.assertTrue(self.__clear_called)
- def __clear_socket(self):
- self.__clear_called = True
- # Get rid of the socket file too
- self.__orig_clear()
- def check_exception(self, ex):
- '''Common test sequence to see if the given exception is caused.
- '''
- # Should technically not be necessary, but reset server to be sure
- self._server.reset()
- self.assertFalse(self._server.exception_raised)
- self._server.set_exception(ex)
- ddns.main(self._server)
- self.assertTrue(self._server.exception_raised)
- def test_exceptions(self):
- '''
- Test whether exceptions are caught in main()
- These exceptions should not bubble up.
- '''
- self._server.set_exception(KeyboardInterrupt())
- self.assertFalse(self._server.exception_raised)
- ddns.main(self._server)
- self.assertTrue(self._server.exception_raised)
- self.check_exception(isc.cc.SessionError("error"))
- self.check_exception(isc.config.ModuleCCSessionError("error"))
- self.check_exception(ddns.DDNSConfigError("error"))
- self.check_exception(isc.cc.SessionTimeout("error"))
- # Add one that is not a subclass of Exception, and hence not
- # caught. Misuse BaseException for that.
- self._server.reset()
- self.assertFalse(self._server.exception_raised)
- self._server.set_exception(BaseException("error"))
- self.assertRaises(BaseException, ddns.main, self._server)
- self.assertTrue(self._server.exception_raised)
- class TestConfig(unittest.TestCase):
- '''Test some simple config related things that don't need server. '''
- def setUp(self):
- self.__ccsession = MyCCSession()
- def test_file_path(self):
- # Check some common paths
- self.assertEqual(os.environ["B10_FROM_BUILD"] + "/ddns_socket",
- ddns.SOCKET_FILE)
- self.assertEqual(os.environ["B10_FROM_SOURCE"] +
- "/src/bin/ddns/ddns.spec", ddns.SPECFILE_LOCATION)
- if __name__== "__main__":
- isc.log.resetUnitTestRootLogger()
- unittest.main()
|