# Copyright (C) 2011 Internet Systems Consortium. # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. from bind10_src import ProcessInfo, BoB, parse_args, dump_pid, unlink_pid_file, _BASETIME from isc.bind10.component import Component # XXX: environment tests are currently disabled, due to the preprocessor # setup that we have now complicating the environment import unittest import sys import os import signal import socket from isc.net.addr import IPAddr import time import isc import isc.log from isc.testutils.parse_args import TestOptParser, OptsError class TestProcessInfo(unittest.TestCase): def setUp(self): # redirect stdout to a pipe so we can check that our # process spawning is doing the right thing with stdout self.old_stdout = os.dup(sys.stdout.fileno()) self.pipes = os.pipe() os.dup2(self.pipes[1], sys.stdout.fileno()) os.close(self.pipes[1]) # note that we use dup2() to restore the original stdout # to the main program ASAP in each test... this prevents # hangs reading from the child process (as the pipe is only # open in the child), and also insures nice pretty output def tearDown(self): # clean up our stdout munging os.dup2(self.old_stdout, sys.stdout.fileno()) os.close(self.pipes[0]) def test_init(self): pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ]) pi.spawn() os.dup2(self.old_stdout, sys.stdout.fileno()) self.assertEqual(pi.name, 'Test Process') self.assertEqual(pi.args, [ '/bin/echo', 'foo' ]) # self.assertEqual(pi.env, { 'PATH': os.environ['PATH'], # 'PYTHON_EXEC': os.environ['PYTHON_EXEC'] }) self.assertEqual(pi.dev_null_stdout, False) self.assertEqual(os.read(self.pipes[0], 100), b"foo\n") self.assertNotEqual(pi.process, None) self.assertTrue(type(pi.pid) is int) # def test_setting_env(self): # pi = ProcessInfo('Test Process', [ '/bin/true' ], env={'FOO': 'BAR'}) # os.dup2(self.old_stdout, sys.stdout.fileno()) # self.assertEqual(pi.env, { 'PATH': os.environ['PATH'], # 'PYTHON_EXEC': os.environ['PYTHON_EXEC'], # 'FOO': 'BAR' }) def test_setting_null_stdout(self): pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ], dev_null_stdout=True) pi.spawn() os.dup2(self.old_stdout, sys.stdout.fileno()) self.assertEqual(pi.dev_null_stdout, True) self.assertEqual(os.read(self.pipes[0], 100), b"") def test_respawn(self): pi = ProcessInfo('Test Process', [ '/bin/echo', 'foo' ]) pi.spawn() # wait for old process to work... self.assertEqual(os.read(self.pipes[0], 100), b"foo\n") # respawn it old_pid = pi.pid pi.respawn() os.dup2(self.old_stdout, sys.stdout.fileno()) # make sure the new one started properly self.assertEqual(pi.name, 'Test Process') self.assertEqual(pi.args, [ '/bin/echo', 'foo' ]) # self.assertEqual(pi.env, { 'PATH': os.environ['PATH'], # 'PYTHON_EXEC': os.environ['PYTHON_EXEC'] }) self.assertEqual(pi.dev_null_stdout, False) self.assertEqual(os.read(self.pipes[0], 100), b"foo\n") self.assertNotEqual(pi.process, None) self.assertTrue(type(pi.pid) is int) self.assertNotEqual(pi.pid, old_pid) class TestBoB(unittest.TestCase): def test_init(self): bob = BoB() self.assertEqual(bob.verbose, False) self.assertEqual(bob.msgq_socket_file, None) self.assertEqual(bob.cc_session, None) self.assertEqual(bob.ccs, None) self.assertEqual(bob.processes, {}) self.assertEqual(bob.dead_processes, {}) self.assertEqual(bob.runnable, False) self.assertEqual(bob.uid, None) self.assertEqual(bob.username, None) self.assertEqual(bob.nocache, False) def test_init_alternate_socket(self): bob = BoB("alt_socket_file") self.assertEqual(bob.verbose, False) self.assertEqual(bob.msgq_socket_file, "alt_socket_file") self.assertEqual(bob.cc_session, None) self.assertEqual(bob.ccs, None) self.assertEqual(bob.processes, {}) self.assertEqual(bob.dead_processes, {}) self.assertEqual(bob.runnable, False) self.assertEqual(bob.uid, None) self.assertEqual(bob.username, None) self.assertEqual(bob.nocache, False) def test_command_handler(self): class DummySession(): def group_sendmsg(self, msg, group): (self.msg, self.group) = (msg, group) def group_recvmsg(self, nonblock, seq): pass bob = BoB() bob.verbose = True bob.cc_session = DummySession() # a bad command self.assertEqual(bob.command_handler(-1, None), isc.config.ccsession.create_answer(1, "bad command")) # "shutdown" command self.assertEqual(bob.command_handler("shutdown", None), isc.config.ccsession.create_answer(0)) self.assertFalse(bob.runnable) # "getstats" command self.assertEqual(bob.command_handler("getstats", None), isc.config.ccsession.create_answer(0, { "stats_data": { 'bind10.boot_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', _BASETIME) }})) # "sendstats" command self.assertEqual(bob.command_handler("sendstats", None), isc.config.ccsession.create_answer(0)) self.assertEqual(bob.cc_session.group, "Stats") self.assertEqual(bob.cc_session.msg, isc.config.ccsession.create_command( 'set', { "stats_data": { 'bind10.boot_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', _BASETIME) }})) # "ping" command self.assertEqual(bob.command_handler("ping", None), isc.config.ccsession.create_answer(0, "pong")) # "show_processes" command self.assertEqual(bob.command_handler("show_processes", None), isc.config.ccsession.create_answer(0, bob.get_processes())) # an unknown command self.assertEqual(bob.command_handler("__UNKNOWN__", None), isc.config.ccsession.create_answer(1, "Unknown command")) class MockComponent: def __init__(self, name, pid): self.name = lambda: name self.pid = lambda: pid class MockConfigurator: def startup(self, config): pass def shutdown(self): pass def reconfigure(self, config): pass # Class for testing the BoB without actually starting processes. # This is used for testing the start/stop components routines and # the BoB commands. # # Testing that external processes start is outside the scope # of the unit test, by overriding the process start methods we can check # that the right processes are started depending on the configuration # options. class MockBob(BoB): def __init__(self): BoB.__init__(self) # Set flags as to which of the overridden methods has been run. self.msgq = False self.cfgmgr = False self.ccsession = False self.auth = False self.resolver = False self.xfrout = False self.xfrin = False self.zonemgr = False self.stats = False self.stats_httpd = False self.cmdctl = False self.c_channel_env = {} self.processes = { 1: MockComponent('first', 1), 2: MockComponent('second', 2) } self._component_configurator = MockConfigurator() def read_bind10_config(self): # Configuration options are set directly pass def start_ccsession(self, c_channel_env): pass class TestBossComponents(unittest.TestCase): """ Test the boss propagates component configuration properly to the component configurator and acts sane. """ def setUp(self): self.__param = None self.__called = False self.__compconfig = { 'comp': { 'kind': 'needed', 'process': 'cat' } } def __unary_hook(self, param): """ A hook function that stores the parameter for later examination. """ self.__param = param def __nullary_hook(self): """ A hook function that notes down it was called. """ self.__called = True def __check_core(self, config): """ A function checking that the config contains parts for the valid core component configuration. """ self.assertIsNotNone(config) for component in ['sockcreator', 'msgq', 'cfgmgr']: self.assertTrue(component in config) self.assertEqual(component, config[component]['special']) self.assertEqual('core', config[component]['kind']) def __check_extended(self, config): """ This checks that the config contains the core and one more component. """ self.__check_core(config) self.assertTrue('comp' in config) self.assertEqual('cat', config['comp']['process']) self.assertEqual('needed', config['comp']['kind']) self.assertEqual(4, len(config)) def test_correct_run(self): """ Test the situation when we run in usual scenario, nothing fails, we just start, reconfigure and then stop peacefully. """ bob = MockBob() # Start it orig = bob._component_configurator.startup bob._component_configurator.startup = self.__unary_hook bob.start_all_processes() bob._component_configurator.startup = orig self.__check_core(self.__param) self.assertEqual(3, len(self.__param)) # Reconfigure it self.__param = None orig = bob._component_configurator.reconfigure bob._component_configurator.reconfigure = self.__unary_hook # Otherwise it does not work bob.runnable = True bob.config_handler({'components': self.__compconfig}) self.__check_extended(self.__param) currconfig = self.__param # If we reconfigure it, but it does not contain the components part, # nothing is called bob.config_handler({}) self.assertEqual(self.__param, currconfig) self.__param = None bob._component_configurator.reconfigure = orig # Check a configuration that messes up the core components is rejected. compconf = dict(self.__compconfig) compconf['msgq'] = { 'process': 'echo' } # Is it OK to raise, or should it catch also and convert to error # answer? self.assertRaises(Exception, bob.config_handler, {'components': compconf}) # Stop it orig = bob._component_configurator.shutdown bob._component_configurator.shutdown = self.__nullary_hook self.__called = False # We can't call shutdown, that one relies on the stuff in main bob.stop_all_processes() bob._component_configurator.shutdown = orig self.assertTrue(self.__called) def test_init_config(self): """ Test initial configuration is loaded. """ bob = MockBob() # Start it bob._component_configurator.reconfigure = self.__unary_hook # We need to return the original read_bind10_config bob.read_bind10_config = lambda: BoB.read_bind10_config(bob) # And provide a session to read the data from class CC: pass bob.ccs = CC() bob.ccs.get_full_config = lambda: {'components': self.__compconfig} bob.start_all_processes() self.__check_extended(self.__param) def test_shutdown_no_pid(self): """ Test the boss doesn't fail when ve don't have a PID of a component (which means it's not running). """ bob = MockBob() class NoComponent: def pid(self): return None bob.processes = {} bob.register_process(1, NoComponent()) bob.shutdown() class TestBossCmd(unittest.TestCase): def test_ping(self): """ Confirm simple ping command works. """ bob = MockBob() answer = bob.command_handler("ping", None) self.assertEqual(answer, {'result': [0, 'pong']}) def test_show_processes(self): """ Confirm getting a list of processes works. """ bob = MockBob() bob.processes = {} answer = bob.command_handler("show_processes", None) self.assertEqual(answer, {'result': [0, []]}) def test_show_processes_started(self): """ Confirm getting a list of processes works. """ bob = MockBob() bob.start_all_processes() answer = bob.command_handler("show_processes", None) processes = [[1, 'first'], [2, 'second']] self.assertEqual(answer, {'result': [0, processes]}) class TestParseArgs(unittest.TestCase): """ This tests parsing of arguments of the bind10 master process. """ #TODO: Write tests for the original parsing, bad options, etc. def test_no_opts(self): """ Test correct default values when no options are passed. """ options = parse_args([], TestOptParser) self.assertEqual(None, options.data_path) self.assertEqual(None, options.config_file) self.assertEqual(None, options.cmdctl_port) def test_data_path(self): """ Test it can parse the data path. """ self.assertRaises(OptsError, parse_args, ['-p'], TestOptParser) self.assertRaises(OptsError, parse_args, ['--data-path'], TestOptParser) options = parse_args(['-p', '/data/path'], TestOptParser) self.assertEqual('/data/path', options.data_path) options = parse_args(['--data-path=/data/path'], TestOptParser) self.assertEqual('/data/path', options.data_path) def test_config_filename(self): """ Test it can parse the config switch. """ self.assertRaises(OptsError, parse_args, ['-c'], TestOptParser) self.assertRaises(OptsError, parse_args, ['--config-file'], TestOptParser) options = parse_args(['-c', 'config-file'], TestOptParser) self.assertEqual('config-file', options.config_file) options = parse_args(['--config-file=config-file'], TestOptParser) self.assertEqual('config-file', options.config_file) def test_cmdctl_port(self): """ Test it can parse the command control port. """ self.assertRaises(OptsError, parse_args, ['--cmdctl-port=abc'], TestOptParser) self.assertRaises(OptsError, parse_args, ['--cmdctl-port=100000000'], TestOptParser) self.assertRaises(OptsError, parse_args, ['--cmdctl-port'], TestOptParser) options = parse_args(['--cmdctl-port=1234'], TestOptParser) self.assertEqual(1234, options.cmdctl_port) def test_brittle(self): """ Test we can use the "brittle" flag. """ options = parse_args([], TestOptParser) self.assertFalse(options.brittle) options = parse_args(['--brittle'], TestOptParser) self.assertTrue(options.brittle) class TestPIDFile(unittest.TestCase): def setUp(self): self.pid_file = '@builddir@' + os.sep + 'bind10.pid' if os.path.exists(self.pid_file): os.unlink(self.pid_file) def tearDown(self): if os.path.exists(self.pid_file): os.unlink(self.pid_file) def check_pid_file(self): # dump PID to the file, and confirm the content is correct dump_pid(self.pid_file) my_pid = os.getpid() self.assertEqual(my_pid, int(open(self.pid_file, "r").read())) def test_dump_pid(self): self.check_pid_file() # make sure any existing content will be removed open(self.pid_file, "w").write('dummy data\n') self.check_pid_file() def test_unlink_pid_file_notexist(self): dummy_data = 'dummy_data\n' open(self.pid_file, "w").write(dummy_data) unlink_pid_file("no_such_pid_file") # the file specified for unlink_pid_file doesn't exist, # and the original content of the file should be intact. self.assertEqual(dummy_data, open(self.pid_file, "r").read()) def test_dump_pid_with_none(self): # Check the behavior of dump_pid() and unlink_pid_file() with None. # This should be no-op. dump_pid(None) self.assertFalse(os.path.exists(self.pid_file)) dummy_data = 'dummy_data\n' open(self.pid_file, "w").write(dummy_data) unlink_pid_file(None) self.assertEqual(dummy_data, open(self.pid_file, "r").read()) def test_dump_pid_failure(self): # the attempt to open file will fail, which should result in exception. self.assertRaises(IOError, dump_pid, 'nonexistent_dir' + os.sep + 'bind10.pid') # TODO: Do we want brittle mode? Probably yes. So we need to re-enable to after that. @unittest.skip("Brittle mode temporarily broken") class TestBrittle(unittest.TestCase): def test_brittle_disabled(self): bob = MockBob() bob.start_all_processes() bob.runnable = True bob.reap_children() self.assertTrue(bob.runnable) def simulated_exit(self): ret_val = self.exit_info self.exit_info = (0, 0) return ret_val def test_brittle_enabled(self): bob = MockBob() bob.start_all_processes() bob.runnable = True bob.brittle = True self.exit_info = (5, 0) bob._get_process_exit_status = self.simulated_exit old_stdout = sys.stdout sys.stdout = open("/dev/null", "w") bob.reap_children() sys.stdout = old_stdout self.assertFalse(bob.runnable) if __name__ == '__main__': isc.log.resetUnitTestRootLogger() unittest.main()