bind10.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. """\
  2. This file implements the Boss of Bind (BoB, or bob) program.
  3. Its purpose is to start up the BIND 10 system, and then manage the
  4. processes, by starting and stopping processes, plus restarting
  5. processes that exit.
  6. To start the system, it first runs the c-channel program (msgq), then
  7. connects to that. It then runs the configuration manager, and reads
  8. its own configuration. Then it proceeds to starting other modules.
  9. The Python subprocess module is used for starting processes, but
  10. because this is not efficient for managing groups of processes,
  11. SIGCHLD signals are caught and processed using the signal module.
  12. Most of the logic is contained in the BoB class. However, since Python
  13. requires that signal processing happen in the main thread, we do
  14. signal handling outside of that class, in the code running for
  15. __main__.
  16. """
  17. # TODO: start up statistics thingy
  18. import subprocess
  19. import signal
  20. import os
  21. import sys
  22. import re
  23. import errno
  24. import time
  25. import select
  26. import pprint
  27. from optparse import OptionParser, OptionValueError
  28. import ISC.CC
  29. # This is the version that gets displayed to the user.
  30. __version__ = "v20091030 (Paving the DNS Parking Lot)"
  31. # Nothing at all to do with the 1990-12-10 article here:
  32. # http://www.subgenius.com/subg-digest/v2/0056.html
  33. class ProcessInfo:
  34. """Information about a process"""
  35. dev_null = open("/dev/null", "w")
  36. def _spawn(self):
  37. if self.dev_null_stdout:
  38. spawn_stdout = self.dev_null
  39. else:
  40. spawn_stdout = None
  41. spawn_env = self.env
  42. spawn_env['PATH'] = os.environ['PATH']
  43. spawn_env['PYTHON_EXEC'] = os.environ['PYTHON_EXEC']
  44. self.process = subprocess.Popen(self.args,
  45. stdin=subprocess.PIPE,
  46. stdout=spawn_stdout,
  47. stderr=spawn_stdout,
  48. close_fds=True,
  49. env=spawn_env,)
  50. self.pid = self.process.pid
  51. def __init__(self, name, args, env={}, dev_null_stdout=False):
  52. self.name = name
  53. self.args = args
  54. self.env = env
  55. self.dev_null_stdout = dev_null_stdout
  56. self._spawn()
  57. def respawn(self):
  58. self._spawn()
  59. class BoB:
  60. """Boss of BIND class."""
  61. def __init__(self, c_channel_port=9912, verbose=False):
  62. """Initialize the Boss of BIND. This is a singleton (only one
  63. can run).
  64. The c_channel_port specifies the TCP/IP port that the msgq
  65. process listens on. If verbose is True, then the boss reports
  66. what it is doing.
  67. """
  68. self.verbose = True
  69. self.c_channel_port = c_channel_port
  70. self.cc_session = None
  71. self.processes = {}
  72. self.dead_processes = {}
  73. self.runnable = False
  74. def startup(self):
  75. """Start the BoB instance.
  76. Returns None if successful, otherwise an string describing the
  77. problem.
  78. """
  79. # start the c-channel daemon
  80. if self.verbose:
  81. sys.stdout.write("Starting msgq using port %d\n" %
  82. self.c_channel_port)
  83. c_channel_env = { "ISC_MSGQ_PORT": str(self.c_channel_port), }
  84. try:
  85. #c_channel = ProcessInfo("msgq", "msgq", c_channel_env, True)
  86. c_channel = ProcessInfo("msgq", "msgq", c_channel_env)
  87. except Exception as e:
  88. return "Unable to start msgq; " + str(e)
  89. self.processes[c_channel.pid] = c_channel
  90. if self.verbose:
  91. sys.stdout.write("Started msgq (PID %d)\n" % c_channel.pid)
  92. # now connect to the c-channel
  93. cc_connect_start = time.time()
  94. while self.cc_session is None:
  95. # if we have been trying for "a while" give up
  96. if (time.time() - cc_connect_start) > 5:
  97. c_channel.kill()
  98. return "Unable to connect to c-channel after 5 seconds"
  99. # try to connect, and if we can't wait a short while
  100. try:
  101. self.cc_session = ISC.CC.Session(self.c_channel_port)
  102. except ISC.CC.session.SessionError:
  103. time.sleep(0.1)
  104. self.cc_session.group_subscribe("Boss", "boss")
  105. # start the configuration manager
  106. if self.verbose:
  107. sys.stdout.write("Starting bind-cfgd\n")
  108. try:
  109. bind_cfgd = ProcessInfo("bind-cfgd", "bind-cfgd")
  110. except Exception as e:
  111. c_channel.process.kill()
  112. return "Unable to start bind-cfgd; " + str(e)
  113. self.processes[bind_cfgd.pid] = bind_cfgd
  114. if self.verbose:
  115. sys.stdout.write("Started bind-cfgd (PID %d)\n" % bind_cfgd.pid)
  116. # start the parking lot
  117. # XXX: this must be read from the configuration manager in the future
  118. # XXX: we hardcode port 5300
  119. if self.verbose:
  120. sys.stdout.write("Starting parkinglot on port 5300\n")
  121. try:
  122. parkinglot = ProcessInfo("parkinglot", ["parkinglot", "-p", "5300"])
  123. except Exception as e:
  124. c_channel.kill()
  125. bind_cfgd.kill()
  126. return "Unable to start parkinglot; " + str(e)
  127. self.processes[parkinglot.pid] = parkinglot
  128. if self.verbose:
  129. sys.stdout.write("Started parkinglot (PID %d)\n" % parkinglot.pid)
  130. self.runnable = True
  131. return None
  132. def stop_all_processes(self):
  133. """Stop all processes."""
  134. cmd = { "command": "shutdown" }
  135. self.cc_session.group_sendmsg(cmd, "Boss", "*")
  136. def stop_process(self, process):
  137. """Stop the given process, friendly-like."""
  138. # XXX nothing yet
  139. pass
  140. def shutdown(self):
  141. """Stop the BoB instance."""
  142. if self.verbose:
  143. sys.stdout.write("Stopping the server.\n")
  144. # first try using the BIND 10 request to stop
  145. try:
  146. self.stop_all_processes()
  147. except:
  148. pass
  149. # XXX: some delay probably useful... how much is uncertain
  150. time.sleep(0.1)
  151. # next try sending a SIGTERM
  152. processes_to_stop = list(self.processes.values())
  153. unstopped_processes = []
  154. for proc_info in processes_to_stop:
  155. if self.verbose:
  156. sys.stdout.write("Sending SIGTERM to %s (PID %d).\n" %
  157. (proc_info.name, proc_info.pid))
  158. try:
  159. proc_info.process.terminate()
  160. except OSError as o:
  161. # ignore these (usually ESRCH because the child
  162. # finally exited)
  163. pass
  164. # XXX: some delay probably useful... how much is uncertain
  165. time.sleep(0.1)
  166. for proc_info in processes_to_stop:
  167. (pid, exit_status) = os.waitpid(proc_info.pid, os.WNOHANG)
  168. if pid == 0:
  169. unstopped_processes.append(proc_info)
  170. # finally, send a SIGKILL (unmaskable termination)
  171. processes_to_stop = unstopped_processes
  172. for proc_info in processes_to_stop:
  173. if self.verbose:
  174. sys.stdout.write("Sending SIGKILL to %s (PID %d).\n" %
  175. (proc_info.name, proc_info.pid))
  176. try:
  177. proc_info.process.kill()
  178. except OSError as o:
  179. # ignore these (usually ESRCH because the child
  180. # finally exited)
  181. pass
  182. if self.verbose:
  183. sys.stdout.write("All processes ended, server done.\n")
  184. def reap(self, pid, exit_status):
  185. """The process specified by pid has exited with the value
  186. exit_status, so perform any action necessary (cleanup,
  187. restart, and so on).
  188. Returns True if everything is okay, or False if a fatal error
  189. has been detected and the program should exit.
  190. """
  191. if not pid in self.processes:
  192. sys.stdout.write("Unknown child pid %d exited.\n" % pid)
  193. return
  194. proc_info = self.processes.pop(pid)
  195. self.dead_processes[proc_info.pid] = proc_info
  196. if self.verbose:
  197. sys.stdout.write("Process %s (PID %d) died.\n" %
  198. (proc_info.name, proc_info.pid))
  199. if proc_info.name == "msgq":
  200. if self.verbose:
  201. sys.stdout.write("The msgq process died, shutting down.\n")
  202. self.runnable = False
  203. def recv_and_process_cc_msg(self):
  204. """Receive and process the next message on the c-channel,
  205. if any."""
  206. # XXX: this needs to be made more robust for handling
  207. # badly formatted messages
  208. msg, data = self.cc_session.group_recvmsg(False)
  209. if msg is None:
  210. return
  211. msg_from = data.get('from', '')
  212. if (type(msg) is dict) and (type(data) is dict):
  213. if "command" in msg:
  214. cmd = msg['command']
  215. if (cmd[0] == "boss") and (cmd[1] == "shutdown"):
  216. if self.verbose:
  217. sys.stdout.write("Shutdown command received\n")
  218. self.runnable = False
  219. else:
  220. if self.verbose:
  221. sys.stdout.write("Unknown command %s\n" % str(cmd))
  222. else:
  223. if self.verbose:
  224. del data['msg']
  225. sys.stdout.write("Unknown message received\n")
  226. sys.stdout.write(pprint.pformat(data) + "\n")
  227. sys.stdout.write(pprint.pformat(msg) + "\n")
  228. else:
  229. if self.verbose:
  230. sys.stdout.write("Non-dictionary message\n")
  231. def restart_processes(self):
  232. """Restart any dead processes."""
  233. # XXX: this needs a back-off algorithm
  234. still_dead = {}
  235. for proc_info in self.dead_processes.values():
  236. if self.verbose:
  237. sys.stdout.write("Resurrecting dead %s process...\n" %
  238. proc_info.name)
  239. try:
  240. proc_info.respawn()
  241. self.processes[proc_info.pid] = proc_info
  242. if self.verbose:
  243. sys.stdout.write("Resurrected %s (PID %d)\n" %
  244. (proc_info.name, proc_info.pid))
  245. except:
  246. still_dead[proc_info.pid] = proc_info
  247. # remember any processes that refuse to be resurrected
  248. self.dead_processes = still_dead
  249. if __name__ == "__main__":
  250. def reaper(signal_number, stack_frame):
  251. """A child process has died (SIGCHLD received)."""
  252. global boss_of_bind
  253. while True:
  254. try:
  255. (pid, exit_status) = os.waitpid(-1, os.WNOHANG)
  256. except OSError as o:
  257. if o.errno == errno.ECHILD: break
  258. raise
  259. if pid == 0: break
  260. if boss_of_bind:
  261. boss_of_bind.reap(pid, exit_status)
  262. def get_signame(signal_number):
  263. """Return the symbolic name for a signal."""
  264. for sig in dir(signal):
  265. if sig.startswith("SIG") and sig[3].isalnum():
  266. if getattr(signal, sig) == signal_number:
  267. return sig
  268. return "Unknown signal %d" % signal_number
  269. # XXX: perhaps register atexit() function and invoke that instead
  270. def fatal_signal(signal_number, stack_frame):
  271. """We need to exit (SIGINT or SIGTERM received)."""
  272. global options
  273. if options.verbose:
  274. sys.stdout.write("Received %s.\n" % get_signame(signal_number))
  275. signal.signal(signal.SIGCHLD, signal.SIG_DFL)
  276. boss_of_bind.runnable = False
  277. def check_port(option, opt_str, value, parser):
  278. """Function to insure that the port we are passed is actually
  279. a valid port number. Used by OptionParser() on startup."""
  280. if not re.match('^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
  281. raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
  282. parser.values.msgq_port = value
  283. # Parse any command-line options.
  284. parser = OptionParser(version=__version__)
  285. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  286. help="display more about what is going on")
  287. parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
  288. action="callback", callback=check_port, default="9912",
  289. help="port the msgq daemon will use")
  290. (options, args) = parser.parse_args()
  291. # Announce startup.
  292. if options.verbose:
  293. sys.stdout.write("BIND 10 %s\n" % __version__)
  294. # TODO: set process name, perhaps by:
  295. # http://code.google.com/p/procname/
  296. # http://github.com/lericson/procname/
  297. # Create wakeup pipe for signal handlers
  298. wakeup_pipe = os.pipe()
  299. signal.set_wakeup_fd(wakeup_pipe[1])
  300. # Set signal handlers for catching child termination, as well
  301. # as our own demise.
  302. signal.signal(signal.SIGCHLD, reaper)
  303. signal.siginterrupt(signal.SIGCHLD, False)
  304. signal.signal(signal.SIGINT, fatal_signal)
  305. signal.signal(signal.SIGTERM, fatal_signal)
  306. # Go bob!
  307. boss_of_bind = BoB(int(options.msgq_port), options.verbose)
  308. startup_result = boss_of_bind.startup()
  309. if startup_result:
  310. sys.stderr.write("Error on startup: %s\n" % startup_result)
  311. sys.exit(1)
  312. # In our main loop, we check for dead processes or messages
  313. # on the c-channel.
  314. event_poller = select.poll()
  315. wakeup_fd = wakeup_pipe[0]
  316. event_poller.register(wakeup_fd, select.POLLIN)
  317. cc_fd = boss_of_bind.cc_session._socket.fileno()
  318. event_poller.register(cc_fd, select.POLLIN)
  319. while boss_of_bind.runnable:
  320. # XXX: get time for next restart for poll
  321. # poll() can raise EINTR when a signal arrives,
  322. # even if they are resumable, so we have to catch
  323. # the exception
  324. try:
  325. events = event_poller.poll()
  326. except select.error as err:
  327. if err.args[0] == errno.EINTR:
  328. events = []
  329. else:
  330. sys.stderr.write("Error with poll(); %s\n" % err)
  331. break
  332. for (fd, event) in events:
  333. if fd == cc_fd:
  334. boss_of_bind.recv_and_process_cc_msg()
  335. elif fd == wakeup_fd:
  336. os.read(wakeup_fd, 32)
  337. boss_of_bind.restart_processes()
  338. # shutdown
  339. signal.signal(signal.SIGCHLD, signal.SIG_DFL)
  340. boss_of_bind.shutdown()