# Copyright (C) 2011  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

# Most of the time, we omit the "bind10_src" for brevity. Sometimes,
# we want to be explicit about what we do, like when hijacking a library
# call used by the bind10_src.
from bind10_src import ProcessInfo, BoB, parse_args, dump_pid, unlink_pid_file, _BASETIME
import bind10_src

# XXX: environment tests are currently disabled, due to the preprocessor
#      setup that we have now complicating the environment

import unittest
import sys
import os
import os.path
import copy
import signal
import socket
from isc.net.addr import IPAddr
import time
import isc
import isc.log
import isc.bind10.socket_cache
import errno
import random

from isc.testutils.parse_args import TestOptParser, OptsError
from isc.testutils.ccsession_mock import MockModuleCCSession


class TestProcessInfo(unittest.TestCase):
    """Tests of ProcessInfo: spawning, respawning and stdout handling."""

    def setUp(self):
        """Redirect the process-wide stdout into a pipe the tests can read."""
        # redirect stdout to a pipe so we can check that our
        # process spawning is doing the right thing with stdout
        self.old_stdout = os.dup(sys.stdout.fileno())
        self.pipes = os.pipe()
        os.dup2(self.pipes[1], sys.stdout.fileno())
        os.close(self.pipes[1])
        # note that we use dup2() to restore the original stdout
        # to the main program ASAP in each test... this prevents
        # hangs reading from the child process (as the pipe is only
        # open in the child), and also ensures nice pretty output

    def tearDown(self):
        """Restore stdout and release the descriptors opened by setUp()."""
        # clean up our stdout munging
        os.dup2(self.old_stdout, sys.stdout.fileno())
        os.close(self.pipes[0])
        # The saved duplicate is no longer needed once stdout is restored;
        # without this every test would leak one file descriptor.
        os.close(self.old_stdout)

    def test_init(self):
        """A spawned process keeps its parameters and writes to our stdout."""
        pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ])
        pi.spawn()
        os.dup2(self.old_stdout, sys.stdout.fileno())
        self.assertEqual(pi.name, 'Test Process')
        self.assertEqual(pi.args, [ '/bin/echo', 'foo' ])
#        self.assertEqual(pi.env, { 'PATH': os.environ['PATH'],
#                                   'PYTHON_EXEC': os.environ['PYTHON_EXEC'] })
        self.assertEqual(pi.dev_null_stdout, False)
        self.assertEqual(os.read(self.pipes[0], 100), b"foo\n")
        self.assertNotEqual(pi.process, None)
        self.assertTrue(isinstance(pi.pid, int))

#    def test_setting_env(self):
#        pi = ProcessInfo('Test Process', [ '/bin/true' ], env={'FOO': 'BAR'})
#        os.dup2(self.old_stdout, sys.stdout.fileno())
#        self.assertEqual(pi.env, { 'PATH': os.environ['PATH'],
#                                   'PYTHON_EXEC': os.environ['PYTHON_EXEC'],
#                                   'FOO': 'BAR' })

    def test_setting_null_stdout(self):
        """With dev_null_stdout set, nothing reaches the captured pipe."""
        pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ],
                         dev_null_stdout=True)
        pi.spawn()
        os.dup2(self.old_stdout, sys.stdout.fileno())
        self.assertEqual(pi.dev_null_stdout, True)
        self.assertEqual(os.read(self.pipes[0], 100), b"")

    def test_respawn(self):
        """respawn() starts a new process with the same parameters."""
        pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ])
        pi.spawn()
        # wait for old process to work...
        self.assertEqual(os.read(self.pipes[0], 100), b"foo\n")
        # respawn it
        old_pid = pi.pid
        pi.respawn()
        os.dup2(self.old_stdout, sys.stdout.fileno())
        # make sure the new one started properly
        self.assertEqual(pi.name, 'Test Process')
        self.assertEqual(pi.args, [ '/bin/echo', 'foo' ])
#        self.assertEqual(pi.env, { 'PATH': os.environ['PATH'],
#                                   'PYTHON_EXEC': os.environ['PYTHON_EXEC'] })
        self.assertEqual(pi.dev_null_stdout, False)
        self.assertEqual(os.read(self.pipes[0], 100), b"foo\n")
        self.assertNotEqual(pi.process, None)
        self.assertTrue(isinstance(pi.pid, int))
        self.assertNotEqual(pi.pid, old_pid)


class TestCacheCommands(unittest.TestCase):
    """
    Test methods of boss related to the socket cache and socket handling.

    The test case itself stands in for the socket cache: the boss is given
    `self` as its cache, so the cache methods it calls (drop_application,
    get_socket, get_token, drop_socket) are the ones defined here.
    """
    def setUp(self):
        """
        Prepare the boss for some tests.

        Also prepare some variables we need.
        """
        self.__boss = BoB()
        # Fake the cache here so we can pretend it is us and hijack the
        # calls to its methods.
        self.__boss._socket_cache = self
        self.__boss._socket_path = '/socket/path'
        self.__raise_exception = None
        self.__socket_args = {
            "port": 53,
            "address": "::",
            "protocol": "UDP",
            "share_mode": "ANY",
            "share_name": "app"
        }
        # What was and wasn't called.
        self.__drop_app_called = None
        self.__get_socket_called = None
        self.__send_fd_called = None
        self.__get_token_called = None
        self.__drop_socket_called = None
        # Remember the real send_fd so tearDown() can put it back; the hook
        # below replaces a module-level function shared by all tests.
        self.__orig_send_fd = bind10_src.libutil_io_python.send_fd
        bind10_src.libutil_io_python.send_fd = self.__send_fd

    def tearDown(self):
        """Undo the send_fd hook installed by setUp()."""
        bind10_src.libutil_io_python.send_fd = self.__orig_send_fd

    def __send_fd(self, to, socket):
        """
        A function to hook the send_fd in the bind10_src.
        """
        self.__send_fd_called = (to, socket)

    class FalseSocket:
        """
        A socket where we can fake methods we need instead of having a real
        socket.
        """
        def __init__(self):
            self.send = b""

        def fileno(self):
            """
            The file number. Used for identifying the remote application.
            """
            return 42

        def sendall(self, data):
            """
            Adds data to the self.send.
            """
            self.send += data

    def drop_application(self, application):
        """
        Part of pretending to be the cache. Logs the parameter to
        self.__drop_app_called.

        In the case self.__raise_exception is set, the exception there
        is raised instead.
        """
        if self.__raise_exception is not None:
            raise self.__raise_exception
        self.__drop_app_called = application

    def test_consumer_dead(self):
        """
        Test that it calls the drop_application method of the cache.
        """
        self.__boss.socket_consumer_dead(self.FalseSocket())
        self.assertEqual(42, self.__drop_app_called)

    def test_consumer_dead_invalid(self):
        """
        Test that it doesn't crash in case the application is not known to
        the cache, the boss doesn't crash, as this actually can happen in
        practice.
        """
        self.__raise_exception = ValueError("This application is unknown")
        # This doesn't crash
        self.__boss.socket_consumer_dead(self.FalseSocket())

    def get_socket(self, token, application):
        """
        Part of pretending to be the cache. If there's anything in
        __raise_exception, it is raised. Otherwise, the call is logged
        into __get_socket_called and a number is returned.
        """
        if self.__raise_exception is not None:
            raise self.__raise_exception
        self.__get_socket_called = (token, application)
        return 13

    def test_request_handler(self):
        """
        Test that a request for socket is forwarded and the socket is sent
        back, if it returns a socket.
        """
        socket = self.FalseSocket()
        # An exception from the cache
        self.__raise_exception = ValueError("Test value error")
        self.__boss.socket_request_handler(b"token", socket)
        # It was called, but it threw, so it is not noted here
        self.assertIsNone(self.__get_socket_called)
        self.assertEqual(b"0\n", socket.send)
        # It should not have sent any socket.
        self.assertIsNone(self.__send_fd_called)
        # Now prepare a valid scenario
        self.__raise_exception = None
        socket.send = b""
        self.__boss.socket_request_handler(b"token", socket)
        self.assertEqual(b"1\n", socket.send)
        self.assertEqual((42, 13), self.__send_fd_called)
        self.assertEqual(("token", 42), self.__get_socket_called)

    def get_token(self, protocol, address, port, share_mode, share_name):
        """
        Part of pretending to be the cache. If there's anything in
        __raise_exception, it is raised. Otherwise, the parameters are
        logged into __get_token_called and a token is returned.
        """
        if self.__raise_exception is not None:
            raise self.__raise_exception
        self.__get_token_called = (protocol, address, port, share_mode,
                                   share_name)
        return "token"

    def test_get_socket_ok(self):
        """
        Test the successful scenario of getting a socket.
        """
        result = self.__boss._get_socket(self.__socket_args)
        [code, answer] = result['result']
        self.assertEqual(0, code)
        self.assertEqual({
            'token': 'token',
            'path': '/socket/path'
        }, answer)
        addr = self.__get_token_called[1]
        self.assertTrue(isinstance(addr, IPAddr))
        self.assertEqual("::", str(addr))
        self.assertEqual(("UDP", addr, 53, "ANY", "app"),
                         self.__get_token_called)

    def test_get_socket_error(self):
        """
        Test that bad inputs are handled correctly, etc.
        """
        def check_code(code, args):
            """
            Pass the args there and check if it returns success or not.

            The rest is not tested, as it is already checked in the
            test_get_socket_ok.
            """
            [rcode, ranswer] = self.__boss._get_socket(args)['result']
            self.assertEqual(code, rcode)
            if code != 0:
                # This should be an error message. The exact formatting
                # is unknown, but we check it is string at least
                self.assertTrue(isinstance(ranswer, str))

        def mod_args(name, value):
            """
            Override a parameter in the args.
            """
            result = dict(self.__socket_args)
            result[name] = value
            return result

        # Port too large
        check_code(1, mod_args('port', 65536))
        # Not numeric address
        check_code(1, mod_args('address', 'example.org.'))
        # Some bad values of enum-like params
        check_code(1, mod_args('protocol', 'BAD PROTO'))
        check_code(1, mod_args('share_mode', 'BAD SHARE'))
        # Check missing parameters
        for param in self.__socket_args.keys():
            args = dict(self.__socket_args)
            del args[param]
            check_code(1, args)
        # These are OK values for the enum-like parameters
        # The ones from test_get_socket_ok are not tested here
        check_code(0, mod_args('protocol', 'TCP'))
        check_code(0, mod_args('share_mode', 'SAMEAPP'))
        check_code(0, mod_args('share_mode', 'NO'))
        # If an exception is raised from within the cache, it is converted
        # to an error, not propagated
        self.__raise_exception = Exception("Test exception")
        check_code(1, self.__socket_args)
        # The special "expected" exceptions
        self.__raise_exception = \
            isc.bind10.socket_cache.ShareError("Not shared")
        check_code(3, self.__socket_args)
        self.__raise_exception = \
            isc.bind10.socket_cache.SocketError("Not shared", 13)
        check_code(2, self.__socket_args)

    def drop_socket(self, token):
        """
        Part of pretending to be the cache. If there's anything in
        __raise_exception, it is raised. Otherwise, the parameter is stored
        in __drop_socket_called.
        """
        if self.__raise_exception is not None:
            raise self.__raise_exception
        self.__drop_socket_called = token

    def test_drop_socket(self):
        """
        Check the drop_socket command. It should directly call the method
        on the cache. Exceptions should be translated to error messages.
        """
        # This should be OK and just propagated to the call.
        self.assertEqual({"result": [0]},
                         self.__boss.command_handler("drop_socket",
                                                     {"token": "token"}))
        self.assertEqual("token", self.__drop_socket_called)
        self.__drop_socket_called = None
        # Missing parameter
        self.assertEqual({"result": [1, "Missing token parameter"]},
                         self.__boss.command_handler("drop_socket", {}))
        self.assertIsNone(self.__drop_socket_called)
        # An exception is raised from within the cache
        self.__raise_exception = ValueError("Test error")
        self.assertEqual({"result": [1, "Test error"]},
                         self.__boss.command_handler("drop_socket",
                                                     {"token": "token"}))


class TestBoB(unittest.TestCase):
    """Tests of the BoB object itself: construction, sockets, commands."""

    def test_init(self):
        """A freshly created BoB has the expected default attributes."""
        bob = BoB()
        self.assertEqual(bob.verbose, False)
        self.assertEqual(bob.msgq_socket_file, None)
        self.assertEqual(bob.cc_session, None)
        self.assertEqual(bob.ccs, None)
        self.assertEqual(bob.components, {})
        self.assertEqual(bob.runnable, False)
        self.assertEqual(bob.uid, None)
        self.assertEqual(bob.username, None)
        self.assertIsNone(bob._socket_cache)

    def test_set_creator(self):
        """
        Test the call to set_creator. First time, the cache is created
        with the passed creator. The next time, it throws an exception.
        """
        bob = BoB()
        # The cache doesn't use it at start, so just create an empty class
        class Creator: pass
        creator = Creator()
        bob.set_creator(creator)
        self.assertTrue(isinstance(bob._socket_cache,
                                   isc.bind10.socket_cache.Cache))
        self.assertEqual(creator, bob._socket_cache._creator)
        self.assertRaises(ValueError, bob.set_creator, creator)

    def test_socket_srv(self):
        """Tests init_socket_srv() and remove_socket_srv() work as expected."""
        bob = BoB()

        self.assertIsNone(bob._srv_socket)
        self.assertIsNone(bob._tmpdir)
        self.assertIsNone(bob._socket_path)

        bob.init_socket_srv()

        self.assertIsNotNone(bob._srv_socket)
        self.assertNotEqual(-1, bob._srv_socket.fileno())
        self.assertEqual(os.path.join(bob._tmpdir, 'sockcreator'),
                         bob._srv_socket.getsockname())

        self.assertIsNotNone(bob._tmpdir)
        self.assertTrue(os.path.isdir(bob._tmpdir))
        self.assertIsNotNone(bob._socket_path)
        self.assertTrue(os.path.exists(bob._socket_path))

        # Check that it's possible to connect to the socket file (this
        # only works if the socket file exists and the server listens on
        # it).
        s = socket.socket(socket.AF_UNIX)
        try:
            s.connect(bob._socket_path)
            can_connect = True
        except socket.error:
            can_connect = False
        finally:
            # Close the client socket whether or not the connect worked,
            # so a failing test does not leak the descriptor.
            s.close()

        self.assertTrue(can_connect)

        bob.remove_socket_srv()

        self.assertEqual(-1, bob._srv_socket.fileno())
        self.assertFalse(os.path.exists(bob._socket_path))
        self.assertFalse(os.path.isdir(bob._tmpdir))

        # These should not fail either:

        # second call
        bob.remove_socket_srv()

        bob._srv_socket = None
        bob.remove_socket_srv()

    def test_init_alternate_socket(self):
        """The msgq socket file passed to the constructor is stored."""
        bob = BoB("alt_socket_file")
        self.assertEqual(bob.verbose, False)
        self.assertEqual(bob.msgq_socket_file, "alt_socket_file")
        self.assertEqual(bob.cc_session, None)
        self.assertEqual(bob.ccs, None)
        self.assertEqual(bob.components, {})
        self.assertEqual(bob.runnable, False)
        self.assertEqual(bob.uid, None)
        self.assertEqual(bob.username, None)

    def test_command_handler(self):
        """Exercise command_handler() with known, bad and unknown commands."""
        class DummySession():
            def group_sendmsg(self, msg, group):
                (self.msg, self.group) = (msg, group)
            def group_recvmsg(self, nonblock, seq):
                pass
        class DummyModuleCCSession():
            module_spec = isc.config.module_spec.ModuleSpec({
                    "module_name": "Boss",
                    "statistics": [
                        {
                            "item_name": "boot_time",
                            "item_type": "string",
                            "item_optional": False,
                            "item_default": "1970-01-01T00:00:00Z",
                            "item_title": "Boot time",
                            "item_description": "A date time when bind10 process starts initially",
                            "item_format": "date-time"
                        }
                    ]
                    })
            def get_module_spec(self):
                return self.module_spec
        bob = BoB()
        bob.verbose = True
        bob.cc_session = DummySession()
        bob.ccs = DummyModuleCCSession()
        # a bad command
        self.assertEqual(bob.command_handler(-1, None),
                         isc.config.ccsession.create_answer(1, "bad command"))
        # "shutdown" command
        self.assertEqual(bob.command_handler("shutdown", None),
                         isc.config.ccsession.create_answer(0))
        self.assertFalse(bob.runnable)
        # "getstats" command
        self.assertEqual(bob.command_handler("getstats", None),
                         isc.config.ccsession.create_answer(0,
                             { 'boot_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', _BASETIME) }))
        # "ping" command
        self.assertEqual(bob.command_handler("ping", None),
                         isc.config.ccsession.create_answer(0, "pong"))
        # "show_processes" command
        self.assertEqual(bob.command_handler("show_processes", None),
                         isc.config.ccsession.create_answer(0,
                                                            bob.get_processes()))
        # an unknown command
        self.assertEqual(bob.command_handler("__UNKNOWN__", None),
                         isc.config.ccsession.create_answer(1, "Unknown command"))

        # Fake the get_token of cache and test the command works
        bob._socket_path = '/socket/path'
        class cache:
            def get_token(self, protocol, addr, port, share_mode, share_name):
                return str(addr) + ':' + str(port)
        bob._socket_cache = cache()
        args = {
            "port": 53,
            "address": "0.0.0.0",
            "protocol": "UDP",
            "share_mode": "ANY",
            "share_name": "app"
        }
        # The fake cache builds the token from the address and port it is
        # given, so comparing the answer is the easiest way to check that
        # the command reached the cache with the right parameters.
        self.assertEqual({'result': [0, {'token': '0.0.0.0:53',
                                         'path': '/socket/path'}]},
                         bob.command_handler("get_socket", args))
        # The drop_socket is not tested here, but in TestCacheCommands.
        # It needs the cache mocks to be in place and they are there.

    def test_stop_process(self):
        """
        Test checking the stop_process method sends the right message over
        the message bus.
        """
        class DummySession():
            def group_sendmsg(self, msg, group, instance="*"):
                (self.msg, self.group, self.instance) = (msg, group, instance)
        bob = BoB()
        bob.cc_session = DummySession()
        bob.stop_process('process', 'address', 42)
        self.assertEqual('address', bob.cc_session.group)
        self.assertEqual('address', bob.cc_session.instance)
        self.assertEqual({'command': ['shutdown', {'pid': 42}]},
                         bob.cc_session.msg)


# Mock class for testing BoB's usage of ProcessInfo
class MockProcessInfo:
    """Records the constructor arguments instead of running a process."""

    def __init__(self, name, args, env=None,
                 dev_null_stdout=False, dev_null_stderr=False):
        self.name = name
        self.args = args
        # A fresh dict per instance: a `{}` default in the signature would
        # be shared (and mutable) across every MockProcessInfo created.
        self.env = {} if env is None else env
        self.dev_null_stdout = dev_null_stdout
        self.dev_null_stderr = dev_null_stderr
        self.process = None
        self.pid = None

    def spawn(self):
        """Pretend to start the process by assigning a fixed fake pid."""
        # set some pid (only used for testing that it is not None anymore)
        self.pid = 42147


# Class for testing the BoB without actually starting processes.
# This is used for testing the start/stop components routines and
# the BoB commands.
#
# Testing that external processes start is outside the scope
# of the unit test, by overriding the process start methods we can check
# that the right processes are started depending on the configuration
# options.
class MockBob(BoB):
    """BoB whose start_*/stop_* methods only flip flags on the instance."""

    def __init__(self):
        BoB.__init__(self)

        # Set flags as to which of the overridden methods has been run.
        self.msgq = False
        self.cfgmgr = False
        self.ccsession = False
        self.auth = False
        self.resolver = False
        self.xfrout = False
        self.xfrin = False
        self.zonemgr = False
        self.stats = False
        self.stats_httpd = False
        self.cmdctl = False
        self.dhcp6 = False
        self.dhcp4 = False
        self.c_channel_env = {}
        self.components = { }
        self.creator = False
        self.get_process_exit_status_called = False

        class MockSockCreator(isc.bind10.component.Component):
            def __init__(self, process, boss, kind, address=None, params=None):
                isc.bind10.component.Component.__init__(self, process, boss,
                                                        kind, 'SockCreator')
                self._start_func = boss.start_creator

        # Swap the real socket creator special for the mock one above, so
        # the configurator calls start_creator() below instead.
        specials = isc.bind10.special_component.get_specials()
        specials['sockcreator'] = MockSockCreator
        self._component_configurator = \
            isc.bind10.component.Configurator(self, specials)

    def start_creator(self):
        self.creator = True
        procinfo = ProcessInfo('b10-sockcreator', ['/bin/false'])
        procinfo.pid = 1
        return procinfo

    def _read_bind10_config(self):
        # Configuration options are set directly
        pass

    def start_msgq(self):
        self.msgq = True
        procinfo = ProcessInfo('b10-msgq', ['/bin/false'])
        procinfo.pid = 2
        return procinfo

    def start_ccsession(self, c_channel_env):
        # this is not a process, don't have to do anything with procinfo
        self.ccsession = True

    def start_cfgmgr(self):
        self.cfgmgr = True
        procinfo = ProcessInfo('b10-cfgmgr', ['/bin/false'])
        procinfo.pid = 3
        return procinfo

    def start_auth(self):
        self.auth = True
        procinfo = ProcessInfo('b10-auth', ['/bin/false'])
        procinfo.pid = 5
        return procinfo

    def start_resolver(self):
        self.resolver = True
        procinfo = ProcessInfo('b10-resolver', ['/bin/false'])
        procinfo.pid = 6
        return procinfo

    def start_simple(self, name):
        """Dispatch to the matching start_* mock; KeyError if unknown."""
        procmap = { 'b10-zonemgr': self.start_zonemgr,
                    'b10-stats': self.start_stats,
                    'b10-stats-httpd': self.start_stats_httpd,
                    'b10-cmdctl': self.start_cmdctl,
                    'b10-dhcp6': self.start_dhcp6,
                    'b10-dhcp4': self.start_dhcp4,
                    'b10-xfrin': self.start_xfrin,
                    'b10-xfrout': self.start_xfrout }
        return procmap[name]()

    def start_xfrout(self):
        self.xfrout = True
        procinfo = ProcessInfo('b10-xfrout', ['/bin/false'])
        procinfo.pid = 7
        return procinfo

    def start_xfrin(self):
        self.xfrin = True
        procinfo = ProcessInfo('b10-xfrin', ['/bin/false'])
        procinfo.pid = 8
        return procinfo

    def start_zonemgr(self):
        self.zonemgr = True
        procinfo = ProcessInfo('b10-zonemgr', ['/bin/false'])
        procinfo.pid = 9
        return procinfo

    def start_stats(self):
        self.stats = True
        procinfo = ProcessInfo('b10-stats', ['/bin/false'])
        procinfo.pid = 10
        return procinfo

    def start_stats_httpd(self):
        self.stats_httpd = True
        procinfo = ProcessInfo('b10-stats-httpd', ['/bin/false'])
        procinfo.pid = 11
        return procinfo

    def start_cmdctl(self):
        self.cmdctl = True
        procinfo = ProcessInfo('b10-cmdctl', ['/bin/false'])
        procinfo.pid = 12
        return procinfo

    def start_dhcp6(self):
        self.dhcp6 = True
        procinfo = ProcessInfo('b10-dhcp6', ['/bin/false'])
        procinfo.pid = 13
        return procinfo

    def start_dhcp4(self):
        self.dhcp4 = True
        procinfo = ProcessInfo('b10-dhcp4', ['/bin/false'])
        procinfo.pid = 14
        return procinfo

    def stop_process(self, process, recipient, pid):
        """Dispatch to the matching stop_* mock; KeyError if unknown."""
        procmap = { 'b10-auth': self.stop_auth,
                    'b10-resolver': self.stop_resolver,
                    'b10-xfrout': self.stop_xfrout,
                    'b10-xfrin': self.stop_xfrin,
                    'b10-zonemgr': self.stop_zonemgr,
                    'b10-stats': self.stop_stats,
                    'b10-stats-httpd': self.stop_stats_httpd,
                    'b10-cmdctl': self.stop_cmdctl }
        procmap[process]()

    # Some functions to pretend we stop processes, use by stop_process
    def stop_msgq(self):
        if self.msgq:
            del self.components[2]
        self.msgq = False

    def stop_cfgmgr(self):
        if self.cfgmgr:
            del self.components[3]
        self.cfgmgr = False

    def stop_auth(self):
        if self.auth:
            del self.components[5]
        self.auth = False

    def stop_resolver(self):
        if self.resolver:
            del self.components[6]
        self.resolver = False

    def stop_xfrout(self):
        if self.xfrout:
            del self.components[7]
        self.xfrout = False

    def stop_xfrin(self):
        if self.xfrin:
            del self.components[8]
        self.xfrin = False

    def stop_zonemgr(self):
        if self.zonemgr:
            del self.components[9]
        self.zonemgr = False

    def stop_stats(self):
        if self.stats:
            del self.components[10]
        self.stats = False

    def stop_stats_httpd(self):
        if self.stats_httpd:
            del self.components[11]
        self.stats_httpd = False

    def stop_cmdctl(self):
        if self.cmdctl:
            del self.components[12]
        self.cmdctl = False

    def _get_process_exit_status(self):
        # Report pid 53 on the first call only, (0, 0) on every later call.
        if self.get_process_exit_status_called:
            return (0, 0)
        self.get_process_exit_status_called = True
        return (53, 0)

    def _get_process_exit_status_unknown_pid(self):
        # Same as above, but the first call reports pid 42.
        if self.get_process_exit_status_called:
            return (0, 0)
        self.get_process_exit_status_called = True
        return (42, 0)

    def _get_process_exit_status_raises_oserror_echild(self):
        raise OSError(errno.ECHILD, 'Mock error')

    def _get_process_exit_status_raises_oserror_other(self):
        raise OSError(0, 'Mock error')

    def _get_process_exit_status_raises_other(self):
        raise Exception('Mock error')

    def _make_mock_process_info(self, name, args, c_channel_env,
                                dev_null_stdout=False, dev_null_stderr=False):
        return MockProcessInfo(name, args, c_channel_env,
                               dev_null_stdout, dev_null_stderr)


class MockBobSimple(BoB):
    """BoB that records the name and args passed to start_process()."""

    def __init__(self):
        BoB.__init__(self)
        # Set which process has been started
        self.started_process_name = None
        self.started_process_args = None

    def _make_mock_process_info(self, name, args, c_channel_env,
                                dev_null_stdout=False, dev_null_stderr=False):
        return MockProcessInfo(name, args, c_channel_env,
                               dev_null_stdout, dev_null_stderr)

    def start_process(self, name, args, c_channel_env, port=None,
                      address=None):
        self.started_process_name = name
        self.started_process_args = args
        return None


class TestStartStopProcessesBob(unittest.TestCase):
    """
    Check that the start_all_components method starts the right combination
    of components and that the right components are started and stopped
    according to changes in configuration.
    """
    def check_environment_unchanged(self):
        # Check whether the environment has not been changed
        # NOTE(review): original_os_environ is not defined in the part of
        # this file that was reviewed -- confirm it is set at module level
        # elsewhere, otherwise this raises NameError.
        self.assertEqual(original_os_environ, os.environ)

    def check_started(self, bob, core, auth, resolver):
        """
        Check that the right sets of services are started. The ones that
        should be running are specified by the core, auth and resolver
        parameters (they are groups of processes, eg. auth means b10-auth,
        -xfrout, -xfrin and -zonemgr).
        """
        self.assertEqual(bob.msgq, core)
        self.assertEqual(bob.cfgmgr, core)
        self.assertEqual(bob.ccsession, core)
        self.assertEqual(bob.creator, core)
        self.assertEqual(bob.auth, auth)
        self.assertEqual(bob.resolver, resolver)
        self.assertEqual(bob.xfrout, auth)
        self.assertEqual(bob.xfrin, auth)
        self.assertEqual(bob.zonemgr, auth)
        self.assertEqual(bob.stats, core)
        self.assertEqual(bob.stats_httpd, core)
        self.assertEqual(bob.cmdctl, core)
        self.check_environment_unchanged()

    def check_preconditions(self, bob):
        """Check that nothing at all has been started yet."""
        self.check_started(bob, False, False, False)

    def check_started_none(self, bob):
        """
        Check that the situation is according to configuration where no
        servers should be started. Some components still need to be
        running.
        """
        self.check_started(bob, True, False, False)
        self.check_environment_unchanged()

    def check_started_both(self, bob):
        """
        Check the situation is according to configuration where both servers
        (auth and resolver) are enabled.
        """
        self.check_started(bob, True, True, True)
        self.check_environment_unchanged()

    def check_started_auth(self, bob):
        """
        Check the set of components needed to run auth only is started.
        """
        self.check_started(bob, True, True, False)
        self.check_environment_unchanged()

    def check_started_resolver(self, bob):
        """
        Check the set of components needed to run resolver only is started.
        """
        self.check_started(bob, True, False, True)
        self.check_environment_unchanged()

    def check_started_dhcp(self, bob, v4, v6):
        """
        Check if proper combinations of DHCPv4 and DHCPv6 can be started
        """
        self.assertEqual(v4, bob.dhcp4)
        self.assertEqual(v6, bob.dhcp6)
        self.check_environment_unchanged()

    def construct_config(self, start_auth, start_resolver):
        """Build a 'components' configuration with the requested servers."""
        # The things that are common, not turned on and off
        config = {}
        config['b10-stats'] = { 'kind': 'dispensable', 'address': 'Stats' }
        config['b10-stats-httpd'] = { 'kind': 'dispensable',
                                      'address': 'StatsHttpd' }
        config['b10-cmdctl'] = { 'kind': 'needed', 'special': 'cmdctl' }
        if start_auth:
            config['b10-auth'] = { 'kind': 'needed', 'special': 'auth' }
            config['b10-xfrout'] = { 'kind': 'dispensable',
                                     'address': 'Xfrout' }
            config['b10-xfrin'] = { 'kind': 'dispensable',
                                    'address': 'Xfrin' }
            config['b10-zonemgr'] = { 'kind': 'dispensable',
                                      'address': 'Zonemgr' }
        if start_resolver:
            config['b10-resolver'] = { 'kind': 'needed',
                                       'special': 'resolver' }
        return {'components': config}

    def config_start_init(self, start_auth, start_resolver):
        """
        Test the configuration is loaded at the startup.
        """
        bob = MockBob()
        config = self.construct_config(start_auth, start_resolver)
        class CC:
            def get_full_config(self):
                return config
        # Provide the fake CC with data
        bob.ccs = CC()
        # And make sure it's not overwritten
        def start_ccsession():
            bob.ccsession = True
        bob.start_ccsession = lambda _: start_ccsession()
        # We need to return the original _read_bind10_config
        bob._read_bind10_config = lambda: BoB._read_bind10_config(bob)
        bob.start_all_components()
        self.check_started(bob, True, start_auth, start_resolver)
        self.check_environment_unchanged()

    def test_start_none(self):
        self.config_start_init(False, False)

    def test_start_resolver(self):
        self.config_start_init(False, True)

    def test_start_auth(self):
        self.config_start_init(True, False)

    def test_start_both(self):
        self.config_start_init(True, True)

    def test_config_start(self):
        """
        Test that the configuration starts and stops components according
        to configuration changes.
        """

        # Create BoB and ensure correct initialization
        bob = MockBob()
        self.check_preconditions(bob)

        bob.start_all_components()
        bob.runnable = True
        bob.config_handler(self.construct_config(False, False))
        self.check_started_none(bob)

        # Enable both at once
        bob.config_handler(self.construct_config(True, True))
        self.check_started_both(bob)

        # Not touched by empty change
        bob.config_handler({})
        self.check_started_both(bob)

        # Not touched by change to the same configuration
        bob.config_handler(self.construct_config(True, True))
        self.check_started_both(bob)

        # Turn them both off again
        bob.config_handler(self.construct_config(False, False))
        self.check_started_none(bob)

        # Not touched by empty change
        bob.config_handler({})
        self.check_started_none(bob)

        # Not touched by change to the same configuration
        bob.config_handler(self.construct_config(False, False))
        self.check_started_none(bob)

        # Start and stop auth separately
        bob.config_handler(self.construct_config(True, False))
        self.check_started_auth(bob)

        bob.config_handler(self.construct_config(False, False))
        self.check_started_none(bob)

        # Start and stop resolver separately
        bob.config_handler(self.construct_config(False, True))
        self.check_started_resolver(bob)

        bob.config_handler(self.construct_config(False, False))
        self.check_started_none(bob)

        # Alternate
        bob.config_handler(self.construct_config(True, False))
        self.check_started_auth(bob)

        bob.config_handler(self.construct_config(False, True))
        self.check_started_resolver(bob)

        bob.config_handler(self.construct_config(True, False))
        self.check_started_auth(bob)

    def test_config_start_once(self):
        """
        Tests that a component is started only once.
        """
        # Create BoB and ensure correct initialization
        bob = MockBob()
        self.check_preconditions(bob)

        bob.start_all_components()

        bob.runnable = True
        bob.config_handler(self.construct_config(True, True))
        self.check_started_both(bob)

        bob.start_auth = lambda: self.fail("Started auth again")
        bob.start_xfrout = lambda: self.fail("Started xfrout again")
        bob.start_xfrin = lambda: self.fail("Started xfrin again")
        bob.start_zonemgr = lambda: self.fail("Started zonemgr again")
        bob.start_resolver = lambda: self.fail("Started resolver again")

        # Send again we want to start them. Should not do it, as they are.
        bob.config_handler(self.construct_config(True, True))

    def test_config_not_started_early(self):
        """
        Test that components are not started by the config handler before
        startup.
        """
        bob = MockBob()
        self.check_preconditions(bob)

        bob.start_auth = lambda: self.fail("Started auth again")
        bob.start_xfrout = lambda: self.fail("Started xfrout again")
        bob.start_xfrin = lambda: self.fail("Started xfrin again")
        bob.start_zonemgr = lambda: self.fail("Started zonemgr again")
        bob.start_resolver = lambda: self.fail("Started resolver again")

        bob.config_handler({'start_auth': True, 'start_resolver': True})

    # Checks that DHCP (v4 and v6) components are started when expected
    def test_start_dhcp(self):

        # Create BoB and ensure correct initialization
        bob = MockBob()
        self.check_preconditions(bob)

        bob.start_all_components()
        bob.config_handler(self.construct_config(False, False))
        self.check_started_dhcp(bob, False, False)

    def test_start_dhcp_v6only(self):
        # Create BoB and ensure correct initialization
        bob = MockBob()
        self.check_preconditions(bob)
        # v6 only enabled
        bob.start_all_components()
        bob.runnable = True
        bob._BoB_started = True
        config = self.construct_config(False, False)
        config['components']['b10-dhcp6'] = { 'kind': 'needed',
                                              'address': 'Dhcp6' }
        bob.config_handler(config)
        self.check_started_dhcp(bob, False, True)

        # uncomment when dhcpv4 becomes implemented
        # v4 only enabled
        #bob.cfg_start_dhcp6 = False
        #bob.cfg_start_dhcp4 = True
        #self.check_started_dhcp(bob, True, False)

        # both v4 and v6 enabled
        #bob.cfg_start_dhcp6 = True
        #bob.cfg_start_dhcp4 = True
        #self.check_started_dhcp(bob, True, True)


class MockComponent:
    """Stand-in for a component, exposing name/pid/address as callables."""

    def __init__(self, name, pid, address=None):
        self.name = lambda: name
        self.pid = lambda: pid
        self.address = lambda: address
        self.restarted = False
        self.forceful = False
        self.running = True
        self.has_failed = False

    def get_restart_time(self):
        return 0                # arbitrary dummy value

    def restart(self, now):
        self.restarted = True
        return True

    def is_running(self):
        return self.running

    def failed(self, status):
        return self.has_failed

    def kill(self, forceful):
        self.forceful = forceful


class TestBossCmd(unittest.TestCase):
    """Tests of boss commands run against a MockBob."""

    def test_ping(self):
        """
        Confirm simple ping command works.
        """
        bob = MockBob()
        answer = bob.command_handler("ping", None)
        self.assertEqual(answer, {'result': [0, 'pong']})

    def test_show_processes_empty(self):
        """
        Confirm getting a list of processes works.
        """
        bob = MockBob()
        answer = bob.command_handler("show_processes", None)
        self.assertEqual(answer, {'result': [0, []]})

    def test_show_processes(self):
        """
        Confirm getting a list of processes works.
        """
        bob = MockBob()
        bob.register_process(1, MockComponent('first', 1))
        bob.register_process(2, MockComponent('second', 2, 'Second'))
        answer = bob.command_handler("show_processes", None)
        processes = [[1, 'first', None],
                     [2, 'second', 'Second']]
        self.assertEqual(answer, {'result': [0, processes]})


class TestParseArgs(unittest.TestCase):
    """
    This tests parsing of arguments of the bind10 master process.
    """
    #TODO: Write tests for the original parsing, bad options, etc.
    def test_no_opts(self):
        """
        Test correct default values when no options are passed.
        """
        options = parse_args([], TestOptParser)
        self.assertEqual(None, options.data_path)
        self.assertEqual(None, options.config_file)
        self.assertEqual(None, options.cmdctl_port)

    def test_data_path(self):
        """
        Test it can parse the data path.
        """
        self.assertRaises(OptsError, parse_args, ['-p'], TestOptParser)
        self.assertRaises(OptsError, parse_args, ['--data-path'],
                          TestOptParser)
        options = parse_args(['-p', '/data/path'], TestOptParser)
        self.assertEqual('/data/path', options.data_path)
        options = parse_args(['--data-path=/data/path'], TestOptParser)
        self.assertEqual('/data/path', options.data_path)

    def test_config_filename(self):
        """
        Test it can parse the config switch.
        """
        self.assertRaises(OptsError, parse_args, ['-c'], TestOptParser)
        self.assertRaises(OptsError, parse_args, ['--config-file'],
                          TestOptParser)
        options = parse_args(['-c', 'config-file'], TestOptParser)
        self.assertEqual('config-file', options.config_file)
        options = parse_args(['--config-file=config-file'], TestOptParser)
        self.assertEqual('config-file', options.config_file)

    def test_clear_config(self):
        """The --clear-config switch defaults to False and can be set."""
        options = parse_args([], TestOptParser)
        self.assertEqual(False, options.clear_config)
        options = parse_args(['--clear-config'], TestOptParser)
        self.assertEqual(True, options.clear_config)

    def test_nokill(self):
        """Both --no-kill and -i set nokill; it defaults to False."""
        options = parse_args([], TestOptParser)
        self.assertEqual(False, options.nokill)
        options = parse_args(['--no-kill'], TestOptParser)
        self.assertEqual(True, options.nokill)
        options = parse_args([], TestOptParser)
        self.assertEqual(False, options.nokill)
        options = parse_args(['-i'], TestOptParser)
        self.assertEqual(True, options.nokill)

    def test_cmdctl_port(self):
        """
        Test it can parse the command control port.
        """
        self.assertRaises(OptsError, parse_args, ['--cmdctl-port=abc'],
                          TestOptParser)
        self.assertRaises(OptsError, parse_args, ['--cmdctl-port=100000000'],
                          TestOptParser)
        self.assertRaises(OptsError, parse_args, ['--cmdctl-port'],
                          TestOptParser)
        options = parse_args(['--cmdctl-port=1234'], TestOptParser)
        self.assertEqual(1234, options.cmdctl_port)


class TestPIDFile(unittest.TestCase):
    """Tests of dump_pid() and unlink_pid_file()."""

    def setUp(self):
        """Pick the PID file path and make sure no stale file exists."""
        self.pid_file = '@builddir@' + os.sep + 'bind10.pid'
        if os.path.exists(self.pid_file):
            os.unlink(self.pid_file)

    def tearDown(self):
        """Remove the PID file if a test left one behind."""
        if os.path.exists(self.pid_file):
            os.unlink(self.pid_file)

    def check_pid_file(self):
        # dump PID to the file, and confirm the content is correct
        dump_pid(self.pid_file)
        my_pid = os.getpid()
        with open(self.pid_file, "r") as f:
            self.assertEqual(my_pid, int(f.read()))

    def test_dump_pid(self):
        self.check_pid_file()

        # make sure any existing content will be removed
        with open(self.pid_file, "w") as f:
            f.write('dummy data\n')
        self.check_pid_file()

    def test_unlink_pid_file_notexist(self):
        dummy_data = 'dummy_data\n'

        with open(self.pid_file, "w") as f:
            f.write(dummy_data)

        unlink_pid_file("no_such_pid_file")

        # the file specified for unlink_pid_file doesn't exist,
        # and the original content of the file should be intact.
        with open(self.pid_file, "r") as f:
            self.assertEqual(dummy_data, f.read())

    def test_dump_pid_with_none(self):
        # Check the behavior of dump_pid() and unlink_pid_file() with None.
        # This should be no-op.
        dump_pid(None)
        self.assertFalse(os.path.exists(self.pid_file))

        dummy_data = 'dummy_data\n'

        with open(self.pid_file, "w") as f:
            f.write(dummy_data)

        unlink_pid_file(None)

        with open(self.pid_file, "r") as f:
            self.assertEqual(dummy_data, f.read())

    def test_dump_pid_failure(self):
        """Check the failure case of dumping the PID file."""
        # the attempt to open file will fail, which should result in exception.
self.assertRaises(IOError, dump_pid, 'nonexistent_dir' + os.sep + 'bind10.pid') class TestBossComponents(unittest.TestCase): """ Test the boss propagates component configuration properly to the component configurator and acts sane. """ def setUp(self): self.__param = None self.__called = False self.__compconfig = { 'comp': { 'kind': 'needed', 'process': 'cat' } } def __unary_hook(self, param): """ A hook function that stores the parameter for later examination. """ self.__param = param def __nullary_hook(self): """ A hook function that notes down it was called. """ self.__called = True def __check_core(self, config): """ A function checking that the config contains parts for the valid core component configuration. """ self.assertIsNotNone(config) for component in ['sockcreator', 'msgq', 'cfgmgr']: self.assertTrue(component in config) self.assertEqual(component, config[component]['special']) self.assertEqual('core', config[component]['kind']) def __check_extended(self, config): """ This checks that the config contains the core and one more component. """ self.__check_core(config) self.assertTrue('comp' in config) self.assertEqual('cat', config['comp']['process']) self.assertEqual('needed', config['comp']['kind']) self.assertEqual(4, len(config)) def test_correct_run(self): """ Test the situation when we run in usual scenario, nothing fails, we just start, reconfigure and then stop peacefully. 
""" bob = MockBob() # Start it orig = bob._component_configurator.startup bob._component_configurator.startup = self.__unary_hook bob.start_all_components() bob._component_configurator.startup = orig self.__check_core(self.__param) self.assertEqual(3, len(self.__param)) # Reconfigure it self.__param = None orig = bob._component_configurator.reconfigure bob._component_configurator.reconfigure = self.__unary_hook # Otherwise it does not work bob.runnable = True bob.config_handler({'components': self.__compconfig}) self.__check_extended(self.__param) currconfig = self.__param # If we reconfigure it, but it does not contain the components part, # nothing is called bob.config_handler({}) self.assertEqual(self.__param, currconfig) self.__param = None bob._component_configurator.reconfigure = orig # Check a configuration that messes up the core components is rejected. compconf = dict(self.__compconfig) compconf['msgq'] = { 'process': 'echo' } result = bob.config_handler({'components': compconf}) # Check it rejected it self.assertEqual(1, result['result'][0]) # We can't call shutdown, that one relies on the stuff in main # We check somewhere else that the shutdown is actually called # from there (the test_kills). def __real_test_kill(self, nokill=False, ex_on_kill=None): """ Helper function that does the actual kill functionality testing. """ bob = MockBob() bob.nokill = nokill killed = [] class ImmortalComponent: """ An immortal component. It does not stop when it is told so (anyway it is not told so). It does not die if it is killed the first time. It dies only when killed forcefully. """ def __init__(self): # number of kill() calls, preventing infinite loop. self.__call_count = 0 def kill(self, forceful=False): self.__call_count += 1 if self.__call_count > 2: raise Exception('Too many calls to ImmortalComponent.kill') killed.append(forceful) if ex_on_kill is not None: # If exception is given by the test, raise it here. 
# In the case of ESRCH, the process should have gone # somehow, so we clear the components. if ex_on_kill.errno == errno.ESRCH: bob.components = {} raise ex_on_kill if forceful: bob.components = {} def pid(self): return 1 def name(self): return "Immortal" bob.components = {} bob.register_process(1, ImmortalComponent()) # While at it, we check the configurator shutdown is actually called orig = bob._component_configurator.shutdown bob._component_configurator.shutdown = self.__nullary_hook self.__called = False bob.ccs = MockModuleCCSession() self.assertFalse(bob.ccs.stopped) bob.shutdown() self.assertTrue(bob.ccs.stopped) # Here, killed is an array where False is added if SIGTERM # should be sent, or True if SIGKILL should be sent, in order in # which they're sent. if nokill: self.assertEqual([], killed) else: if ex_on_kill is not None: self.assertEqual([False], killed) else: self.assertEqual([False, True], killed) self.assertTrue(self.__called) bob._component_configurator.shutdown = orig def test_kills(self): """ Test that the boss kills components which don't want to stop. """ self.__real_test_kill() def test_kill_fail(self): """Test cases where kill() results in an exception due to OS error. The behavior should be different for EPERM, so we test two cases. """ ex = OSError() ex.errno, ex.strerror = errno.ESRCH, 'No such process' self.__real_test_kill(ex_on_kill=ex) ex.errno, ex.strerror = errno.EPERM, 'Operation not permitted' self.__real_test_kill(ex_on_kill=ex) def test_nokill(self): """ Test that the boss *doesn't* kill components which don't want to stop, when asked not to (by passing the --no-kill option which sets bob.nokill to True). """ self.__real_test_kill(True) def test_component_shutdown(self): """ Test the component_shutdown sets all variables accordingly. 
""" bob = MockBob() self.assertRaises(Exception, bob.component_shutdown, 1) self.assertEqual(1, bob.exitcode) bob._BoB__started = True bob.component_shutdown(2) self.assertEqual(2, bob.exitcode) self.assertFalse(bob.runnable) def test_init_config(self): """ Test initial configuration is loaded. """ bob = MockBob() # Start it bob._component_configurator.reconfigure = self.__unary_hook # We need to return the original read_bind10_config bob._read_bind10_config = lambda: BoB._read_bind10_config(bob) # And provide a session to read the data from class CC: pass bob.ccs = CC() bob.ccs.get_full_config = lambda: {'components': self.__compconfig} bob.start_all_components() self.__check_extended(self.__param) def __setup_restart(self, bob, component): '''Common procedure for restarting a component used below.''' bob.components_to_restart = { component } component.restarted = False bob.restart_processes() def test_restart_processes(self): '''Check some behavior on restarting processes.''' bob = MockBob() bob.runnable = True component = MockComponent('test', 53) # A component to be restarted will actually be restarted iff it's # in the configurator's configuration. # We bruteforce the configurator internal below; ugly, but the easiest # way for the test. bob._component_configurator._components['test'] = (None, component) self.__setup_restart(bob, component) self.assertTrue(component.restarted) self.assertFalse(component in bob.components_to_restart) # Remove the component from the configuration. It won't be restarted # even if scheduled, nor will remain in the to-be-restarted list. 
del bob._component_configurator._components['test'] self.__setup_restart(bob, component) self.assertFalse(component.restarted) self.assertFalse(component in bob.components_to_restart) def test_get_processes(self): '''Test that procsses are returned correctly, sorted by pid.''' bob = MockBob() pids = [] pids.extend(range(0, 20)) random.shuffle(pids) for i in range(0, 20): pid = pids[i] component = MockComponent('test' + str(pid), pid, 'Test' + str(pid)) bob.components[pid] = component process_list = bob.get_processes() self.assertEqual(20, len(process_list)) last_pid = -1 for process in process_list: pid = process[0] self.assertLessEqual(last_pid, pid) last_pid = pid self.assertEqual([pid, 'test' + str(pid), 'Test' + str(pid)], process) def _test_reap_children_helper(self, runnable, is_running, failed): '''Construct a BoB instance, set various data in it according to passed args and check if the component was added to the list of components to restart.''' bob = MockBob() bob.runnable = runnable component = MockComponent('test', 53) component.running = is_running component.has_failed = failed bob.components[53] = component self.assertFalse(component in bob.components_to_restart) bob.reap_children() if runnable and is_running and not failed: self.assertTrue(bob.components_to_restart) else: self.assertFalse(bob.components_to_restart) def test_reap_children(self): '''Test that children are queued to be restarted when they ask for it.''' # test various combinations of 3 booleans # (BoB.runnable, component.is_running(), component.failed()) self._test_reap_children_helper(False, False, False) self._test_reap_children_helper(False, False, True) self._test_reap_children_helper(False, True, False) self._test_reap_children_helper(False, True, True) self._test_reap_children_helper(True, False, False) self._test_reap_children_helper(True, False, True) self._test_reap_children_helper(True, True, False) self._test_reap_children_helper(True, True, True) # setup for more tests below 
bob = MockBob() bob.runnable = True component = MockComponent('test', 53) bob.components[53] = component # case where the returned pid is unknown to us. nothing should # happpen then. bob.get_process_exit_status_called = False bob._get_process_exit_status = bob._get_process_exit_status_unknown_pid bob.components_to_restart = [] # this should do nothing as the pid is unknown bob.reap_children() self.assertFalse(bob.components_to_restart) # case where bob._get_process_exit_status() raises OSError with errno.ECHILD bob._get_process_exit_status = bob._get_process_exit_status_raises_oserror_echild bob.components_to_restart = [] # this should catch and handle the OSError bob.reap_children() self.assertFalse(bob.components_to_restart) # case where bob._get_process_exit_status() raises OSError with # errno other than ECHILD bob._get_process_exit_status = bob._get_process_exit_status_raises_oserror_other with self.assertRaises(OSError): bob.reap_children() # case where bob._get_process_exit_status() raises something # other than OSError bob._get_process_exit_status = bob._get_process_exit_status_raises_other with self.assertRaises(Exception): bob.reap_children() def test_kill_started_components(self): '''Test that started components are killed.''' bob = MockBob() component = MockComponent('test', 53, 'Test') bob.components[53] = component self.assertEqual([[53, 'test', 'Test']], bob.get_processes()) bob.kill_started_components() self.assertFalse(bob.get_processes()) self.assertTrue(component.forceful) def test_start_msgq(self): '''Test that b10-msgq is started.''' bob = MockBobSimple() bob.c_channel_env = {} bob.msgq_timeout = 0 # use the MockProcessInfo creator bob._make_process_info = bob._make_mock_process_info # non-verbose case bob.verbose = False pi = bob.start_msgq() self.assertEqual('b10-msgq', pi.name) self.assertEqual(['b10-msgq'], pi.args) self.assertTrue(pi.dev_null_stdout) self.assertTrue(pi.dev_null_stderr) self.assertEqual({}, pi.env) # this is set by 
ProcessInfo.spawn() self.assertEqual(42147, pi.pid) # verbose case bob.verbose = True pi = bob.start_msgq() self.assertEqual('b10-msgq', pi.name) self.assertEqual(['b10-msgq'], pi.args) self.assertTrue(pi.dev_null_stdout) self.assertFalse(pi.dev_null_stderr) self.assertEqual({}, pi.env) # this is set by ProcessInfo.spawn() self.assertEqual(42147, pi.pid) def test_start_msgq_timeout(self): '''Test that b10-msgq startup attempts connections several times and times out eventually.''' bob = MockBobSimple() bob.c_channel_env = {} # keep the timeout small for the test to complete quickly bob.msgq_timeout = 1 # use the MockProcessInfo creator bob._make_process_info = bob._make_mock_process_info global attempts attempts = 0 tmp_time = time.time def _my_time(): global attempts attempts += 1 return tmp_time() time.time = _my_time thrown = False # An exception will be thrown here when it eventually times out. try: pi = bob.start_msgq() except bind10_src.CChannelConnectError as e: thrown = True # We just check that an exception was thrown, and that several # attempts were made to connect. 
self.assertTrue(thrown) # 1 second of attempts every 0.1 seconds should result in at least 5 attempts self.assertGreater(attempts, 5) time.time = tmp_time def test_start_cfgmgr(self): '''Test that b10-cfgmgr is started.''' class DummySession(): def group_recvmsg(self): return (None, None) bob = MockBobSimple() bob.c_channel_env = {} bob.cc_session = DummySession() bob.wait_time = 0 # use the MockProcessInfo creator bob._make_process_info = bob._make_mock_process_info # defaults pi = bob.start_cfgmgr() self.assertEqual('b10-cfgmgr', pi.name) self.assertEqual(['b10-cfgmgr'], pi.args) self.assertEqual({}, pi.env) # this is set by ProcessInfo.spawn() self.assertEqual(42147, pi.pid) # data_path is specified bob.data_path = '/var/lib/test' pi = bob.start_cfgmgr() self.assertEqual('b10-cfgmgr', pi.name) self.assertEqual(['b10-cfgmgr', '--data-path=/var/lib/test'], pi.args) self.assertEqual({}, pi.env) # this is set by ProcessInfo.spawn() self.assertEqual(42147, pi.pid) # config_filename is specified bob.config_filename = 'foo.cfg' pi = bob.start_cfgmgr() self.assertEqual('b10-cfgmgr', pi.name) self.assertEqual(['b10-cfgmgr', '--data-path=/var/lib/test', '--config-filename=foo.cfg'], pi.args) self.assertEqual({}, pi.env) # this is set by ProcessInfo.spawn() self.assertEqual(42147, pi.pid) # clear_config is specified bob.clear_config = True pi = bob.start_cfgmgr() self.assertEqual('b10-cfgmgr', pi.name) self.assertEqual(['b10-cfgmgr', '--data-path=/var/lib/test', '--config-filename=foo.cfg', '--clear-config'], pi.args) self.assertEqual({}, pi.env) # this is set by ProcessInfo.spawn() self.assertEqual(42147, pi.pid) def test_start_ccsession(self): '''Test that CC session is started.''' class DummySession(): def __init__(self, specfile, config_handler, command_handler, socket_file): self.specfile = specfile self.config_handler = config_handler self.command_handler = command_handler self.socket_file = socket_file self.started = False def start(self): self.started = True bob = 
MockBobSimple() tmp = isc.config.ModuleCCSession isc.config.ModuleCCSession = DummySession ccs = bob.start_ccsession({}) self.assertEqual(bind10_src.SPECFILE_LOCATION, ccs.specfile) self.assertEqual(bob.config_handler, ccs.config_handler) self.assertEqual(bob.command_handler, ccs.command_handler) self.assertEqual(bob.msgq_socket_file, ccs.socket_file) self.assertTrue(ccs.started) isc.config.ModuleCCSession = tmp def test_start_process(self): '''Test that processes can be started.''' bob = MockBob() # use the MockProcessInfo creator bob._make_process_info = bob._make_mock_process_info pi = bob.start_process('Test Process', ['/bin/true'], {}) self.assertEqual('Test Process', pi.name) self.assertEqual(['/bin/true'], pi.args) self.assertEqual({}, pi.env) # this is set by ProcessInfo.spawn() self.assertEqual(42147, pi.pid) def test_register_process(self): '''Test that processes can be registered with BoB.''' bob = MockBob() component = MockComponent('test', 53, 'Test') self.assertFalse(53 in bob.components) bob.register_process(53, component) self.assertTrue(53 in bob.components) def test_start_simple(self): '''Test simple process startup.''' bob = MockBobSimple() bob.c_channel_env = {} # non-verbose case bob.verbose = False bob.start_simple('/bin/true') self.assertEqual('/bin/true', bob.started_process_name) self.assertEqual(['/bin/true'], bob.started_process_args) # verbose case bob.verbose = True bob.start_simple('/bin/true') self.assertEqual('/bin/true', bob.started_process_name) self.assertEqual(['/bin/true', '-v'], bob.started_process_args) def test_start_auth(self): '''Test that b10-auth is started.''' bob = MockBobSimple() bob.c_channel_env = {} # non-verbose case bob.verbose = False bob.start_auth() self.assertEqual('b10-auth', bob.started_process_name) self.assertEqual(['b10-auth'], bob.started_process_args) # verbose case bob.verbose = True bob.start_auth() self.assertEqual('b10-auth', bob.started_process_name) self.assertEqual(['b10-auth', '-v'], 
bob.started_process_args) def test_start_resolver(self): '''Test that b10-resolver is started.''' bob = MockBobSimple() bob.c_channel_env = {} # non-verbose case bob.verbose = False bob.start_resolver() self.assertEqual('b10-resolver', bob.started_process_name) self.assertEqual(['b10-resolver'], bob.started_process_args) # verbose case bob.verbose = True bob.start_resolver() self.assertEqual('b10-resolver', bob.started_process_name) self.assertEqual(['b10-resolver', '-v'], bob.started_process_args) def test_start_cmdctl(self): '''Test that b10-cmdctl is started.''' bob = MockBobSimple() bob.c_channel_env = {} # non-verbose case bob.verbose = False bob.start_cmdctl() self.assertEqual('b10-cmdctl', bob.started_process_name) self.assertEqual(['b10-cmdctl'], bob.started_process_args) # verbose case bob.verbose = True bob.start_cmdctl() self.assertEqual('b10-cmdctl', bob.started_process_name) self.assertEqual(['b10-cmdctl', '-v'], bob.started_process_args) # with port bob.verbose = True bob.cmdctl_port = 9353 bob.start_cmdctl() self.assertEqual('b10-cmdctl', bob.started_process_name) self.assertEqual(['b10-cmdctl', '--port=9353', '-v'], bob.started_process_args) class SocketSrvTest(unittest.TestCase): """ This tests some methods of boss related to the unix domain sockets used to transfer other sockets to applications. """ def setUp(self): """ Create the boss to test, testdata and backup some functions. """ self.__boss = BoB() self.__select_backup = bind10_src.select.select self.__select_called = None self.__socket_data_called = None self.__consumer_dead_called = None self.__socket_request_handler_called = None def tearDown(self): """ Restore functions. """ bind10_src.select.select = self.__select_backup class __FalseSocket: """ A mock socket for the select and accept and stuff like that. 
""" def __init__(self, owner, fileno=42): self.__owner = owner self.__fileno = fileno self.data = None self.closed = False def fileno(self): return self.__fileno def accept(self): return (self.__class__(self.__owner, 13), "/path/to/socket") def recv(self, bufsize, flags=0): self.__owner.assertEqual(1, bufsize) self.__owner.assertEqual(socket.MSG_DONTWAIT, flags) if isinstance(self.data, socket.error): raise self.data elif self.data is not None: if len(self.data): result = self.data[0:1] self.data = self.data[1:] return result else: raise socket.error(errno.EAGAIN, "Would block") else: return b'' def close(self): self.closed = True class __CCS: """ A mock CCS, just to provide the socket file number. """ class __Socket: def fileno(self): return 1 def get_socket(self): return self.__Socket() def __select_accept(self, r, w, x, t): self.__select_called = (r, w, x, t) return ([42], [], []) def __select_data(self, r, w, x, t): self.__select_called = (r, w, x, t) return ([13], [], []) def __accept(self): """ Hijact the accept method of the boss. Notes down it was called and stops the boss. """ self.__accept_called = True self.__boss.runnable = False def test_srv_accept_called(self): """ Test that the _srv_accept method of boss is called when the listening socket is readable. """ self.__boss.runnable = True self.__boss._srv_socket = self.__FalseSocket(self) self.__boss._srv_accept = self.__accept self.__boss.ccs = self.__CCS() bind10_src.select.select = self.__select_accept self.__boss.run(2) # It called the accept self.assertTrue(self.__accept_called) # And the select had the right parameters self.assertEqual(([2, 1, 42], [], [], None), self.__select_called) def test_srv_accept(self): """ Test how the _srv_accept method works. 
""" self.__boss._srv_socket = self.__FalseSocket(self) self.__boss._srv_accept() # After we accepted, a new socket is added there socket = self.__boss._unix_sockets[13][0] # The socket is properly stored there self.assertTrue(isinstance(socket, self.__FalseSocket)) # And the buffer (yet empty) is there self.assertEqual({13: (socket, b'')}, self.__boss._unix_sockets) def __socket_data(self, socket): self.__boss.runnable = False self.__socket_data_called = socket def test_socket_data(self): """ Test that a socket that wants attention gets it. """ self.__boss._srv_socket = self.__FalseSocket(self) self.__boss._socket_data = self.__socket_data self.__boss.ccs = self.__CCS() self.__boss._unix_sockets = {13: (self.__FalseSocket(self, 13), b'')} self.__boss.runnable = True bind10_src.select.select = self.__select_data self.__boss.run(2) self.assertEqual(13, self.__socket_data_called) self.assertEqual(([2, 1, 42, 13], [], [], None), self.__select_called) def __prepare_data(self, data): socket = self.__FalseSocket(self, 13) self.__boss._unix_sockets = {13: (socket, b'')} socket.data = data self.__boss.socket_consumer_dead = self.__consumer_dead self.__boss.socket_request_handler = self.__socket_request_handler return socket def __consumer_dead(self, socket): self.__consumer_dead_called = socket def __socket_request_handler(self, token, socket): self.__socket_request_handler_called = (token, socket) def test_socket_closed(self): """ Test that a socket is removed and the socket_consumer_dead is called when it is closed. """ socket = self.__prepare_data(None) self.__boss._socket_data(13) self.assertEqual(socket, self.__consumer_dead_called) self.assertEqual({}, self.__boss._unix_sockets) self.assertTrue(socket.closed) def test_socket_short(self): """ Test that if there's not enough data to get the whole socket, it is kept there, but nothing is called. 
""" socket = self.__prepare_data(b'tok') self.__boss._socket_data(13) self.assertEqual({13: (socket, b'tok')}, self.__boss._unix_sockets) self.assertFalse(socket.closed) self.assertIsNone(self.__consumer_dead_called) self.assertIsNone(self.__socket_request_handler_called) def test_socket_continue(self): """ Test that we call the token handling function when the whole token comes. This test pretends to continue reading where the previous one stopped. """ socket = self.__prepare_data(b"en\nanothe") # The data to finish self.__boss._unix_sockets[13] = (socket, b'tok') self.__boss._socket_data(13) self.assertEqual({13: (socket, b'anothe')}, self.__boss._unix_sockets) self.assertFalse(socket.closed) self.assertIsNone(self.__consumer_dead_called) self.assertEqual((b'token', socket), self.__socket_request_handler_called) def test_broken_socket(self): """ If the socket raises an exception during the read other than EAGAIN, it is broken and we remove it. """ sock = self.__prepare_data(socket.error(errno.ENOMEM, "There's more memory available, but not for you")) self.__boss._socket_data(13) self.assertEqual(sock, self.__consumer_dead_called) self.assertEqual({}, self.__boss._unix_sockets) self.assertTrue(sock.closed) class TestFunctions(unittest.TestCase): def setUp(self): self.lockfile_testpath = \ "@abs_top_builddir@/src/bin/bind10/tests/lockfile_test" self.assertFalse(os.path.exists(self.lockfile_testpath)) os.mkdir(self.lockfile_testpath) self.assertTrue(os.path.isdir(self.lockfile_testpath)) def tearDown(self): os.rmdir(self.lockfile_testpath) self.assertFalse(os.path.isdir(self.lockfile_testpath)) os.environ["B10_LOCKFILE_DIR_FROM_BUILD"] = "@abs_top_builddir@" def test_remove_lock_files(self): os.environ["B10_LOCKFILE_DIR_FROM_BUILD"] = self.lockfile_testpath # create lockfiles for the testcase lockfiles = ["logger_lockfile"] for f in lockfiles: fname = os.environ["B10_LOCKFILE_DIR_FROM_BUILD"] + '/' + f self.assertFalse(os.path.exists(fname)) open(fname, "w").close() 
self.assertTrue(os.path.isfile(fname)) # first call should clear up all the lockfiles bind10_src.remove_lock_files() # check if the lockfiles exist for f in lockfiles: fname = os.environ["B10_LOCKFILE_DIR_FROM_BUILD"] + '/' + f self.assertFalse(os.path.isfile(fname)) # second call should not assert anyway bind10_src.remove_lock_files() def test_get_signame(self): # just test with some samples signame = bind10_src.get_signame(signal.SIGTERM) self.assertEqual('SIGTERM', signame) signame = bind10_src.get_signame(signal.SIGKILL) self.assertEqual('SIGKILL', signame) # 59426 is hopefully an unused signal on most platforms signame = bind10_src.get_signame(59426) self.assertEqual('Unknown signal 59426', signame) def test_fatal_signal(self): self.assertIsNone(bind10_src.boss_of_bind) bind10_src.boss_of_bind = BoB() bind10_src.boss_of_bind.runnable = True bind10_src.fatal_signal(signal.SIGTERM, None) # Now, runnable must be False self.assertFalse(bind10_src.boss_of_bind.runnable) bind10_src.boss_of_bind = None if __name__ == '__main__': # store os.environ for test_unchanged_environment original_os_environ = copy.deepcopy(os.environ) isc.log.resetUnitTestRootLogger() unittest.main()