#!@PYTHON@ # Copyright (C) 2010 Internet Systems Consortium. # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. import sys; sys.path.append ('@@PYTHONPATH@@') """This code implements the msgq daemon.""" import subprocess import signal import os import socket import sys import struct import errno import time import select import random import threading import isc.config.ccsession from optparse import OptionParser, OptionValueError import isc.util.process import isc.log from isc.log_messages.msgq_messages import * import isc.cc isc.util.process.rename() isc.log.init("b10-msgq", buffer=True) # Logger that is used in the actual msgq handling - startup, shutdown and the # poller thread. logger = isc.log.Logger("msgq") # A separate copy for the master/config thread when the poller thread runs. # We use a separate instance, since the logger itself doesn't have to be # thread safe. config_logger = isc.log.Logger("msgq") TRACE_START = logger.DBGLVL_START_SHUT TRACE_BASIC = logger.DBGLVL_TRACE_BASIC TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL # This is the version that gets displayed to the user. # The VERSION string consists of the module name, the module version # number, and the overall BIND 10 version number (set in configure.ac). VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)" # If B10_FROM_BUILD is set in the environment, we use data files # from a directory relative to that, otherwise we use the ones # installed on the system if "B10_FROM_BUILD" in os.environ: SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/msgq" else: PREFIX = "@prefix@" DATAROOTDIR = "@datarootdir@" SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX) SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec" class MsgQReceiveError(Exception): pass class MsgQCloseOnReceive(Exception): """Exception raised when reading data from a socket results in 'shutdown'. This happens when msgq received 0-length data. This class holds whether it happens in the middle of reading (i.e. after reading some) via partial_read parameter, which is set to True if and only if so. This will be used by an upper layer catching the exception to distinguish the severity of the event. """ def __init__(self, reason, partial_read): self.partial_read = partial_read self.__reason = reason def __str__(self): return self.__reason class SubscriptionManager: def __init__(self, cfgmgr_ready): """ Initialize the subscription manager. parameters: * cfgmgr_ready: A callable object run once the config manager subscribes. This is a hackish solution, but we can't read the configuration sooner. """ self.subscriptions = {} self.__cfgmgr_ready = cfgmgr_ready self.__cfgmgr_ready_called = False def subscribe(self, group, instance, socket): """Add a subscription.""" target = ( group, instance ) if target in self.subscriptions: logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance) if socket not in self.subscriptions[target]: self.subscriptions[target].append(socket) else: logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance) self.subscriptions[target] = [ socket ] if group == "ConfigManager" and not self.__cfgmgr_ready_called: logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED) self.__cfgmgr_ready_called = True self.__cfgmgr_ready() def unsubscribe(self, group, instance, socket): """Remove the socket from the one specific subscription.""" target = ( group, instance ) if target in self.subscriptions: if socket in self.subscriptions[target]: self.subscriptions[target].remove(socket) def unsubscribe_all(self, socket): """Remove the socket from all subscriptions.""" for socklist in self.subscriptions.values(): if socket in socklist: socklist.remove(socket) def find_sub(self, group, instance): """Return an array of sockets which want this specific group, instance.""" target = (group, instance) if target in self.subscriptions: return self.subscriptions[target] else: return [] def find(self, group, instance): """Return an array of sockets who should get something sent to this group, instance pair. This includes wildcard subscriptions.""" target = (group, instance) partone = self.find_sub(group, instance) parttwo = self.find_sub(group, "*") return list(set(partone + parttwo)) class MsgQ: """Message Queue class.""" # did we find a better way to do this? SOCKET_FILE = os.path.join("@localstatedir@", "@PACKAGE_NAME@", "msgq_socket").replace("${prefix}", "@prefix@") def __init__(self, socket_file=None, verbose=False): """Initialize the MsgQ master. The socket_file specifies the path to the UNIX domain socket that the msgq process listens on. If it is None, the environment variable BIND10_MSGQ_SOCKET_FILE is used. If that is not set, it will default to @localstatedir@/@PACKAGE_NAME@/msg_socket. If verbose is True, then the MsgQ reports what it is doing. """ if socket_file is None: if "BIND10_MSGQ_SOCKET_FILE" in os.environ: self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"] else: self.socket_file = self.SOCKET_FILE else: self.socket_file = socket_file self.verbose = verbose self.poller = None self.kqueue = None self.runnable = False self.listen_socket = False self.sockets = {} self.connection_counter = random.random() self.hostname = socket.gethostname() self.subs = SubscriptionManager(self.cfgmgr_ready) self.lnames = {} self.sendbuffs = {} self.running = False self.__cfgmgr_ready = None self.__cfgmgr_ready_cond = threading.Condition() # A lock used when the message queue does anything more complicated. # It is mostly a safety measure, the threads doing so should be mostly # independent, and the one with config session should be read only, # but with threads, one never knows. We use threads for concurrency, # not for performance, so we use wide lock scopes to be on the safe # side. self.__lock = threading.Lock() def cfgmgr_ready(self, ready=True): """Notify that the config manager is either subscribed, or that the msgq is shutting down and it won't connect, but anybody waiting for it should stop anyway. The ready parameter signifies if the config manager is subscribed. This method can be called multiple times, but second and any following call is simply ignored. This means the "abort" version of the call can be used on any stop unconditionally, even when the config manager already connected. """ with self.__cfgmgr_ready_cond: if self.__cfgmgr_ready is not None: # This is a second call to this method. In that case it does # nothing. return self.__cfgmgr_ready = ready self.__cfgmgr_ready_cond.notify_all() def wait_cfgmgr(self): """Wait for msgq to subscribe. When this returns, the config manager is either subscribed, or msgq gave up waiting for it. Success is signified by the return value. """ with self.__cfgmgr_ready_cond: # Wait until it either aborts or subscribes while self.__cfgmgr_ready is None: self.__cfgmgr_ready_cond.wait() return self.__cfgmgr_ready def setup_poller(self): """Set up the poll thing. Internal function.""" try: self.kqueue = select.kqueue() except AttributeError: self.poller = select.poll() def add_kqueue_socket(self, socket, write_filter=False): """Add a kqueue filter for a socket. By default the read filter is used; if write_filter is set to True, the write filter is used. We use a boolean value instead of a specific filter constant, because kqueue filter values do not seem to be defined on some systems. The use of boolean makes the interface restrictive because there are other filters, but this method is mostly only for our internal use, so it should be acceptable at least for now.""" filter_type = select.KQ_FILTER_WRITE if write_filter else \ select.KQ_FILTER_READ event = select.kevent(socket.fileno(), filter_type, select.KQ_EV_ADD | select.KQ_EV_ENABLE) self.kqueue.control([event], 0) def delete_kqueue_socket(self, socket, write_filter=False): """Delete a kqueue filter for socket. See add_kqueue_socket() for the semantics and notes about write_filter.""" filter_type = select.KQ_FILTER_WRITE if write_filter else \ select.KQ_FILTER_READ event = select.kevent(socket.fileno(), filter_type, select.KQ_EV_DELETE) self.kqueue.control([event], 0) def setup_listener(self): """Set up the listener socket. Internal function.""" logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file) self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) if os.path.exists(self.socket_file): os.remove(self.socket_file) try: self.listen_socket.bind(self.socket_file) self.listen_socket.listen(1024) except Exception as e: # remove the file again if something goes wrong # (note this is a catch-all, but we reraise it) if os.path.exists(self.socket_file): os.remove(self.socket_file) self.listen_socket.close() logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e) raise e if self.poller: self.poller.register(self.listen_socket, select.POLLIN) else: self.add_kqueue_socket(self.listen_socket) def setup_signalsock(self): """Create a socket pair used to signal when we want to finish. Using a socket is easy and thread/signal safe way to signal the termination. """ # The __poller_sock will be the end in the poller. When it is # closed, we should shut down. (self.__poller_sock, self.__control_sock) = socket.socketpair() if self.poller: self.poller.register(self.__poller_sock, select.POLLIN) else: self.add_kqueue_socket(self.__poller_sock) def setup(self): """Configure listener socket, polling, etc. Raises a socket.error if the socket_file cannot be created. """ self.setup_poller() self.setup_signalsock() self.setup_listener() logger.debug(TRACE_START, MSGQ_LISTENER_STARTED); self.runnable = True def process_accept(self): """Process an accept on the listening socket.""" newsocket, ipaddr = self.listen_socket.accept() # TODO: When we have logging, we might want # to add a debug message here that a new connection # was made self.register_socket(newsocket) def register_socket(self, newsocket): """ Internal function to insert a socket. Used by process_accept and some tests. """ self.sockets[newsocket.fileno()] = newsocket lname = self.newlname() self.lnames[lname] = newsocket logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(), lname) if self.poller: self.poller.register(newsocket, select.POLLIN) else: self.add_kqueue_socket(newsocket) def kill_socket(self, fd, sock): """Fully close down the socket.""" # Unregister events on the socket. Note that we don't have to do # this for kqueue because the registered events are automatically # deleted when the corresponding socket is closed. if self.poller: self.poller.unregister(sock) self.subs.unsubscribe_all(sock) lname = [ k for k, v in self.lnames.items() if v == sock ][0] del self.lnames[lname] sock.close() del self.sockets[fd] if fd in self.sendbuffs: del self.sendbuffs[fd] logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd) def __getbytes(self, fd, sock, length, continued): """Get exactly the requested bytes, or raise an exception if EOF. continued is set to True if this method is called to complete already read data. """ received = b'' while len(received) < length: try: data = sock.recv(length - len(received)) except socket.error as err: # This case includes ECONNRESET, which seems to happen when # the remote client has closed its socket at some subtle # timing (it should normally result in receiving empty data). # Since we didn't figure out how exactly that could happen, # we treat it just like other really-unexpected socket errors. raise MsgQReceiveError(str(err)) if len(data) == 0: raise MsgQCloseOnReceive("EOF", continued) received += data continued = True return received def read_packet(self, fd, sock): """Read a correctly formatted packet. Will raise exceptions if something fails.""" lengths = self.__getbytes(fd, sock, 6, False) overall_length, routing_length = struct.unpack(">IH", lengths) if overall_length < 2: raise MsgQReceiveError("overall_length < 2") overall_length -= 2 if routing_length > overall_length: raise MsgQReceiveError("routing_length > overall_length") if routing_length == 0: raise MsgQReceiveError("routing_length == 0") data_length = overall_length - routing_length # probably need to sanity check lengths here... routing = self.__getbytes(fd, sock, routing_length, True) if data_length > 0: data = self.__getbytes(fd, sock, data_length, True) else: data = None return (routing, data) def process_packet(self, fd, sock): """Process one packet.""" try: routing, data = self.read_packet(fd, sock) except (MsgQReceiveError, MsgQCloseOnReceive) as err: # If it's MsgQCloseOnReceive and that happens without reading # any data, it basically means the remote clinet has closed the # socket, so we log it as debug information. Otherwise, it's # a somewhat unexpected event, so we consider it an "error". if isinstance(err, MsgQCloseOnReceive) and not err.partial_read: logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd) else: logger.error(MSGQ_RECV_ERROR, fd, err) self.kill_socket(fd, sock) return try: routingmsg = isc.cc.message.from_wire(routing) except DecodeError as err: self.kill_socket(fd, sock) logger.error(MSGQ_HDR_DECODE_ERROR, fd, err) return self.process_command(fd, sock, routingmsg, data) def process_command(self, fd, sock, routing, data): """Process a single command. This will split out into one of the other functions.""" logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing) cmd = routing["type"] if cmd == 'send': self.process_command_send(sock, routing, data) elif cmd == 'subscribe': self.process_command_subscribe(sock, routing, data) elif cmd == 'unsubscribe': self.process_command_unsubscribe(sock, routing, data) elif cmd == 'getlname': self.process_command_getlname(sock, routing, data) elif cmd == 'ping': # Command for testing purposes self.process_command_ping(sock, routing, data) elif cmd == 'stop': self.stop() else: logger.error(MSGQ_INVALID_CMD, cmd) def preparemsg(self, env, msg = None): if type(env) == dict: env = isc.cc.message.to_wire(env) if type(msg) == dict: msg = isc.cc.message.to_wire(msg) length = 2 + len(env); if msg: length += len(msg) ret = struct.pack("!IH", length, len(env)) ret += env if msg: ret += msg return ret def sendmsg(self, sock, env, msg = None): self.send_prepared_msg(sock, self.preparemsg(env, msg)) def _send_data(self, sock, data): """ Send a piece of data to the given socket. This method is essentially "private" to MsgQ, but defined as if it were "protected" for easier access from tests. Parameters: sock: The socket to send to data: The list of bytes to send Returns: An integer or None. If an integer (which can be 0), it signals the number of bytes sent. If None, the socket appears to have been closed on the other end, and it has been killed on this side too. """ try: # We set the socket nonblocking, MSG_DONTWAIT doesn't exist # on some OSes sock.setblocking(0) return sock.send(data) except socket.error as e: if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]: return 0 elif e.errno in [ errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS ]: # EPIPE happens if the remote module has terminated by the time # of this send; its severity can vary, but in many cases it # shouldn't be critical, so we log it separately as a warning. if e.errno == errno.EPIPE: logger.warn(MSGQ_CLOSE_ON_SEND, sock.fileno()) else: logger.error(MSGQ_SEND_ERROR, sock.fileno(), errno.errorcode[e.errno]) self.kill_socket(sock.fileno(), sock) return None else: raise e finally: # And set it back again sock.setblocking(1) def send_prepared_msg(self, sock, msg): # Try to send the data, but only if there's nothing waiting fileno = sock.fileno() if fileno in self.sendbuffs: amount_sent = 0 else: amount_sent = self._send_data(sock, msg) if amount_sent is None: # Socket has been killed, drop the send return # Still something to send, add it to outgoing queue if amount_sent < len(msg): now = time.clock() # Append it to buffer (but check the data go away) if fileno in self.sendbuffs: (last_sent, buff) = self.sendbuffs[fileno] if now - last_sent > 0.1: self.kill_socket(fileno, sock) return buff += msg else: buff = msg[amount_sent:] last_sent = now if self.poller: self.poller.register(fileno, select.POLLIN | select.POLLOUT) else: self.add_kqueue_socket(sock, True) self.sendbuffs[fileno] = (last_sent, buff) def __process_write(self, fileno): # Try to send some data from the buffer (_, msg) = self.sendbuffs[fileno] sock = self.sockets[fileno] amount_sent = self._send_data(sock, msg) if amount_sent is not None: # Keep the rest msg = msg[amount_sent:] if len(msg) == 0: # If there's no more, stop requesting for write availability if self.poller: self.poller.register(fileno, select.POLLIN) else: self.delete_kqueue_socket(sock, True) del self.sendbuffs[fileno] else: self.sendbuffs[fileno] = (time.clock(), msg) def newlname(self): """Generate a unique connection identifier for this socket. This is done by using an increasing counter and the current time.""" self.connection_counter += 1 return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname) def process_command_ping(self, sock, routing, data): self.sendmsg(sock, { "type" : "pong" }, data) def process_command_getlname(self, sock, routing, data): lname = [ k for k, v in self.lnames.items() if v == sock ][0] self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname }) def process_command_send(self, sock, routing, data): group = routing["group"] instance = routing["instance"] to = routing["to"] if group == None or instance == None: return # ignore invalid packets entirely if to == "*": sockets = self.subs.find(group, instance) else: if to in self.lnames: sockets = [ self.lnames[to] ] else: return # recipient doesn't exist msg = self.preparemsg(routing, data) if sock in sockets: sockets.remove(sock) for socket in sockets: self.send_prepared_msg(socket, msg) def process_command_subscribe(self, sock, routing, data): group = routing["group"] instance = routing["instance"] if group == None or instance == None: return # ignore invalid packets entirely self.subs.subscribe(group, instance, sock) def process_command_unsubscribe(self, sock, routing, data): group = routing["group"] instance = routing["instance"] if group == None or instance == None: return # ignore invalid packets entirely self.subs.unsubscribe(group, instance, sock) def run(self): """Process messages. Forever. Mostly.""" self.running = True if self.poller: self.run_poller() else: self.run_kqueue() def run_poller(self): while self.running: try: # Poll with a timeout so that every once in a while, # the loop checks for self.running. events = self.poller.poll() except select.error as err: if err.args[0] == errno.EINTR: events = [] else: logger.fatal(MSGQ_POLL_ERROR, err) break with self.__lock: for (fd, event) in events: if fd == self.listen_socket.fileno(): self.process_accept() elif fd == self.__poller_sock.fileno(): # If it's the signal socket, we should terminate now. self.running = False break else: writable = event & select.POLLOUT # Note: it may be okay to read data if available # immediately after write some, but due to unexpected # regression (see comments on the kqueue version below) # we restrict one operation per iteration for now. # In future we may clarify the point and enable the # "read/write" mode. readable = not writable and (event & select.POLLIN) if not writable and not readable: logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event) self._process_fd(fd, writable, readable, False) def run_kqueue(self): while self.running: # Check with a timeout so that every once in a while, # the loop checks for self.running. events = self.kqueue.control(None, 10) if not events: raise RuntimeError('serve: kqueue returned no events') with self.__lock: for event in events: if event.ident == self.listen_socket.fileno(): self.process_accept() elif event.ident == self.__poller_sock.fileno(): # If it's the signal socket, we should terminate now. self.running = False break; else: fd = event.ident writable = event.filter == select.KQ_FILTER_WRITE readable = (event.filter == select.KQ_FILTER_READ and event.data > 0) # It seems to break some of our test cases if we # immediately close the socket on EOF after reading # some data. It may be possible to avoid by tweaking # the test, but unless we can be sure we'll hold off. closed = (not readable and (event.flags & select.KQ_EV_EOF)) self._process_fd(fd, writable, readable, closed) def _process_fd(self, fd, writable, readable, closed): '''Process a single FD: unified subroutine of run_kqueue/poller. closed can be True only in the case of kqueue. This is essentially private but is defined as if it were "protected" so it's callable from tests. ''' # We need to check if FD is still in the sockets dict, because # it's possible that the socket has been "killed" while processing # other FDs; it's even possible it's killed within this method. if writable and fd in self.sockets: self.__process_write(fd) if readable and fd in self.sockets: self.process_packet(fd, self.sockets[fd]) if closed and fd in self.sockets: self.kill_socket(fd, self.sockets[fd]) def stop(self): # Signal it should terminate. self.__control_sock.close() self.__control_sock = None # Abort anything waiting on the condition, just to make sure it's not # blocked forever self.cfgmgr_ready(False) def cleanup_signalsock(self): """Close the signal sockets. We could do it directly in shutdown, but this part is reused in tests. """ if self.__poller_sock: self.__poller_sock.close() self.__poller_sock = None if self.__control_sock: self.__control_sock.close() self.__control_sock = None def shutdown(self): """Stop the MsgQ master.""" logger.debug(TRACE_START, MSGQ_SHUTDOWN) self.listen_socket.close() self.cleanup_signalsock() # Close all the sockets too. In real life, there should be none now, # as Msgq should be the last one. But some tests don't adhere to this # and create a new Msgq for each test, which led to huge socket leaks. # Some other threads put some other things in instead of sockets, so # we catch whatever exceptions there we can. This should be safe, # because in real operation, we will terminate now anyway, implicitly # closing anything anyway. for sock in self.sockets.values(): try: sock.close() except Exception: pass if os.path.exists(self.socket_file): os.remove(self.socket_file) def config_handler(self, new_config): """The configuration handler (run in a separate thread). Not tested, currently effectively empty. """ config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config) with self.__lock: if not self.running: return # TODO: Any config handlig goes here. return isc.config.create_answer(0) def command_handler(self, command, args): """The command handler (run in a separate thread). Not tested, currently effectively empty. """ config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args) with self.__lock: if not self.running: return # TODO: Any commands go here config_logger.error(MSGQ_COMMAND_UNKNOWN, command) return isc.config.create_answer(1, 'unknown command: ' + command) def signal_handler(msgq, signal, frame): if msgq: msgq.stop() if __name__ == "__main__": def check_port(option, opt_str, value, parser): """Function to insure that the port we are passed is actually a valid port number. Used by OptionParser() on startup.""" intval = int(value) if (intval < 0) or (intval > 65535): raise OptionValueError("%s requires a port number (0-65535)" % opt_str) parser.values.msgq_port = intval # Parse any command-line options. parser = OptionParser(version=VERSION) # TODO: Should we remove the option? parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="display more about what is going on") parser.add_option("-s", "--socket-file", dest="msgq_socket_file", type="string", default=None, help="UNIX domain socket file the msgq daemon will use") (options, args) = parser.parse_args() # Announce startup. logger.debug(TRACE_START, MSGQ_START, VERSION) msgq = MsgQ(options.msgq_socket_file, options.verbose) signal.signal(signal.SIGTERM, lambda signal, frame: signal_handler(msgq, signal, frame)) try: msgq.setup() except Exception as e: logger.fatal(MSGQ_START_FAIL, e) sys.exit(1) # We run the processing in a separate thread. This is because we want to # connect to the msgq ourself. But the cc library is unfortunately blocking # in many places and waiting for the processing part to answer, it would # deadlock. poller_thread = threading.Thread(target=msgq.run) poller_thread.daemon = True try: poller_thread.start() if msgq.wait_cfgmgr(): # Once we get the config manager, we can read our own config. session = isc.config.ModuleCCSession(SPECFILE_LOCATION, msgq.config_handler, msgq.command_handler, None, True, msgq.socket_file) session.start() # And we create a thread that'll just wait for commands and # handle them. We don't terminate the thread, we set it to # daemon. Once the main thread terminates, it'll just die. def run_session(): while True: session.check_command(False) background_thread = threading.Thread(target=run_session) background_thread.daemon = True background_thread.start() poller_thread.join() except KeyboardInterrupt: pass msgq.shutdown() logger.info(MSGQ_EXITING)