|
@@ -81,7 +81,7 @@ class BoB:
|
|
|
process listens on. If verbose is True, then the boss reports
|
|
|
what it is doing.
|
|
|
"""
|
|
|
- self.verbose = True
|
|
|
+ self.verbose = verbose
|
|
|
self.c_channel_port = c_channel_port
|
|
|
self.cc_session = None
|
|
|
self.processes = {}
|
|
@@ -153,7 +153,7 @@ class BoB:
|
|
|
if self.verbose:
|
|
|
sys.stdout.write("Starting cmd-ctrld on port 8080\n")
|
|
|
try:
|
|
|
- cmd_ctrld = ProcessInfo("cmd-ctrld", 'cmd-ctrld')
|
|
|
+ cmd_ctrld = ProcessInfo("cmd-ctrld", ['cmd-ctrld'])
|
|
|
except Exception as e:
|
|
|
c_channel.process.kill()
|
|
|
bind_cfgd.process.kill()
|
|
@@ -247,32 +247,59 @@ class BoB:
|
|
|
def recv_and_process_cc_msg(self):
|
|
|
"""Receive and process the next message on the c-channel,
|
|
|
if any."""
|
|
|
- # XXX: this needs to be made more robust for handling
|
|
|
- # badly formatted messages
|
|
|
- msg, data = self.cc_session.group_recvmsg(False)
|
|
|
+ msg, envelope = self.cc_session.group_recvmsg(False)
|
|
|
+ print(msg)
|
|
|
if msg is None:
|
|
|
return
|
|
|
- msg_from = data.get('from', '')
|
|
|
-
|
|
|
- if (type(msg) is dict) and (type(data) is dict):
|
|
|
- if "command" in msg:
|
|
|
- cmd = msg['command']
|
|
|
- if cmd[0] == "shutdown":
|
|
|
- if self.verbose:
|
|
|
- sys.stdout.write("Shutdown command received\n")
|
|
|
- self.runnable = False
|
|
|
- else:
|
|
|
- if self.verbose:
|
|
|
- sys.stdout.write("Unknown command %s\n" % str(cmd))
|
|
|
- else:
|
|
|
- if self.verbose:
|
|
|
- del data['msg']
|
|
|
- sys.stdout.write("Unknown message received\n")
|
|
|
- sys.stdout.write(pprint.pformat(data) + "\n")
|
|
|
- sys.stdout.write(pprint.pformat(msg) + "\n")
|
|
|
- else:
|
|
|
+ if not ((type(msg) is dict) and (type(envelope) is dict)):
|
|
|
if self.verbose:
|
|
|
sys.stdout.write("Non-dictionary message\n")
|
|
|
+ return
|
|
|
+ if not "command" in msg:
|
|
|
+ if self.verbose:
|
|
|
+ if "msg" in envelope:
|
|
|
+ del envelope['msg']
|
|
|
+ sys.stdout.write("Unknown message received\n")
|
|
|
+ sys.stdout.write(pprint.pformat(envelope) + "\n")
|
|
|
+ sys.stdout.write(pprint.pformat(msg) + "\n")
|
|
|
+ return
|
|
|
+ cmd = msg['command']
|
|
|
+ if not (type(cmd) is list):
|
|
|
+ if self.verbose:
|
|
|
+ sys.stdout.write("Non-list command\n")
|
|
|
+ return
|
|
|
+ if len(cmd) < 2:
|
|
|
+ if self.verbose:
|
|
|
+ sys.stdout.write("Command too short\n")
|
|
|
+ return
|
|
|
+ if cmd[0] != "boss":
|
|
|
+ return
|
|
|
+
|
|
|
+ # done checking and extracting... time to execute the command
|
|
|
+ if cmd[1] == "shutdown":
|
|
|
+ if self.verbose:
|
|
|
+ sys.stdout.write("shutdown command received\n")
|
|
|
+ self.runnable = False
|
|
|
+ # XXX: reply here?
|
|
|
+ elif cmd[1] == "getProcessList":
|
|
|
+ if self.verbose:
|
|
|
+ sys.stdout.write("getProcessList command received\n")
|
|
|
+ live_processes = [ ]
|
|
|
+ for proc_info in processes:
|
|
|
+ live_processes.append({ "name": proc_info.name,
|
|
|
+ "args": proc_info.args,
|
|
|
+ "pid": proc_info.pid, })
|
|
|
+ dead_processes = [ ]
|
|
|
+ for proc_info in dead_processes:
|
|
|
+ dead_processes.append({ "name": proc_info.name,
|
|
|
+ "args": proc_info.args, })
|
|
|
+ cc.group_reply(envelope, { "response": cmd,
|
|
|
+ "sent": msg["sent"],
|
|
|
+ "live_processes": live_processes,
|
|
|
+ "dead_processes": dead_processes, })
|
|
|
+ else:
|
|
|
+ if self.verbose:
|
|
|
+ sys.stdout.write("Unknown command %s\n" % str(cmd))
|
|
|
|
|
|
def restart_processes(self):
|
|
|
"""Restart any dead processes."""
|
|
@@ -297,38 +324,41 @@ class BoB:
|
|
|
# remember any processes that refuse to be resurrected
|
|
|
self.dead_processes = still_dead
|
|
|
|
|
|
-if __name__ == "__main__":
|
|
|
- def reaper(signal_number, stack_frame):
|
|
|
- """A child process has died (SIGCHLD received)."""
|
|
|
- # don't do anything...
|
|
|
- # the Python signal handler has been set up to write
|
|
|
- # down a pipe, waking up our select() bit
|
|
|
- pass
|
|
|
+def reaper(signal_number, stack_frame):
|
|
|
+ """A child process has died (SIGCHLD received)."""
|
|
|
+ # don't do anything...
|
|
|
+ # the Python signal handler has been set up to write
|
|
|
+ # down a pipe, waking up our select() bit
|
|
|
+ pass
|
|
|
|
|
|
- def get_signame(signal_number):
|
|
|
- """Return the symbolic name for a signal."""
|
|
|
- for sig in dir(signal):
|
|
|
- if sig.startswith("SIG") and sig[3].isalnum():
|
|
|
- if getattr(signal, sig) == signal_number:
|
|
|
- return sig
|
|
|
- return "Unknown signal %d" % signal_number
|
|
|
-
|
|
|
- # XXX: perhaps register atexit() function and invoke that instead
|
|
|
- def fatal_signal(signal_number, stack_frame):
|
|
|
- """We need to exit (SIGINT or SIGTERM received)."""
|
|
|
- global options
|
|
|
- if options.verbose:
|
|
|
- sys.stdout.write("Received %s.\n" % get_signame(signal_number))
|
|
|
- signal.signal(signal.SIGCHLD, signal.SIG_DFL)
|
|
|
- boss_of_bind.runnable = False
|
|
|
-
|
|
|
- def check_port(option, opt_str, value, parser):
|
|
|
- """Function to insure that the port we are passed is actually
|
|
|
- a valid port number. Used by OptionParser() on startup."""
|
|
|
- if not re.match('^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
|
|
|
- raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
|
|
|
- parser.values.msgq_port = value
|
|
|
-
|
|
|
+def get_signame(signal_number):
|
|
|
+ """Return the symbolic name for a signal."""
|
|
|
+ for sig in dir(signal):
|
|
|
+ if sig.startswith("SIG") and sig[3].isalnum():
|
|
|
+ if getattr(signal, sig) == signal_number:
|
|
|
+ return sig
|
|
|
+ return "Unknown signal %d" % signal_number
|
|
|
+
|
|
|
+# XXX: perhaps register atexit() function and invoke that instead
|
|
|
+def fatal_signal(signal_number, stack_frame):
|
|
|
+ """We need to exit (SIGINT or SIGTERM received)."""
|
|
|
+ global options
|
|
|
+ global boss_of_bind
|
|
|
+ if options.verbose:
|
|
|
+ sys.stdout.write("Received %s.\n" % get_signame(signal_number))
|
|
|
+ signal.signal(signal.SIGCHLD, signal.SIG_DFL)
|
|
|
+ boss_of_bind.runnable = False
|
|
|
+
|
|
|
+def check_port(option, opt_str, value, parser):
|
|
|
+ """Function to insure that the port we are passed is actually
|
|
|
+ a valid port number. Used by OptionParser() on startup."""
|
|
|
+ if not re.match('^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
|
|
|
+ raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
|
|
|
+ parser.values.msgq_port = value
|
|
|
+
|
|
|
+def main():
|
|
|
+ global options
|
|
|
+ global boss_of_bind
|
|
|
# Parse any command-line options.
|
|
|
parser = OptionParser(version=__version__)
|
|
|
parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
|
|
@@ -366,27 +396,24 @@ if __name__ == "__main__":
|
|
|
|
|
|
# In our main loop, we check for dead processes or messages
|
|
|
# on the c-channel.
|
|
|
- event_poller = select.poll()
|
|
|
wakeup_fd = wakeup_pipe[0]
|
|
|
- event_poller.register(wakeup_fd, select.POLLIN)
|
|
|
cc_fd = boss_of_bind.cc_session._socket.fileno()
|
|
|
- event_poller.register(cc_fd, select.POLLIN)
|
|
|
while boss_of_bind.runnable:
|
|
|
- # XXX: get time for next restart for poll
|
|
|
+ # XXX: get time for next restart for timeout
|
|
|
|
|
|
- # poll() can raise EINTR when a signal arrives,
|
|
|
+ # select() can raise EINTR when a signal arrives,
|
|
|
# even if they are resumable, so we have to catch
|
|
|
# the exception
|
|
|
try:
|
|
|
- events = event_poller.poll()
|
|
|
+ (rlist, wlist, xlist) = select.select([wakeup_fd, cc_fd], [], [])
|
|
|
except select.error as err:
|
|
|
if err.args[0] == errno.EINTR:
|
|
|
- events = []
|
|
|
+ (rlist, wlist, xlist) = ([], [], [])
|
|
|
else:
|
|
|
- sys.stderr.write("Error with poll(); %s\n" % err)
|
|
|
+ sys.stderr.write("Error with select(); %s\n" % err)
|
|
|
break
|
|
|
|
|
|
- for (fd, event) in events:
|
|
|
+ for fd in rlist + xlist:
|
|
|
if fd == cc_fd:
|
|
|
boss_of_bind.recv_and_process_cc_msg()
|
|
|
elif fd == wakeup_fd:
|
|
@@ -402,9 +429,12 @@ if __name__ == "__main__":
|
|
|
raise
|
|
|
if pid == 0: break
|
|
|
boss_of_bind.reap(pid, exit_status)
|
|
|
-
|
|
|
+
|
|
|
boss_of_bind.restart_processes()
|
|
|
|
|
|
# shutdown
|
|
|
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
|
|
|
boss_of_bind.shutdown()
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|