#!@PYTHON@

# Copyright (C) 2010 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""\
This file implements the Boss of Bind (BoB, or bob) program.

Its purpose is to start up the BIND 10 system, and then manage the
processes, by starting and stopping processes, plus restarting
processes that exit.

To start the system, it first runs the c-channel program (msgq), then
connects to that. It then runs the configuration manager, and reads
its own configuration. Then it proceeds to starting other modules.

The Python subprocess module is used for starting processes, but
because this is not efficient for managing groups of processes,
SIGCHLD signals are caught and processed using the signal module.

Most of the logic is contained in the BoB class. However, since Python
requires that signal processing happen in the main thread, we do
signal handling outside of that class, in the code running for
__main__.
"""

import sys
sys.path.append('@@PYTHONPATH@@')
import os

# If B10_FROM_SOURCE is set in the environment, we use data files
# from a directory relative to that, otherwise we use the ones
# installed on the system
if "B10_FROM_SOURCE" in os.environ:
    SPECFILE_LOCATION = os.environ["B10_FROM_SOURCE"] + "/src/bin/bind10/bob.spec"
else:
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_LOCATION = ("@datadir@/@PACKAGE@/bob.spec"
                         .replace("${datarootdir}", DATAROOTDIR)
                         .replace("${prefix}", PREFIX))

import subprocess
import signal
import re
import errno
import time
import select
import random
import socket
from optparse import OptionParser, OptionValueError
import io
import pwd
import posix

import isc.cc
# NOTE(review): isc.config (ccsession, ModuleCCSession) is used below but
# only isc.cc is imported explicitly; presumably the isc package makes
# isc.config available as well -- confirm.

# This is the version that gets displayed to the user.
# The VERSION string consists of the module name, the module version
# number, and the overall BIND 10 version number (set in configure.ac).
VERSION = "bind10 20100916 (BIND 10 @PACKAGE_VERSION@)"


class RestartSchedule:
    """
    Keeps state when restarting something (in this case, a process).

    When a process dies unexpectedly, we need to restart it. However, if
    it fails to restart for some reason, then we should not simply keep
    restarting it at high speed.

    A more sophisticated algorithm can be developed, but for now we choose
    a simple set of rules:

      * If a process has been running for >=10 seconds, we restart it
        right away.
      * If a process was running for <10 seconds, we wait until 10 seconds
        after it was started.

    To avoid programs getting into lockstep, we use a normal distribution
    to avoid being restarted at exactly 10 seconds.

    (10 seconds is the default restart_frequency.)
    """

    def __init__(self, restart_frequency=10.0):
        self.restart_frequency = restart_frequency
        self.run_start_time = None
        self.run_stop_time = None
        self.restart_time = None

    def set_run_start_time(self, when=None):
        """Record that the process started at `when` (default: now) and
        schedule the earliest restart time.

        The restart time is `when` plus a normally distributed delay
        centred on restart_frequency with a standard deviation of 5% of
        it, so that processes do not restart in lockstep."""
        if when is None:
            when = time.time()
        self.run_start_time = when
        sigma = self.restart_frequency * 0.05
        self.restart_time = when + random.normalvariate(self.restart_frequency,
                                                        sigma)

    def set_run_stop_time(self, when=None):
        """We don't actually do anything with stop time now, but it
        might be useful for future algorithms."""
        if when is None:
            when = time.time()
        self.run_stop_time = when

    def get_restart_time(self, when=None):
        """Return the time at which the process may be restarted.

        This is the later of `when` (default: now) and the scheduled
        restart time. set_run_start_time() must have been called first,
        otherwise restart_time is still None."""
        if when is None:
            when = time.time()
        return max(when, self.restart_time)


class ProcessInfoError(Exception):
    pass


class ProcessInfo:
    """Information about a process"""

    # Shared sink for children whose stdout/stderr should be discarded.
    dev_null = open(os.devnull, "w")

    def __init__(self, name, args, env=None, dev_null_stdout=False,
                 dev_null_stderr=False, uid=None, username=None):
        """Create the record and immediately spawn the process.

        name     -- human-readable name used in log messages.
        args     -- argument list passed to subprocess.Popen.
        env      -- extra environment variables for the child, layered on
                    top of a copy of the boss's environment (default: none).
        dev_null_stdout / dev_null_stderr -- discard the child's
                    stdout / stderr.
        uid, username -- if uid is not None, the child calls setuid(uid)
                    before exec; username is only used in error messages.
        """
        self.name = name
        self.args = args
        # Use a fresh dict rather than a shared mutable default.
        self.env = env if env is not None else {}
        self.dev_null_stdout = dev_null_stdout
        self.dev_null_stderr = dev_null_stderr
        self.restart_schedule = RestartSchedule()
        self.uid = uid
        self.username = username
        self._spawn()

    def _setuid(self):
        """Function used before running a program that needs to run as a
        different user."""
        if self.uid is not None:
            try:
                posix.setuid(self.uid)
            except OSError as e:
                if e.errno == errno.EPERM:
                    # if we failed to change user due to permission report that
                    raise ProcessInfoError("Unable to change to user %s (uid %d)"
                                           % (self.username, self.uid))
                else:
                    # otherwise simply re-raise whatever error we found
                    raise

    def _spawn(self):
        """Start the process and record its PID and start time."""
        spawn_stdout = self.dev_null if self.dev_null_stdout else None
        spawn_stderr = self.dev_null if self.dev_null_stderr else None

        # Environment variables for the child process will be a copy of those
        # of the boss process with any additional specific variables given
        # on construction (self.env).  Copying matters: updating os.environ
        # directly would leak child settings into the boss and prepend the
        # libexec directory to PATH again on every (re)spawn.
        spawn_env = os.environ.copy()
        spawn_env.update(self.env)
        if 'B10_FROM_SOURCE' not in os.environ:
            spawn_env['PATH'] = "@@LIBEXECDIR@@:" + spawn_env['PATH']
        self.process = subprocess.Popen(self.args,
                                        stdin=subprocess.PIPE,
                                        stdout=spawn_stdout,
                                        stderr=spawn_stderr,
                                        close_fds=True,
                                        env=spawn_env,
                                        preexec_fn=self._setuid)
        self.pid = self.process.pid
        self.restart_schedule.set_run_start_time()

    def respawn(self):
        """Start the process again (a new PID is assigned)."""
        self._spawn()


class IPAddr:
    """Stores an IPv4 or IPv6 address."""
    family = None
    addr = None

    def __init__(self, addr):
        """Parse `addr` as IPv4 first, then as IPv6.

        Raises whatever socket.inet_pton raises for the IPv6 attempt if
        the string is neither a valid IPv4 nor a valid IPv6 address."""
        try:
            a = socket.inet_pton(socket.AF_INET, addr)
            self.family = socket.AF_INET
            self.addr = a
            return
        except (socket.error, ValueError):
            # Not IPv4; fall through and try IPv6.
            pass

        # Any failure here propagates to the caller.
        a = socket.inet_pton(socket.AF_INET6, addr)
        self.family = socket.AF_INET6
        self.addr = a

    def __str__(self):
        return socket.inet_ntop(self.family, self.addr)


class BoB:
    """Boss of BIND class."""

    def __init__(self, msgq_socket_file=None, auth_port=5300, address='',
                 nocache=False, verbose=False, setuid=None, username=None):
        """Initialize the Boss of BIND. This is a singleton (only one
        can run).

        The msgq_socket_file specifies the UNIX domain socket file
        that the msgq process listens on.
        If verbose is True, then the boss reports what it is doing.
        """
        self.verbose = verbose
        self.msgq_socket_file = msgq_socket_file
        self.auth_port = auth_port
        self.address = None
        if address:
            self.address = IPAddr(address)
        self.cc_session = None
        self.ccs = None
        # Live children, keyed by PID.
        self.processes = {}
        # Children that exited and are waiting to be restarted, keyed by PID.
        self.dead_processes = {}
        self.runnable = False
        self.uid = setuid
        self.username = username
        self.nocache = nocache

    def config_handler(self, new_config):
        """Handle a configuration update from the config manager.

        Currently only logs it (in verbose mode) and accepts it."""
        if self.verbose:
            sys.stdout.write("[bind10] handling new config:\n")
            # new_config is not necessarily a string; format it explicitly.
            sys.stdout.write(str(new_config) + "\n")
        answer = isc.config.ccsession.create_answer(0)
        return answer
        # TODO

    def command_handler(self, command, args):
        """Handle a command sent to the boss over the c-channel.

        Only "shutdown" is supported; it clears self.runnable so the main
        loop exits and shuts everything down."""
        if self.verbose:
            sys.stdout.write("[bind10] Boss got command:\n")
            # Format explicitly: command may not be a string (checked below).
            sys.stdout.write(str(command) + "\n")
        if not isinstance(command, str):
            answer = isc.config.ccsession.create_answer(1, "bad command")
        elif command == "shutdown":
            sys.stdout.write("[bind10] got shutdown command\n")
            self.runnable = False
            answer = isc.config.ccsession.create_answer(0)
        else:
            answer = isc.config.ccsession.create_answer(1, "Unknown command")
        return answer

    def _kill_started_processes(self):
        """Forcibly kill every child started so far.

        Used when startup() fails part-way through, so no already-started
        child is left running."""
        for proc_info in list(self.processes.values()):
            try:
                proc_info.process.kill()
            except OSError:
                # usually ESRCH because the child already exited
                pass

    def startup(self):
        """Start the BoB instance.

        Returns None if successful, otherwise a string describing the
        problem.
        """
        # try to connect to the c-channel daemon,
        # to see if it is already running
        c_channel_env = {}
        if self.msgq_socket_file is not None:
            c_channel_env["BIND10_MSGQ_SOCKET_FILE"] = self.msgq_socket_file
            # Children get the socket file through c_channel_env.  The
            # ModuleCCSession created below for the boss itself is built
            # without a socket file argument, so publish the setting in our
            # own environment as well (this used to happen implicitly because
            # ProcessInfo._spawn() mutated os.environ).
            # NOTE(review): assumes isc.cc.Session falls back to this
            # environment variable when no socket file is given -- confirm.
            os.environ["BIND10_MSGQ_SOCKET_FILE"] = self.msgq_socket_file
        if self.verbose:
            sys.stdout.write("[bind10] Checking for already running b10-msgq\n")
        # try to connect, and if we can't wait a short while
        try:
            self.cc_session = isc.cc.Session(self.msgq_socket_file)
            return "b10-msgq already running, or socket file not cleaned , cannot start"
        except isc.cc.session.SessionError:
            # this is the case we want, where the msgq is not running
            pass

        # start the c-channel daemon
        if self.verbose:
            sys.stdout.write("[bind10] Starting b10-msgq\n")
        try:
            c_channel = ProcessInfo("b10-msgq", ["b10-msgq"], c_channel_env,
                                    True, not self.verbose, uid=self.uid,
                                    username=self.username)
        except Exception as e:
            return "Unable to start b10-msgq; " + str(e)
        self.processes[c_channel.pid] = c_channel
        if self.verbose:
            sys.stdout.write("[bind10] Started b10-msgq (PID %d)\n" % c_channel.pid)

        # now connect to the c-channel
        cc_connect_start = time.time()
        while self.cc_session is None:
            # if we have been trying for "a while" give up
            if (time.time() - cc_connect_start) > 5:
                self._kill_started_processes()
                return "Unable to connect to c-channel after 5 seconds"
            # try to connect, and if we can't wait a short while
            try:
                self.cc_session = isc.cc.Session(self.msgq_socket_file)
            except isc.cc.session.SessionError:
                time.sleep(0.1)

        # start the configuration manager
        if self.verbose:
            sys.stdout.write("[bind10] Starting b10-cfgmgr\n")
        try:
            bind_cfgd = ProcessInfo("b10-cfgmgr", ["b10-cfgmgr"],
                                    c_channel_env, uid=self.uid,
                                    username=self.username)
        except Exception as e:
            self._kill_started_processes()
            return "Unable to start b10-cfgmgr; " + str(e)
        self.processes[bind_cfgd.pid] = bind_cfgd
        if self.verbose:
            sys.stdout.write("[bind10] Started b10-cfgmgr (PID %d)\n" % bind_cfgd.pid)

        # sleep until b10-cfgmgr is fully up and running, this is a good place
        # to have a (short) timeout on synchronized groupsend/receive
        # TODO: replace the sleep by a listen for ConfigManager started
        # message
        time.sleep(1)
        if self.verbose:
            sys.stdout.write("[bind10] starting ccsession\n")
        self.ccs = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                              self.config_handler,
                                              self.command_handler)
        self.ccs.start()
        if self.verbose:
            sys.stdout.write("[bind10] ccsession started\n")

        # start b10-auth
        # XXX: this must be read from the configuration manager in the future
        authargs = ['b10-auth', '-p', str(self.auth_port)]
        if self.address:
            authargs += ['-a', str(self.address)]
        if self.nocache:
            authargs += ['-n']
        if self.uid:
            # b10-auth is started as the current (root) user so it can bind
            # its port; it is told which uid to switch to itself.
            authargs += ['-u', str(self.uid)]
        if self.verbose:
            authargs += ['-v']
            sys.stdout.write("Starting b10-auth using port %d" % self.auth_port)
            if self.address:
                sys.stdout.write(" on %s" % str(self.address))
            sys.stdout.write("\n")
        try:
            auth = ProcessInfo("b10-auth", authargs, c_channel_env)
        except Exception as e:
            # (Previously this also killed b10-xfrout, which has not been
            # started yet at this point and raised UnboundLocalError.)
            self._kill_started_processes()
            return "Unable to start b10-auth; " + str(e)
        self.processes[auth.pid] = auth
        if self.verbose:
            sys.stdout.write("[bind10] Started b10-auth (PID %d)\n" % auth.pid)

        # everything after the authoritative server can run as non-root
        if self.uid is not None:
            posix.setuid(self.uid)

        # start b10-xfrout
        # NOTE(review): the original intent was to start xfrout before the
        # auth server so every xfr-query can be processed properly, but it is
        # currently started after b10-auth (and after dropping privileges).
        xfrout_args = ['b10-xfrout']
        if self.verbose:
            sys.stdout.write("[bind10] Starting b10-xfrout\n")
            xfrout_args += ['-v']
        try:
            xfrout = ProcessInfo("b10-xfrout", xfrout_args, c_channel_env)
        except Exception as e:
            # kills b10-auth too, which was previously left running
            self._kill_started_processes()
            return "Unable to start b10-xfrout; " + str(e)
        self.processes[xfrout.pid] = xfrout
        if self.verbose:
            sys.stdout.write("[bind10] Started b10-xfrout (PID %d)\n" % xfrout.pid)

        # start b10-xfrin
        xfrin_args = ['b10-xfrin']
        if self.verbose:
            sys.stdout.write("[bind10] Starting b10-xfrin\n")
            xfrin_args += ['-v']
        try:
            xfrind = ProcessInfo("b10-xfrin", xfrin_args, c_channel_env)
        except Exception as e:
            self._kill_started_processes()
            return "Unable to start b10-xfrin; " + str(e)
        self.processes[xfrind.pid] = xfrind
        if self.verbose:
            sys.stdout.write("[bind10] Started b10-xfrin (PID %d)\n" % xfrind.pid)

        # start b10-zonemgr
        zonemgr_args = ['b10-zonemgr']
        if self.verbose:
            sys.stdout.write("[bind10] Starting b10-zonemgr\n")
            zonemgr_args += ['-v']
        try:
            zonemgr = ProcessInfo("b10-zonemgr", zonemgr_args, c_channel_env)
        except Exception as e:
            self._kill_started_processes()
            return "Unable to start b10-zonemgr; " + str(e)
        self.processes[zonemgr.pid] = zonemgr
        if self.verbose:
            sys.stdout.write("[bind10] Started b10-zonemgr(PID %d)\n" % zonemgr.pid)

        # start the b10-cmdctl
        # XXX: we hardcode port 8080
        cmdctl_args = ['b10-cmdctl']
        if self.verbose:
            sys.stdout.write("[bind10] Starting b10-cmdctl on port 8080\n")
            cmdctl_args += ['-v']
        try:
            cmd_ctrld = ProcessInfo("b10-cmdctl", cmdctl_args, c_channel_env)
        except Exception as e:
            self._kill_started_processes()
            return "Unable to start b10-cmdctl; " + str(e)
        self.processes[cmd_ctrld.pid] = cmd_ctrld
        if self.verbose:
            sys.stdout.write("[bind10] Started b10-cmdctl (PID %d)\n" % cmd_ctrld.pid)

        self.runnable = True

        return None

    def stop_all_processes(self):
        """Stop all processes."""
        cmd = {"command": ['shutdown']}
        self.cc_session.group_sendmsg(cmd, 'Boss', 'Cmdctl')
        self.cc_session.group_sendmsg(cmd, "Boss", "ConfigManager")
        self.cc_session.group_sendmsg(cmd, "Boss", "Auth")
        self.cc_session.group_sendmsg(cmd, "Boss", "Xfrout")
        self.cc_session.group_sendmsg(cmd, "Boss", "Xfrin")
        self.cc_session.group_sendmsg(cmd, "Boss", "Zonemgr")

    def stop_process(self, process):
        """Stop the given process, friendly-like."""
        # XXX nothing yet
        pass

    def shutdown(self):
        """Stop the BoB instance.

        Escalates: a shutdown command over the c-channel, then SIGTERM,
        then repeated SIGKILL until every child has been reaped."""
        if self.verbose:
            sys.stdout.write("[bind10] Stopping the server.\n")
        # first try using the BIND 10 request to stop
        try:
            self.stop_all_processes()
        except Exception:
            # best effort only (e.g. the c-channel may already be gone);
            # the signals below take over
            pass
        # XXX: some delay probably useful... how much is uncertain
        time.sleep(0.5)
        self.reap_children()
        # next try sending a SIGTERM
        for proc_info in list(self.processes.values()):
            if self.verbose:
                sys.stdout.write("[bind10] Sending SIGTERM to %s (PID %d).\n" %
                                 (proc_info.name, proc_info.pid))
            try:
                proc_info.process.terminate()
            except OSError:
                # ignore these (usually ESRCH because the child
                # finally exited)
                pass
        # finally, send SIGKILL (unmaskable termination) until everybody dies
        while self.processes:
            # XXX: some delay probably useful... how much is uncertain
            time.sleep(0.1)
            self.reap_children()
            for proc_info in list(self.processes.values()):
                if self.verbose:
                    sys.stdout.write("[bind10] Sending SIGKILL to %s (PID %d).\n" %
                                     (proc_info.name, proc_info.pid))
                try:
                    proc_info.process.kill()
                except OSError:
                    # ignore these (usually ESRCH because the child
                    # finally exited)
                    pass
        if self.verbose:
            sys.stdout.write("[bind10] All processes ended, server done.\n")

    def reap_children(self):
        """Check to see if any of our child processes have exited,
        and note this for later handling.

        Exited children move from self.processes to self.dead_processes.
        If b10-msgq died, the boss is marked not runnable."""
        while True:
            try:
                (pid, exit_status) = os.waitpid(-1, os.WNOHANG)
            except OSError as o:
                if o.errno == errno.ECHILD:
                    break
                # XXX: should be impossible to get any other error here
                raise
            if pid == 0:
                # children exist but none has exited
                break
            if pid in self.processes:
                proc_info = self.processes.pop(pid)
                proc_info.restart_schedule.set_run_stop_time()
                self.dead_processes[proc_info.pid] = proc_info
                if self.verbose:
                    sys.stdout.write("[bind10] Process %s (PID %d) died.\n" %
                                     (proc_info.name, proc_info.pid))
                if proc_info.name == "b10-msgq":
                    if self.verbose and self.runnable:
                        sys.stdout.write(
                            "[bind10] The b10-msgq process died, shutting down.\n")
                    self.runnable = False
            else:
                sys.stdout.write("[bind10] Unknown child pid %d exited.\n" % pid)

    def restart_processes(self):
        """Restart any dead processes.

        Returns the time when the next process is ready to be restarted.
        If the server is shutting down, returns 0.
        If there are no processes, returns None.

        The values returned can be safely passed into select() as the
        timeout value."""
        next_restart = None
        # if we're shutting down, then don't restart
        if not self.runnable:
            return 0
        # otherwise look through each dead process and try to restart
        still_dead = {}
        now = time.time()
        for proc_info in self.dead_processes.values():
            restart_time = proc_info.restart_schedule.get_restart_time(now)
            if restart_time > now:
                # too early; remember the earliest pending restart
                if (next_restart is None) or (next_restart > restart_time):
                    next_restart = restart_time
                still_dead[proc_info.pid] = proc_info
                continue

            if self.verbose:
                sys.stdout.write("[bind10] Resurrecting dead %s process...\n" %
                                 proc_info.name)
            try:
                proc_info.respawn()
            except Exception as e:
                # remember any processes that refuse to be resurrected
                if self.verbose:
                    sys.stdout.write("[bind10] Unable to resurrect %s: %s\n" %
                                     (proc_info.name, e))
                still_dead[proc_info.pid] = proc_info
            else:
                self.processes[proc_info.pid] = proc_info
                if self.verbose:
                    sys.stdout.write("[bind10] Resurrected %s (PID %d)\n" %
                                     (proc_info.name, proc_info.pid))
        self.dead_processes = still_dead
        # return the time when the next process is ready to be restarted
        return next_restart


# global variables, needed for signal handlers
options = None
boss_of_bind = None


def reaper(signal_number, stack_frame):
    """A child process has died (SIGCHLD received)."""
    # don't do anything...
    # the Python signal handler has been set up to write
    # down a pipe, waking up our select() bit
    pass


def get_signame(signal_number):
    """Return the symbolic name for a signal."""
    for sig in dir(signal):
        # skip SIG_DFL, SIG_IGN, SIG_BLOCK, ... (underscore after "SIG")
        if sig.startswith("SIG") and sig[3].isalnum():
            if getattr(signal, sig) == signal_number:
                return sig
    return "Unknown signal %d" % signal_number


# XXX: perhaps register atexit() function and invoke that instead
def fatal_signal(signal_number, stack_frame):
    """We need to exit (SIGINT or SIGTERM received)."""
    global options
    global boss_of_bind
    if options.verbose:
        sys.stdout.write("[bind10] Received %s.\n" % get_signame(signal_number))
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    # the main loop notices this and performs the shutdown
    boss_of_bind.runnable = False


def check_port(option, opt_str, value, parser):
    """Function to ensure that the port we are passed is actually
    a valid port number. Used by OptionParser() on startup."""
    # raw string: avoids invalid "\d" escape-sequence warnings
    if not re.match(r'^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
        raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
    if opt_str in ('-m', '--msgq-port'):
        parser.values.msgq_port = value
    elif opt_str in ('-p', '--port'):
        parser.values.auth_port = value
    else:
        raise OptionValueError("Unknown option " + opt_str)


def check_addr(option, opt_str, value, parser):
    """Function to ensure that the address we are passed is actually
    a valid address. Used by OptionParser() on startup."""
    try:
        IPAddr(value)
    except Exception:
        raise OptionValueError("%s requires a valid IPv4 or IPv6 address" % opt_str)
    if opt_str in ('-a', '--address'):
        parser.values.address = value
    else:
        raise OptionValueError("Unknown option " + opt_str)


def main():
    """Parse options, start the boss and run the main loop until shutdown."""
    global options
    global boss_of_bind
    # Enforce line buffering on stdout, even when not a TTY
    sys.stdout = io.TextIOWrapper(sys.stdout.detach(), line_buffering=True)

    # Parse any command-line options.
    parser = OptionParser(version=VERSION)
    parser.add_option("-a", "--address", dest="address", type="string",
                      action="callback", callback=check_addr, default='',
                      help="address the b10-auth daemon will use (default: listen on all addresses)")
    parser.add_option("-m", "--msgq-socket-file", dest="msgq_socket_file",
                      type="string", default=None,
                      help="UNIX domain socket file the b10-msgq daemon will use")
    parser.add_option("-n", "--no-cache", action="store_true", dest="nocache",
                      default=False, help="disable hot-spot cache in b10-auth")
    parser.add_option("-p", "--port", dest="auth_port", type="string",
                      action="callback", callback=check_port, default="5300",
                      help="port the b10-auth daemon will use (default 5300)")
    parser.add_option("-u", "--user", dest="user",
                      type="string", default=None,
                      help="Change user after startup (must run as root)")
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    (options, args) = parser.parse_args()
    if args:
        parser.print_help()
        sys.exit(1)

    # Check user ID.
    setuid = None
    username = None
    if options.user:
        # Try getting information about the user, assuming UID passed.
        try:
            pw_ent = pwd.getpwuid(int(options.user))
            setuid = pw_ent.pw_uid
            username = pw_ent.pw_name
        except (ValueError, KeyError):
            pass

        # Next try getting information about the user, assuming user name
        # passed.
        # If the information is both a valid user name and user number, we
        # prefer the name because we try it second. A minor point, hopefully.
        try:
            pw_ent = pwd.getpwnam(options.user)
            setuid = pw_ent.pw_uid
            username = pw_ent.pw_name
        except KeyError:
            pass

        if setuid is None:
            sys.stderr.write("bind10: invalid user: '%s'\n" % options.user)
            sys.exit(1)

    # Announce startup.
    if options.verbose:
        sys.stdout.write("%s\n" % VERSION)

    # TODO: set process name, perhaps by:
    #       http://code.google.com/p/procname/
    #       http://github.com/lericson/procname/

    # Create wakeup pipe for signal handlers
    wakeup_pipe = os.pipe()
    signal.set_wakeup_fd(wakeup_pipe[1])

    # Set signal handlers for catching child termination, as well
    # as our own demise.
    signal.signal(signal.SIGCHLD, reaper)
    signal.siginterrupt(signal.SIGCHLD, False)
    signal.signal(signal.SIGINT, fatal_signal)
    signal.signal(signal.SIGTERM, fatal_signal)

    # Go bob!
    boss_of_bind = BoB(options.msgq_socket_file, int(options.auth_port),
                       options.address, options.nocache, options.verbose,
                       setuid, username)
    startup_result = boss_of_bind.startup()
    if startup_result:
        sys.stderr.write("[bind10] Error on startup: %s\n" % startup_result)
        sys.exit(1)
    sys.stdout.write("[bind10] BIND 10 started\n")

    # In our main loop, we check for dead processes or messages
    # on the c-channel.
    wakeup_fd = wakeup_pipe[0]
    ccs_fd = boss_of_bind.ccs.get_socket().fileno()
    while boss_of_bind.runnable:
        # clean up any processes that exited
        boss_of_bind.reap_children()
        next_restart = boss_of_bind.restart_processes()
        if next_restart is None:
            wait_time = None
        else:
            wait_time = max(next_restart - time.time(), 0)

        # select() can raise EINTR when a signal arrives,
        # even if they are resumable, so we have to catch
        # the exception
        try:
            (rlist, wlist, xlist) = select.select([wakeup_fd, ccs_fd], [], [],
                                                  wait_time)
        except select.error as err:
            if err.args[0] == errno.EINTR:
                (rlist, wlist, xlist) = ([], [], [])
            else:
                sys.stderr.write("[bind10] Error with select(); %s\n" % err)
                break

        for fd in rlist + xlist:
            if fd == ccs_fd:
                try:
                    boss_of_bind.ccs.check_command()
                except isc.cc.session.ProtocolError:
                    if options.verbose:
                        sys.stderr.write("[bind10] msgq channel disappeared.\n")
                    # Only leaves this for-loop; the main loop ends once
                    # reap_children() sees b10-msgq exit.
                    break
            elif fd == wakeup_fd:
                # drain the signal wakeup pipe
                os.read(wakeup_fd, 32)

    # shutdown
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    boss_of_bind.shutdown()
    sys.exit(0)


if __name__ == "__main__":
    main()