bind10.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. """\
  2. This file implements the Boss of Bind (BoB, or bob) program.
  3. Its purpose is to start up the BIND 10 system, and then manage the
  4. processes, by starting and stopping processes, plus restarting
  5. processes that exit.
  6. To start the system, it first runs the c-channel program (msgq), then
  7. connects to that. It then runs the configuration manager, and reads
  8. its own configuration. Then it proceeds to starting other modules.
  9. The Python subprocess module is used for starting processes, but
  10. because this is not efficient for managing groups of processes,
  11. SIGCHLD signals are caught and processed using the signal module.
  12. Most of the logic is contained in the BoB class. However, since Python
  13. requires that signal processing happen in the main thread, we do
  14. signal handling outside of that class, in the code running for
  15. __main__.
  16. """
  17. # TODO: start up statistics thingy
  18. import subprocess
  19. import signal
  20. import os
  21. import sys
  22. import re
  23. import errno
  24. import time
  25. import select
  26. import pprint
  27. from optparse import OptionParser, OptionValueError
  28. import ISC.CC
  29. import isc
# This is the version that gets displayed to the user: it is handed to
# OptionParser(version=...) in main() and printed in the verbose startup
# banner.
__version__ = "v20091030 (Paving the DNS Parking Lot)"
# Nothing at all to do with the 1990-12-10 article here:
# http://www.subgenius.com/subg-digest/v2/0056.html
  34. class ProcessInfo:
  35. """Information about a process"""
  36. dev_null = open("/dev/null", "w")
  37. def _spawn(self):
  38. if self.dev_null_stdout:
  39. spawn_stdout = self.dev_null
  40. else:
  41. spawn_stdout = None
  42. spawn_env = self.env
  43. spawn_env['PATH'] = os.environ['PATH']
  44. spawn_env['PYTHON_EXEC'] = os.environ['PYTHON_EXEC']
  45. self.process = subprocess.Popen(self.args,
  46. stdin=subprocess.PIPE,
  47. stdout=spawn_stdout,
  48. stderr=spawn_stdout,
  49. close_fds=True,
  50. env=spawn_env,)
  51. self.pid = self.process.pid
  52. def __init__(self, name, args, env={}, dev_null_stdout=False):
  53. self.name = name
  54. self.args = args
  55. self.env = env
  56. self.dev_null_stdout = dev_null_stdout
  57. self._spawn()
  58. def respawn(self):
  59. self._spawn()
  60. class BoB:
  61. """Boss of BIND class."""
  62. def __init__(self, c_channel_port=9912, verbose=False):
  63. """Initialize the Boss of BIND. This is a singleton (only one
  64. can run).
  65. The c_channel_port specifies the TCP/IP port that the msgq
  66. process listens on. If verbose is True, then the boss reports
  67. what it is doing.
  68. """
  69. self.verbose = verbose
  70. self.c_channel_port = c_channel_port
  71. self.cc_session = None
  72. self.processes = {}
  73. self.dead_processes = {}
  74. self.runnable = False
  75. def config_handler(self, new_config):
  76. if self.verbose:
  77. print("[XX] handling new config:")
  78. print(new_config)
  79. # TODO
  80. def command_handler(self, command):
  81. # a command is of the form [ "command", { "arg1": arg1, "arg2": arg2 } ]
  82. if self.verbose:
  83. print("[XX] Boss got command:")
  84. print(command)
  85. answer = None
  86. if type(command) != list or len(command) == 0:
  87. answer = { "result": [ 1, "bad command" ] }
  88. else:
  89. cmd = command[0]
  90. if cmd == "shutdown":
  91. print("[XX] got shutdown command")
  92. self.runnable = False
  93. answer = { "result": [ 0 ] }
  94. elif cmd == "print_message":
  95. if len(command) > 1 and type(command[1]) == dict and "message" in command[1]:
  96. print(command[1]["message"])
  97. answer = { "result": [ 0 ] }
  98. else:
  99. answer = { "result": [ 1, "Unknown command" ] }
  100. return answer
  101. def startup(self):
  102. """Start the BoB instance.
  103. Returns None if successful, otherwise an string describing the
  104. problem.
  105. """
  106. # start the c-channel daemon
  107. if self.verbose:
  108. sys.stdout.write("Starting msgq using port %d\n" %
  109. self.c_channel_port)
  110. c_channel_env = { "ISC_MSGQ_PORT": str(self.c_channel_port), }
  111. try:
  112. c_channel = ProcessInfo("msgq", "msgq", c_channel_env, True)
  113. except Exception as e:
  114. return "Unable to start msgq; " + str(e)
  115. self.processes[c_channel.pid] = c_channel
  116. if self.verbose:
  117. sys.stdout.write("Started msgq (PID %d)\n" % c_channel.pid)
  118. # now connect to the c-channel
  119. cc_connect_start = time.time()
  120. while self.cc_session is None:
  121. # if we have been trying for "a while" give up
  122. if (time.time() - cc_connect_start) > 5:
  123. c_channel.process.kill()
  124. return "Unable to connect to c-channel after 5 seconds"
  125. # try to connect, and if we can't wait a short while
  126. try:
  127. self.cc_session = ISC.CC.Session(self.c_channel_port)
  128. except ISC.CC.session.SessionError:
  129. time.sleep(0.1)
  130. #self.cc_session.group_subscribe("Boss", "boss")
  131. # start the configuration manager
  132. if self.verbose:
  133. sys.stdout.write("Starting b10-cfgmgr\n")
  134. try:
  135. bind_cfgd = ProcessInfo("b10-cfgmgr", "b10-cfgmgr")
  136. except Exception as e:
  137. c_channel.process.kill()
  138. return "Unable to start b10-cfgmgr; " + str(e)
  139. self.processes[bind_cfgd.pid] = bind_cfgd
  140. if self.verbose:
  141. sys.stdout.write("Started b10-cfgmgr (PID %d)\n" % bind_cfgd.pid)
  142. # TODO: once this interface is done, replace self.cc_session
  143. # by this one
  144. # sleep until b10-cfgmgr is fully up and running, this is a good place
  145. # to have a (short) timeout on synchronized groupsend/receive
  146. time.sleep(1)
  147. if self.verbose:
  148. print("[XX] starting ccsession")
  149. self.ccs = isc.config.CCSession("bob.spec", self.config_handler, self.command_handler)
  150. if self.verbose:
  151. print("[XX] ccsession started")
  152. # start the parking lot
  153. # XXX: this must be read from the configuration manager in the future
  154. # XXX: we hardcode port 5300
  155. if self.verbose:
  156. sys.stdout.write("Starting b10-parkinglot on port 5300\n")
  157. try:
  158. parkinglot = ProcessInfo("b10-parkinglot", ["b10-parkinglot", "-p", "5300"])
  159. except Exception as e:
  160. c_channel.process.kill()
  161. bind_cfgd.process.kill()
  162. return "Unable to start b10-parkinglot; " + str(e)
  163. self.processes[parkinglot.pid] = parkinglot
  164. if self.verbose:
  165. sys.stdout.write("Started b10-parkinglot (PID %d)\n" % parkinglot.pid)
  166. # start the b10-cmdctl
  167. # XXX: we hardcode port 8080
  168. if self.verbose:
  169. sys.stdout.write("Starting b10-cmdctl on port 8080\n")
  170. try:
  171. cmd_ctrld = ProcessInfo("b10-cmdctl", ['b10-cmdctl'])
  172. except Exception as e:
  173. c_channel.process.kill()
  174. bind_cfgd.process.kill()
  175. parkinglot.process.kill()
  176. return "Unable to start b10-cmdctl; " + str(e)
  177. self.processes[cmd_ctrld.pid] = cmd_ctrld
  178. if self.verbose:
  179. sys.stdout.write("Started b10-cmdctl (PID %d)\n" % cmd_ctrld.pid)
  180. self.runnable = True
  181. return None
  182. def stop_all_processes(self):
  183. """Stop all processes."""
  184. cmd = { "command": ['shutdown']}
  185. self.cc_session.group_sendmsg(cmd, 'Boss', 'Cmd-Ctrld')
  186. self.cc_session.group_sendmsg(cmd, "Boss", "ConfigManager")
  187. self.cc_session.group_sendmsg(cmd, "Boss", "ParkingLot")
  188. def stop_process(self, process):
  189. """Stop the given process, friendly-like."""
  190. # XXX nothing yet
  191. pass
  192. def shutdown(self):
  193. """Stop the BoB instance."""
  194. if self.verbose:
  195. sys.stdout.write("Stopping the server.\n")
  196. # first try using the BIND 10 request to stop
  197. try:
  198. self.stop_all_processes()
  199. except:
  200. pass
  201. # XXX: some delay probably useful... how much is uncertain
  202. time.sleep(0.1)
  203. self.reap_children()
  204. # next try sending a SIGTERM
  205. processes_to_stop = list(self.processes.values())
  206. unstopped_processes = []
  207. for proc_info in processes_to_stop:
  208. if self.verbose:
  209. sys.stdout.write("Sending SIGTERM to %s (PID %d).\n" %
  210. (proc_info.name, proc_info.pid))
  211. try:
  212. proc_info.process.terminate()
  213. except OSError as o:
  214. # ignore these (usually ESRCH because the child
  215. # finally exited)
  216. pass
  217. # XXX: some delay probably useful... how much is uncertain
  218. time.sleep(0.1)
  219. self.reap_children()
  220. # finally, send a SIGKILL (unmaskable termination)
  221. processes_to_stop = unstopped_processes
  222. for proc_info in processes_to_stop:
  223. if self.verbose:
  224. sys.stdout.write("Sending SIGKILL to %s (PID %d).\n" %
  225. (proc_info.name, proc_info.pid))
  226. try:
  227. proc_info.process.kill()
  228. except OSError as o:
  229. # ignore these (usually ESRCH because the child
  230. # finally exited)
  231. pass
  232. if self.verbose:
  233. sys.stdout.write("All processes ended, server done.\n")
  234. def reap_children(self):
  235. """Check to see if any of our child processes have exited,
  236. and note this for later handling.
  237. """
  238. while True:
  239. try:
  240. (pid, exit_status) = os.waitpid(-1, os.WNOHANG)
  241. except OSError as o:
  242. if o.errno == errno.ECHILD: break
  243. # XXX: should be impossible to get any other error here
  244. raise
  245. if pid == 0: break
  246. if pid in self.processes:
  247. proc_info = self.processes.pop(pid)
  248. self.dead_processes[proc_info.pid] = proc_info
  249. if self.verbose:
  250. sys.stdout.write("Process %s (PID %d) died.\n" %
  251. (proc_info.name, proc_info.pid))
  252. if proc_info.name == "msgq":
  253. if self.verbose:
  254. sys.stdout.write(
  255. "The msgq process died, shutting down.\n")
  256. self.runnable = False
  257. else:
  258. sys.stdout.write("Unknown child pid %d exited.\n" % pid)
  259. # 'old' command style, uncommented for now
  260. # move the handling below move to command_handler please
  261. #def recv_and_process_cc_msg(self):
  262. #"""Receive and process the next message on the c-channel,
  263. #if any."""
  264. #self.ccs.checkCommand()
  265. #msg, envelope = self.cc_session.group_recvmsg(False)
  266. #print(msg)
  267. #if msg is None:
  268. # return
  269. #if not ((type(msg) is dict) and (type(envelope) is dict)):
  270. # if self.verbose:
  271. # sys.stdout.write("Non-dictionary message\n")
  272. # return
  273. #if not "command" in msg:
  274. # if self.verbose:
  275. # if "msg" in envelope:
  276. # del envelope['msg']
  277. # sys.stdout.write("Unknown message received\n")
  278. # sys.stdout.write(pprint.pformat(envelope) + "\n")
  279. # sys.stdout.write(pprint.pformat(msg) + "\n")
  280. # return
  281. #cmd = msg['command']
  282. #if not (type(cmd) is list):
  283. # if self.verbose:
  284. # sys.stdout.write("Non-list command\n")
  285. # return
  286. #
  287. # done checking and extracting... time to execute the command
  288. #if cmd[0] == "shutdown":
  289. # if self.verbose:
  290. # sys.stdout.write("shutdown command received\n")
  291. # self.runnable = False
  292. # # XXX: reply here?
  293. #elif cmd[0] == "getProcessList":
  294. # if self.verbose:
  295. # sys.stdout.write("getProcessList command received\n")
  296. # live_processes = [ ]
  297. # for proc_info in processes:
  298. # live_processes.append({ "name": proc_info.name,
  299. # "args": proc_info.args,
  300. # "pid": proc_info.pid, })
  301. # dead_processes = [ ]
  302. # for proc_info in dead_processes:
  303. # dead_processes.append({ "name": proc_info.name,
  304. # "args": proc_info.args, })
  305. # cc.group_reply(envelope, { "response": cmd,
  306. # "sent": msg["sent"],
  307. # "live_processes": live_processes,
  308. # "dead_processes": dead_processes, })
  309. #else:
  310. # if self.verbose:
  311. # sys.stdout.write("Unknown command %s\n" % str(cmd))
  312. def restart_processes(self):
  313. """Restart any dead processes."""
  314. # XXX: this needs a back-off algorithm
  315. # if we're shutting down, then don't restart
  316. if not self.runnable:
  317. return
  318. # otherwise look through each dead process and try to restart
  319. still_dead = {}
  320. for proc_info in self.dead_processes.values():
  321. if self.verbose:
  322. sys.stdout.write("Resurrecting dead %s process...\n" %
  323. proc_info.name)
  324. try:
  325. proc_info.respawn()
  326. self.processes[proc_info.pid] = proc_info
  327. if self.verbose:
  328. sys.stdout.write("Resurrected %s (PID %d)\n" %
  329. (proc_info.name, proc_info.pid))
  330. except:
  331. still_dead[proc_info.pid] = proc_info
  332. # remember any processes that refuse to be resurrected
  333. self.dead_processes = still_dead
  334. def reaper(signal_number, stack_frame):
  335. """A child process has died (SIGCHLD received)."""
  336. # don't do anything...
  337. # the Python signal handler has been set up to write
  338. # down a pipe, waking up our select() bit
  339. pass
  340. def get_signame(signal_number):
  341. """Return the symbolic name for a signal."""
  342. for sig in dir(signal):
  343. if sig.startswith("SIG") and sig[3].isalnum():
  344. if getattr(signal, sig) == signal_number:
  345. return sig
  346. return "Unknown signal %d" % signal_number
  347. # XXX: perhaps register atexit() function and invoke that instead
  348. def fatal_signal(signal_number, stack_frame):
  349. """We need to exit (SIGINT or SIGTERM received)."""
  350. global options
  351. global boss_of_bind
  352. if options.verbose:
  353. sys.stdout.write("Received %s.\n" % get_signame(signal_number))
  354. signal.signal(signal.SIGCHLD, signal.SIG_DFL)
  355. boss_of_bind.runnable = False
  356. def check_port(option, opt_str, value, parser):
  357. """Function to insure that the port we are passed is actually
  358. a valid port number. Used by OptionParser() on startup."""
  359. if not re.match('^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
  360. raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
  361. parser.values.msgq_port = value
  362. def main():
  363. global options
  364. global boss_of_bind
  365. # Parse any command-line options.
  366. parser = OptionParser(version=__version__)
  367. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  368. help="display more about what is going on")
  369. parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
  370. action="callback", callback=check_port, default="9912",
  371. help="port the msgq daemon will use")
  372. (options, args) = parser.parse_args()
  373. # Announce startup.
  374. if options.verbose:
  375. sys.stdout.write("BIND 10 %s\n" % __version__)
  376. # TODO: set process name, perhaps by:
  377. # http://code.google.com/p/procname/
  378. # http://github.com/lericson/procname/
  379. # Create wakeup pipe for signal handlers
  380. wakeup_pipe = os.pipe()
  381. signal.set_wakeup_fd(wakeup_pipe[1])
  382. # Set signal handlers for catching child termination, as well
  383. # as our own demise.
  384. signal.signal(signal.SIGCHLD, reaper)
  385. signal.siginterrupt(signal.SIGCHLD, False)
  386. signal.signal(signal.SIGINT, fatal_signal)
  387. signal.signal(signal.SIGTERM, fatal_signal)
  388. # Go bob!
  389. boss_of_bind = BoB(int(options.msgq_port), options.verbose)
  390. startup_result = boss_of_bind.startup()
  391. if startup_result:
  392. sys.stderr.write("Error on startup: %s\n" % startup_result)
  393. sys.exit(1)
  394. # In our main loop, we check for dead processes or messages
  395. # on the c-channel.
  396. wakeup_fd = wakeup_pipe[0]
  397. ccs_fd = boss_of_bind.ccs.getSocket().fileno()
  398. while boss_of_bind.runnable:
  399. # XXX: get time for next restart for timeout
  400. # select() can raise EINTR when a signal arrives,
  401. # even if they are resumable, so we have to catch
  402. # the exception
  403. try:
  404. (rlist, wlist, xlist) = select.select([wakeup_fd, ccs_fd], [], [])
  405. except select.error as err:
  406. if err.args[0] == errno.EINTR:
  407. (rlist, wlist, xlist) = ([], [], [])
  408. else:
  409. sys.stderr.write("Error with select(); %s\n" % err)
  410. break
  411. for fd in rlist + xlist:
  412. if fd == ccs_fd:
  413. boss_of_bind.ccs.checkCommand()
  414. elif fd == wakeup_fd:
  415. os.read(wakeup_fd, 32)
  416. # clean up any processes that exited
  417. boss_of_bind.reap_children()
  418. boss_of_bind.restart_processes()
  419. # shutdown
  420. signal.signal(signal.SIGCHLD, signal.SIG_DFL)
  421. boss_of_bind.shutdown()
# Run the boss only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()