# Copyright (C) 2011 Internet Systems Consortium. # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. from bind10_src import ProcessInfo, BoB, parse_args, dump_pid, unlink_pid_file, _BASETIME # XXX: environment tests are currently disabled, due to the preprocessor # setup that we have now complicating the environment import unittest import sys import os import copy import signal import socket from isc.net.addr import IPAddr import time import isc import isc.log from isc.testutils.parse_args import TestOptParser, OptsError class TestProcessInfo(unittest.TestCase): def setUp(self): # redirect stdout to a pipe so we can check that our # process spawning is doing the right thing with stdout self.old_stdout = os.dup(sys.stdout.fileno()) self.pipes = os.pipe() os.dup2(self.pipes[1], sys.stdout.fileno()) os.close(self.pipes[1]) # note that we use dup2() to restore the original stdout # to the main program ASAP in each test... this prevents # hangs reading from the child process (as the pipe is only # open in the child), and also insures nice pretty output def tearDown(self): # clean up our stdout munging os.dup2(self.old_stdout, sys.stdout.fileno()) os.close(self.pipes[0]) def test_init(self): pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ]) pi.spawn() os.dup2(self.old_stdout, sys.stdout.fileno()) self.assertEqual(pi.name, 'Test Process') self.assertEqual(pi.args, [ '/bin/echo', 'foo' ]) # self.assertEqual(pi.env, { 'PATH': os.environ['PATH'], # 'PYTHON_EXEC': os.environ['PYTHON_EXEC'] }) self.assertEqual(pi.dev_null_stdout, False) self.assertEqual(os.read(self.pipes[0], 100), b"foo\n") self.assertNotEqual(pi.process, None) self.assertTrue(type(pi.pid) is int) # def test_setting_env(self): # pi = ProcessInfo('Test Process', [ '/bin/true' ], env={'FOO': 'BAR'}) # os.dup2(self.old_stdout, sys.stdout.fileno()) # self.assertEqual(pi.env, { 'PATH': os.environ['PATH'], # 'PYTHON_EXEC': os.environ['PYTHON_EXEC'], # 'FOO': 'BAR' }) def test_setting_null_stdout(self): pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ], dev_null_stdout=True) pi.spawn() os.dup2(self.old_stdout, sys.stdout.fileno()) self.assertEqual(pi.dev_null_stdout, True) self.assertEqual(os.read(self.pipes[0], 100), b"") def test_respawn(self): pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ]) pi.spawn() # wait for old process to work... self.assertEqual(os.read(self.pipes[0], 100), b"foo\n") # respawn it old_pid = pi.pid pi.respawn() os.dup2(self.old_stdout, sys.stdout.fileno()) # make sure the new one started properly self.assertEqual(pi.name, 'Test Process') self.assertEqual(pi.args, [ '/bin/echo', 'foo' ]) # self.assertEqual(pi.env, { 'PATH': os.environ['PATH'], # 'PYTHON_EXEC': os.environ['PYTHON_EXEC'] }) self.assertEqual(pi.dev_null_stdout, False) self.assertEqual(os.read(self.pipes[0], 100), b"foo\n") self.assertNotEqual(pi.process, None) self.assertTrue(type(pi.pid) is int) self.assertNotEqual(pi.pid, old_pid) class TestBoB(unittest.TestCase): def test_init(self): bob = BoB() self.assertEqual(bob.verbose, False) self.assertEqual(bob.msgq_socket_file, None) self.assertEqual(bob.cc_session, None) self.assertEqual(bob.ccs, None) self.assertEqual(bob.processes, {}) self.assertEqual(bob.dead_processes, {}) self.assertEqual(bob.runnable, False) self.assertEqual(bob.uid, None) self.assertEqual(bob.username, None) self.assertEqual(bob.nocache, False) self.assertEqual(bob.cfg_start_auth, True) self.assertEqual(bob.cfg_start_resolver, False) self.assertEqual(bob.cfg_start_dhcp4, False) self.assertEqual(bob.cfg_start_dhcp6, False) def test_init_alternate_socket(self): bob = BoB("alt_socket_file") self.assertEqual(bob.verbose, False) self.assertEqual(bob.msgq_socket_file, "alt_socket_file") self.assertEqual(bob.cc_session, None) self.assertEqual(bob.ccs, None) self.assertEqual(bob.processes, {}) self.assertEqual(bob.dead_processes, {}) self.assertEqual(bob.runnable, False) self.assertEqual(bob.uid, None) self.assertEqual(bob.username, None) self.assertEqual(bob.nocache, False) self.assertEqual(bob.cfg_start_auth, True) self.assertEqual(bob.cfg_start_resolver, False) self.assertEqual(bob.cfg_start_dhcp4, False) self.assertEqual(bob.cfg_start_dhcp6, False) def test_command_handler(self): class DummySession(): def group_sendmsg(self, msg, group): (self.msg, self.group) = (msg, group) def group_recvmsg(self, nonblock, seq): pass class DummyModuleCCSession(): module_spec = isc.config.module_spec.ModuleSpec({ "module_name": "Boss", "statistics": [ { "item_name": "boot_time", "item_type": "string", "item_optional": False, "item_default": "1970-01-01T00:00:00Z", "item_title": "Boot time", "item_description": "A date time when bind10 process starts initially", "item_format": "date-time" } ] }) def get_module_spec(self): return self.module_spec bob = BoB() bob.verbose = True bob.cc_session = DummySession() bob.ccs = DummyModuleCCSession() # a bad command self.assertEqual(bob.command_handler(-1, None), isc.config.ccsession.create_answer(1, "bad command")) # "shutdown" command self.assertEqual(bob.command_handler("shutdown", None), isc.config.ccsession.create_answer(0)) self.assertFalse(bob.runnable) # "getstats" command self.assertEqual(bob.command_handler("getstats", None), isc.config.ccsession.create_answer(0, { "owner": "Boss", "data": { 'boot_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', _BASETIME) }})) # "sendstats" command self.assertEqual(bob.command_handler("sendstats", None), isc.config.ccsession.create_answer(0)) self.assertEqual(bob.cc_session.group, "Stats") self.assertEqual(bob.cc_session.msg, isc.config.ccsession.create_command( "set", { "owner": "Boss", "data": { "boot_time": time.strftime("%Y-%m-%dT%H:%M:%SZ", _BASETIME) }})) # "ping" command self.assertEqual(bob.command_handler("ping", None), isc.config.ccsession.create_answer(0, "pong")) # "show_processes" command self.assertEqual(bob.command_handler("show_processes", None), isc.config.ccsession.create_answer(0, bob.get_processes())) # an unknown command self.assertEqual(bob.command_handler("__UNKNOWN__", None), isc.config.ccsession.create_answer(1, "Unknown command")) # Class for testing the BoB without actually starting processes. # This is used for testing the start/stop components routines and # the BoB commands. # # Testing that external processes start is outside the scope # of the unit test, by overriding the process start methods we can check # that the right processes are started depending on the configuration # options. class MockBob(BoB): def __init__(self): BoB.__init__(self) # Set flags as to which of the overridden methods has been run. self.msgq = False self.cfgmgr = False self.ccsession = False self.auth = False self.resolver = False self.xfrout = False self.xfrin = False self.zonemgr = False self.stats = False self.stats_httpd = False self.cmdctl = False self.dhcp6 = False self.dhcp4 = False self.c_channel_env = {} self.processes = { } self.creator = False class MockSockCreator(isc.bind10.component.Component): def __init__(self, process, boss, kind, address=None, params=None): isc.bind10.component.Component.__init__(self, process, boss, kind, 'SockCreator') self._start_func = boss.start_creator specials = isc.bind10.special_component.get_specials() specials['sockcreator'] = MockSockCreator self._component_configurator = \ isc.bind10.component.Configurator(self, specials) def start_creator(self): self.creator = True procinfo = ProcessInfo('b10-sockcreator', ['/bin/false']) procinfo.pid = 1 return procinfo def stop_creator(self, kill=False): self.creator = False def read_bind10_config(self): # Configuration options are set directly pass def start_msgq(self): self.msgq = True procinfo = ProcessInfo('b10-msgq', ['/bin/false']) procinfo.pid = 2 return procinfo def start_ccsession(self, c_channel_env): # this is not a process, don't have to do anything with procinfo self.ccsession = True def start_cfgmgr(self): self.cfgmgr = True procinfo = ProcessInfo('b10-cfgmgr', ['/bin/false']) procinfo.pid = 3 return procinfo def start_auth(self): self.auth = True procinfo = ProcessInfo('b10-auth', ['/bin/false']) procinfo.pid = 5 return procinfo def start_resolver(self): self.resolver = True procinfo = ProcessInfo('b10-resolver', ['/bin/false']) procinfo.pid = 6 return procinfo def start_simple(self, name): procmap = { 'b10-xfrout': self.start_xfrout, 'b10-zonemgr': self.start_zonemgr, 'b10-stats': self.start_stats, 'b10-stats-httpd': self.start_stats_httpd, 'b10-cmdctl': self.start_cmdctl, 'b10-dhcp6': self.start_dhcp6, 'b10-dhcp4': self.start_dhcp4 } return procmap[name]() def start_xfrout(self): self.xfrout = True procinfo = ProcessInfo('b10-xfrout', ['/bin/false']) procinfo.pid = 7 return procinfo def start_xfrin(self): self.xfrin = True procinfo = ProcessInfo('b10-xfrin', ['/bin/false']) procinfo.pid = 8 return procinfo def start_zonemgr(self): self.zonemgr = True procinfo = ProcessInfo('b10-zonemgr', ['/bin/false']) procinfo.pid = 9 return procinfo def start_stats(self): self.stats = True procinfo = ProcessInfo('b10-stats', ['/bin/false']) procinfo.pid = 10 return procinfo def start_stats_httpd(self): self.stats_httpd = True procinfo = ProcessInfo('b10-stats-httpd', ['/bin/false']) procinfo.pid = 11 return procinfo def start_cmdctl(self): self.cmdctl = True procinfo = ProcessInfo('b10-cmdctl', ['/bin/false']) procinfo.pid = 12 return procinfo def start_dhcp6(self): self.dhcp6 = True procinfo = ProcessInfo('b10-dhcp6', ['/bin/false']) procinfo.pid = 13 return procinfo def start_dhcp4(self): self.dhcp4 = True procinfo = ProcessInfo('b10-dhcp4', ['/bin/false']) procinfo.pid = 14 return procinfo def stop_process(self, process, recipient): procmap = { 'b10-auth': self.stop_auth, 'b10-resolver': self.stop_resolver, 'b10-xfrout': self.stop_xfrout, 'b10-xfrin': self.stop_xfrin, 'b10-zonemgr': self.stop_zonemgr, 'b10-stats': self.stop_stats, 'b10-stats-httpd': self.stop_stats_httpd, 'b10-cmdctl': self.stop_cmdctl } procmap[process]() # Some functions to pretend we stop processes, use by stop_process def stop_msgq(self): if self.msgq: del self.processes[2] self.msgq = False def stop_cfgmgr(self): if self.cfgmgr: del self.processes[3] self.cfgmgr = False def stop_auth(self): if self.auth: del self.processes[5] self.auth = False def stop_resolver(self): if self.resolver: del self.processes[6] self.resolver = False def stop_xfrout(self): if self.xfrout: del self.processes[7] self.xfrout = False def stop_xfrin(self): if self.xfrin: del self.processes[8] self.xfrin = False def stop_zonemgr(self): if self.zonemgr: del self.processes[9] self.zonemgr = False def stop_stats(self): if self.stats: del self.processes[10] self.stats = False def stop_stats_httpd(self): if self.stats_httpd: del self.processes[11] self.stats_httpd = False def stop_cmdctl(self): if self.cmdctl: del self.processes[12] self.cmdctl = False class TestStartStopProcessesBob(unittest.TestCase): """ Check that the start_all_processes method starts the right combination of processes and that the right processes are started and stopped according to changes in configuration. """ def check_environment_unchanged(self): # Check whether the environment has not been changed self.assertEqual(original_os_environ, os.environ) def check_started(self, bob, core, auth, resolver): """ Check that the right sets of services are started. The ones that should be running are specified by the core, auth and resolver parameters (they are groups of processes, eg. auth means b10-auth, -xfrout, -xfrin and -zonemgr). """ self.assertEqual(bob.msgq, core) self.assertEqual(bob.cfgmgr, core) self.assertEqual(bob.ccsession, core) self.assertEqual(bob.creator, core) self.assertEqual(bob.auth, auth) self.assertEqual(bob.resolver, resolver) self.assertEqual(bob.xfrout, auth) self.assertEqual(bob.xfrin, auth) self.assertEqual(bob.zonemgr, auth) self.assertEqual(bob.stats, core) self.assertEqual(bob.stats_httpd, core) self.assertEqual(bob.cmdctl, core) self.check_environment_unchanged() def check_preconditions(self, bob): self.check_started(bob, False, False, False) def check_started_none(self, bob): """ Check that the situation is according to configuration where no servers should be started. Some processes still need to be running. """ self.check_started(bob, True, False, False) self.check_environment_unchanged() def check_started_both(self, bob): """ Check the situation is according to configuration where both servers (auth and resolver) are enabled. """ self.check_started(bob, True, True, True) self.check_environment_unchanged() def check_started_auth(self, bob): """ Check the set of processes needed to run auth only is started. """ self.check_started(bob, True, True, False) self.check_environment_unchanged() def check_started_resolver(self, bob): """ Check the set of processes needed to run resolver only is started. """ self.check_started(bob, True, False, True) self.check_environment_unchanged() def check_started_dhcp(self, bob, v4, v6): """ Check if proper combinations of DHCPv4 and DHCpv6 can be started """ self.assertEqual(v4, bob.dhcp4) self.assertEqual(v6, bob.dhcp6) self.check_environment_unchanged() def construct_config(self, start_auth, start_resolver): # The things that are common, not turned on an off config = {} config['b10-stats'] = { 'kind': 'dispensable', 'address': 'Stats' } config['b10-stats-httpd'] = { 'kind': 'dispensable', 'address': 'StatsHttpd' } config['b10-cmdctl'] = { 'kind': 'needed', 'special': 'cmdctl' } if start_auth: config['b10-auth'] = { 'kind': 'needed', 'special': 'auth' } config['b10-xfrout'] = { 'kind': 'dispensable', 'address': 'Xfrout' } config['b10-xfrin'] = { 'kind': 'dispensable', 'special': 'xfrin' } config['b10-zonemgr'] = { 'kind': 'dispensable', 'address': 'Zonemgr' } if start_resolver: config['b10-resolver'] = { 'kind': 'needed', 'special': 'resolver' } return {'components': config} def test_config_start(self): """ Test that the configuration starts and stops processes according to configuration changes. """ # Create BoB and ensure correct initialization bob = MockBob() self.check_preconditions(bob) bob.start_all_processes() bob.runnable = True bob._BoB_started = True bob.config_handler(self.construct_config(False, False)) self.check_started_none(bob) # Enable both at once bob.config_handler(self.construct_config(True, True)) self.check_started_both(bob) # Not touched by empty change bob.config_handler({}) self.check_started_both(bob) # Not touched by change to the same configuration bob.config_handler(self.construct_config(True, True)) self.check_started_both(bob) # Turn them both off again bob.config_handler(self.construct_config(False, False)) self.check_started_none(bob) # Not touched by empty change bob.config_handler({}) self.check_started_none(bob) # Not touched by change to the same configuration bob.config_handler(self.construct_config(False, False)) self.check_started_none(bob) # Start and stop auth separately bob.config_handler(self.construct_config(True, False)) self.check_started_auth(bob) bob.config_handler(self.construct_config(False, False)) self.check_started_none(bob) # Start and stop resolver separately bob.config_handler(self.construct_config(False, True)) self.check_started_resolver(bob) bob.config_handler(self.construct_config(False, False)) self.check_started_none(bob) # Alternate bob.config_handler(self.construct_config(True, False)) self.check_started_auth(bob) bob.config_handler(self.construct_config(False, True)) self.check_started_resolver(bob) bob.config_handler(self.construct_config(True, False)) self.check_started_auth(bob) def test_config_start_once(self): """ Tests that a process is started only once. """ # Create BoB and ensure correct initialization bob = MockBob() self.check_preconditions(bob) bob.start_all_processes() bob._BoB_started = True bob.runnable = True bob.config_handler(self.construct_config(True, True)) self.check_started_both(bob) bob.start_auth = lambda: self.fail("Started auth again") bob.start_xfrout = lambda: self.fail("Started xfrout again") bob.start_xfrin = lambda: self.fail("Started xfrin again") bob.start_zonemgr = lambda: self.fail("Started zonemgr again") bob.start_resolver = lambda: self.fail("Started resolver again") # Send again we want to start them. Should not do it, as they are. bob.config_handler(self.construct_config(True, True)) def test_config_not_started_early(self): """ Test that processes are not started by the config handler before startup. """ bob = MockBob() self.check_preconditions(bob) bob.start_auth = lambda: self.fail("Started auth again") bob.start_xfrout = lambda: self.fail("Started xfrout again") bob.start_xfrin = lambda: self.fail("Started xfrin again") bob.start_zonemgr = lambda: self.fail("Started zonemgr again") bob.start_resolver = lambda: self.fail("Started resolver again") bob.config_handler({'start_auth': True, 'start_resolver': True}) # Checks that DHCP (v4 and v6) processes are started when expected def test_start_dhcp(self): # Create BoB and ensure correct initialization bob = MockBob() self.check_preconditions(bob) bob.start_all_processes() bob._BoB_started = True bob.runnable = True bob.config_handler(self.construct_config(False, False)) self.check_started_dhcp(bob, False, False) def test_start_dhcp_v6only(self): # Create BoB and ensure correct initialization bob = MockBob() self.check_preconditions(bob) # v6 only enabled bob.start_all_processes() bob.runnable = True bob._BoB_started = True config = self.construct_config(False, False) config['components']['b10-dhcp6'] = { 'kind': 'needed', 'address': 'Dhcp6' } bob.config_handler(config) self.check_started_dhcp(bob, False, True) # uncomment when dhcpv4 becomes implemented # v4 only enabled #bob.cfg_start_dhcp6 = False #bob.cfg_start_dhcp4 = True #self.check_started_dhcp(bob, True, False) # both v4 and v6 enabled #bob.cfg_start_dhcp6 = True #bob.cfg_start_dhcp4 = True #self.check_started_dhcp(bob, True, True) class MockComponent: def __init__(self, name, pid): self.name = lambda: name self.pid = lambda: pid class TestBossCmd(unittest.TestCase): def test_ping(self): """ Confirm simple ping command works. """ bob = MockBob() answer = bob.command_handler("ping", None) self.assertEqual(answer, {'result': [0, 'pong']}) def test_show_processes(self): """ Confirm getting a list of processes works. """ bob = MockBob() answer = bob.command_handler("show_processes", None) self.assertEqual(answer, {'result': [0, []]}) def test_show_processes_started(self): """ Confirm getting a list of processes works. """ bob = MockBob() bob.register_process(1, MockComponent('first', 1)) bob.register_process(2, MockComponent('second', 2)) answer = bob.command_handler("show_processes", None) processes = [[1, 'first'], [2, 'second']] self.assertEqual(answer, {'result': [0, processes]}) class TestParseArgs(unittest.TestCase): """ This tests parsing of arguments of the bind10 master process. """ #TODO: Write tests for the original parsing, bad options, etc. def test_no_opts(self): """ Test correct default values when no options are passed. """ options = parse_args([], TestOptParser) self.assertEqual(None, options.data_path) self.assertEqual(None, options.config_file) self.assertEqual(None, options.cmdctl_port) def test_data_path(self): """ Test it can parse the data path. """ self.assertRaises(OptsError, parse_args, ['-p'], TestOptParser) self.assertRaises(OptsError, parse_args, ['--data-path'], TestOptParser) options = parse_args(['-p', '/data/path'], TestOptParser) self.assertEqual('/data/path', options.data_path) options = parse_args(['--data-path=/data/path'], TestOptParser) self.assertEqual('/data/path', options.data_path) def test_config_filename(self): """ Test it can parse the config switch. """ self.assertRaises(OptsError, parse_args, ['-c'], TestOptParser) self.assertRaises(OptsError, parse_args, ['--config-file'], TestOptParser) options = parse_args(['-c', 'config-file'], TestOptParser) self.assertEqual('config-file', options.config_file) options = parse_args(['--config-file=config-file'], TestOptParser) self.assertEqual('config-file', options.config_file) def test_cmdctl_port(self): """ Test it can parse the command control port. """ self.assertRaises(OptsError, parse_args, ['--cmdctl-port=abc'], TestOptParser) self.assertRaises(OptsError, parse_args, ['--cmdctl-port=100000000'], TestOptParser) self.assertRaises(OptsError, parse_args, ['--cmdctl-port'], TestOptParser) options = parse_args(['--cmdctl-port=1234'], TestOptParser) self.assertEqual(1234, options.cmdctl_port) def test_brittle(self): """ Test we can use the "brittle" flag. """ options = parse_args([], TestOptParser) self.assertFalse(options.brittle) options = parse_args(['--brittle'], TestOptParser) self.assertTrue(options.brittle) class TestPIDFile(unittest.TestCase): def setUp(self): self.pid_file = '@builddir@' + os.sep + 'bind10.pid' if os.path.exists(self.pid_file): os.unlink(self.pid_file) def tearDown(self): if os.path.exists(self.pid_file): os.unlink(self.pid_file) def check_pid_file(self): # dump PID to the file, and confirm the content is correct dump_pid(self.pid_file) my_pid = os.getpid() self.assertEqual(my_pid, int(open(self.pid_file, "r").read())) def test_dump_pid(self): self.check_pid_file() # make sure any existing content will be removed open(self.pid_file, "w").write('dummy data\n') self.check_pid_file() def test_unlink_pid_file_notexist(self): dummy_data = 'dummy_data\n' open(self.pid_file, "w").write(dummy_data) unlink_pid_file("no_such_pid_file") # the file specified for unlink_pid_file doesn't exist, # and the original content of the file should be intact. self.assertEqual(dummy_data, open(self.pid_file, "r").read()) def test_dump_pid_with_none(self): # Check the behavior of dump_pid() and unlink_pid_file() with None. # This should be no-op. dump_pid(None) self.assertFalse(os.path.exists(self.pid_file)) dummy_data = 'dummy_data\n' open(self.pid_file, "w").write(dummy_data) unlink_pid_file(None) self.assertEqual(dummy_data, open(self.pid_file, "r").read()) def test_dump_pid_failure(self): # the attempt to open file will fail, which should result in exception. self.assertRaises(IOError, dump_pid, 'nonexistent_dir' + os.sep + 'bind10.pid') # TODO: Do we want brittle mode? Probably yes. So we need to re-enable to after that. @unittest.skip("Brittle mode temporarily broken") class TestBrittle(unittest.TestCase): def test_brittle_disabled(self): bob = MockBob() bob.start_all_processes() bob.runnable = True bob.reap_children() self.assertTrue(bob.runnable) def simulated_exit(self): ret_val = self.exit_info self.exit_info = (0, 0) return ret_val def test_brittle_enabled(self): bob = MockBob() bob.start_all_processes() bob.runnable = True bob.brittle = True self.exit_info = (5, 0) bob._get_process_exit_status = self.simulated_exit old_stdout = sys.stdout sys.stdout = open("/dev/null", "w") bob.reap_children() sys.stdout = old_stdout self.assertFalse(bob.runnable) class TestBossComponents(unittest.TestCase): """ Test the boss propagates component configuration properly to the component configurator and acts sane. """ def setUp(self): self.__param = None self.__called = False self.__compconfig = { 'comp': { 'kind': 'needed', 'process': 'cat' } } def __unary_hook(self, param): """ A hook function that stores the parameter for later examination. """ self.__param = param def __nullary_hook(self): """ A hook function that notes down it was called. """ self.__called = True def __check_core(self, config): """ A function checking that the config contains parts for the valid core component configuration. """ self.assertIsNotNone(config) for component in ['sockcreator', 'msgq', 'cfgmgr']: self.assertTrue(component in config) self.assertEqual(component, config[component]['special']) self.assertEqual('core', config[component]['kind']) def __check_extended(self, config): """ This checks that the config contains the core and one more component. """ self.__check_core(config) self.assertTrue('comp' in config) self.assertEqual('cat', config['comp']['process']) self.assertEqual('needed', config['comp']['kind']) self.assertEqual(4, len(config)) def test_correct_run(self): """ Test the situation when we run in usual scenario, nothing fails, we just start, reconfigure and then stop peacefully. """ bob = MockBob() # Start it orig = bob._component_configurator.startup bob._component_configurator.startup = self.__unary_hook bob.start_all_processes() bob._component_configurator.startup = orig self.__check_core(self.__param) self.assertEqual(3, len(self.__param)) # Reconfigure it self.__param = None orig = bob._component_configurator.reconfigure bob._component_configurator.reconfigure = self.__unary_hook # Otherwise it does not work bob.runnable = True bob.config_handler({'components': self.__compconfig}) self.__check_extended(self.__param) currconfig = self.__param # If we reconfigure it, but it does not contain the components part, # nothing is called bob.config_handler({}) self.assertEqual(self.__param, currconfig) self.__param = None bob._component_configurator.reconfigure = orig # Check a configuration that messes up the core components is rejected. compconf = dict(self.__compconfig) compconf['msgq'] = { 'process': 'echo' } result = bob.config_handler({'components': compconf}) # Check it rejected it self.assertEqual(1, result['result'][0]) # We can't call shutdown, that one relies on the stuff in main # We check somewhere else that the shutdown is actually called # from there (the test_kills). def test_kills(self): """ Test that the boss kills processes which don't want to stop. """ bob = MockBob() killed = [] class ImmortalComponent: """ An immortal component. It does not stop when it is told so (anyway it is not told so). It does not die if it is killed the first time. It dies only when killed forcefully. """ def kill(self, forcefull=False): killed.append(forcefull) if forcefull: bob.processes = {} def pid(self): return 1 def name(self): return "Immortal" bob.processes = {} bob.register_process(1, ImmortalComponent()) # While at it, we check the configurator shutdown is actually called orig = bob._component_configurator.shutdown bob._component_configurator.shutdown = self.__nullary_hook self.__called = False bob.shutdown() self.assertEqual([False, True], killed) self.assertTrue(self.__called) bob._component_configurator.shutdown = orig def test_component_shutdown(self): """ Test the component_shutdown sets all variables accordingly. """ bob = MockBob() self.assertRaises(Exception, bob.component_shutdown, 1) self.assertEqual(1, bob.exitcode) bob._BoB__started = True bob.runnable = True bob.component_shutdown(2) self.assertEqual(2, bob.exitcode) self.assertFalse(bob.runnable) def test_init_config(self): """ Test initial configuration is loaded. """ bob = MockBob() # Start it bob._component_configurator.reconfigure = self.__unary_hook # We need to return the original read_bind10_config bob.read_bind10_config = lambda: BoB.read_bind10_config(bob) # And provide a session to read the data from class CC: pass bob.ccs = CC() bob.ccs.get_full_config = lambda: {'components': self.__compconfig} bob.start_all_processes() self.__check_extended(self.__param) if __name__ == '__main__': # store os.environ for test_unchanged_environment original_os_environ = copy.deepcopy(os.environ) isc.log.resetUnitTestRootLogger() unittest.main()