123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592 |
- #!@PYTHON@
- # Copyright (C) 2010 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- """This code implements the msgq daemon."""
- import subprocess
- import signal
- import os
- import socket
- import sys
- import struct
- import errno
- import time
- import select
- import random
- from optparse import OptionParser, OptionValueError
- import isc.util.process
- import isc.log
- from isc.log_messages.msgq_messages import *
- import isc.cc
# Done once at import time, before the logger is created.
# NOTE(review): presumably this sets the visible process name -- confirm
# against isc.util.process.
isc.util.process.rename()

# Module-wide logger; every message in this file is emitted under "msgq".
logger = isc.log.Logger("msgq")

# Short aliases for the debug levels passed to logger.debug() below.
TRACE_START = logger.DBGLVL_START_SHUT
TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL

# This is the version that gets displayed to the user.
# The VERSION string consists of the module name, the module version
# number, and the overall BIND 10 version number (set in configure.ac).
VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
class MsgQReceiveError(Exception):
    """Raised when a packet cannot be read from a client socket.

    Used for receive failures, end-of-file, and malformed length fields.
    """
class SubscriptionManager:
    """Track which sockets are subscribed to which (group, instance) pairs.

    Subscriptions are kept in ``self.subscriptions``, a dict mapping a
    ``(group, instance)`` tuple to the list of subscribed sockets.
    """

    def __init__(self):
        self.subscriptions = {}

    def subscribe(self, group, instance, socket):
        """Add a subscription.

        Subscribing the same socket twice to the same target is a no-op.
        """
        target = (group, instance)
        if target in self.subscriptions:
            logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance)
            if socket not in self.subscriptions[target]:
                self.subscriptions[target].append(socket)
        else:
            logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
            self.subscriptions[target] = [socket]

    def unsubscribe(self, group, instance, socket):
        """Remove the socket from the one specific subscription.

        Unknown targets and sockets that are not subscribed are ignored.
        """
        subscribers = self.subscriptions.get((group, instance))
        if subscribers is not None and socket in subscribers:
            subscribers.remove(socket)

    def unsubscribe_all(self, socket):
        """Remove the socket from all subscriptions."""
        for socklist in self.subscriptions.values():
            if socket in socklist:
                socklist.remove(socket)

    def find_sub(self, group, instance):
        """Return a list of sockets which want this specific group,
        instance.

        The returned list is a copy, so callers may modify it without
        corrupting the manager's internal state.
        """
        return list(self.subscriptions.get((group, instance), []))

    def find(self, group, instance):
        """Return a list of sockets who should get something sent to
        this group, instance pair.  This includes wildcard subscriptions.

        Each socket appears once; the order of the result is unspecified.
        """
        exact = self.find_sub(group, instance)
        wildcard = self.find_sub(group, "*")
        return list(set(exact + wildcard))
class MsgQ:
    """Message Queue class.

    Listens on a UNIX domain socket, accepts client connections and
    routes length-prefixed messages between them according to their
    subscriptions.  Uses kqueue for event notification when the select
    module provides it, and poll otherwise.
    """
    # did we find a better way to do this?
    SOCKET_FILE = os.path.join("@localstatedir@",
                               "@PACKAGE_NAME@",
                               "msgq_socket").replace("${prefix}",
                                                      "@prefix@")

    def __init__(self, socket_file=None, verbose=False):
        """Initialize the MsgQ master.

        The socket_file specifies the path to the UNIX domain socket
        that the msgq process listens on. If it is None, the
        environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
        is not set, it will default to
        @localstatedir@/@PACKAGE_NAME@/msgq_socket.

        If verbose is True, then the MsgQ reports
        what it is doing.
        """
        if socket_file is None:
            self.socket_file = os.environ.get("BIND10_MSGQ_SOCKET_FILE",
                                              self.SOCKET_FILE)
        else:
            self.socket_file = socket_file

        self.verbose = verbose
        self.poller = None
        self.kqueue = None
        self.runnable = False
        # Placeholder until setup_listener() creates the real socket.
        self.listen_socket = False
        # fd -> socket object for every connected client
        self.sockets = {}
        self.connection_counter = random.random()
        self.hostname = socket.gethostname()
        self.subs = SubscriptionManager()
        # lname -> socket object
        self.lnames = {}
        # fd -> (time of last successful send, bytes still to be sent)
        self.sendbuffs = {}
        self.running = False

    def setup_poller(self):
        """Set up the poll thing.  Internal function.

        kqueue is preferred; poll is the fallback when the select module
        has no kqueue.
        """
        try:
            self.kqueue = select.kqueue()
        except AttributeError:
            self.poller = select.poll()

    def add_kqueue_socket(self, socket, write_filter=False):
        """Add a kqueue filter for a socket.  By default the read
        filter is used; if write_filter is set to True, the write
        filter is used.  We use a boolean value instead of a specific
        filter constant, because kqueue filter values do not seem to
        be defined on some systems.  The use of boolean makes the
        interface restrictive because there are other filters, but this
        method is mostly only for our internal use, so it should be
        acceptable at least for now."""
        filter_type = select.KQ_FILTER_WRITE if write_filter else \
            select.KQ_FILTER_READ
        event = select.kevent(socket.fileno(), filter_type,
                              select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        self.kqueue.control([event], 0)

    def delete_kqueue_socket(self, socket, write_filter=False):
        """Delete a kqueue filter for socket.  See add_kqueue_socket()
        for the semantics and notes about write_filter."""
        filter_type = select.KQ_FILTER_WRITE if write_filter else \
            select.KQ_FILTER_READ
        event = select.kevent(socket.fileno(), filter_type,
                              select.KQ_EV_DELETE)
        self.kqueue.control([event], 0)

    def setup_listener(self):
        """Set up the listener socket.  Internal function.

        Any stale socket file is removed first.  If binding or listening
        fails, the socket file and the socket are cleaned up and the
        original exception is re-raised.
        """
        logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)

        self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        if os.path.exists(self.socket_file):
            os.remove(self.socket_file)
        try:
            self.listen_socket.bind(self.socket_file)
            self.listen_socket.listen(1024)
        except Exception as e:
            # remove the file again if something goes wrong
            # (note this is a catch-all, but we reraise it)
            if os.path.exists(self.socket_file):
                os.remove(self.socket_file)
            self.listen_socket.close()
            logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
            # Bare raise keeps the original traceback intact.
            raise

        if self.poller:
            self.poller.register(self.listen_socket, select.POLLIN)
        else:
            self.add_kqueue_socket(self.listen_socket)

    def setup(self):
        """Configure listener socket, polling, etc.

        Raises a socket.error if the socket_file cannot be
        created.
        """
        self.setup_poller()
        self.setup_listener()

        logger.debug(TRACE_START, MSGQ_LISTENER_STARTED)

        self.runnable = True

    def process_accept(self):
        """Process an accept on the listening socket."""
        newsocket, _ = self.listen_socket.accept()
        # TODO: When we have logging, we might want
        # to add a debug message here that a new connection
        # was made
        self.register_socket(newsocket)

    def register_socket(self, newsocket):
        """
        Internal function to insert a socket. Used by process_accept and
        some tests.

        Records the socket by fd, assigns it a fresh lname and starts
        watching it for readability.
        """
        self.sockets[newsocket.fileno()] = newsocket
        lname = self.newlname()
        self.lnames[lname] = newsocket

        if self.poller:
            self.poller.register(newsocket, select.POLLIN)
        else:
            self.add_kqueue_socket(newsocket)

    def process_socket(self, fd):
        """Process a read on a socket."""
        if fd not in self.sockets:
            logger.error(MSGQ_READ_UNKNOWN_FD, fd)
            return
        sock = self.sockets[fd]
        self.process_packet(fd, sock)

    def kill_socket(self, fd, sock):
        """Fully close down the socket.

        Unregisters it from the poller, drops its subscriptions, lname
        and pending send buffer, and closes it.
        """
        if self.poller:
            self.poller.unregister(sock)
        self.subs.unsubscribe_all(sock)
        # Collect the names first; the dict must not change while iterating.
        for lname in [k for k, v in self.lnames.items() if v == sock]:
            del self.lnames[lname]
        sock.close()
        self.sockets.pop(fd, None)
        self.sendbuffs.pop(fd, None)
        logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)

    def getbytes(self, fd, sock, length):
        """Get exactly the requested bytes, or raise an exception if
        EOF.

        Raises MsgQReceiveError on a socket error (carrying the original
        error) or when the peer closes the connection early.
        """
        received = b''
        while len(received) < length:
            try:
                data = sock.recv(length - len(received))
            except socket.error as err:
                # Wrap the caught instance, not the exception class, so
                # the log shows what actually went wrong.
                raise MsgQReceiveError(err)
            if len(data) == 0:
                raise MsgQReceiveError("EOF")
            received += data
        return received

    def read_packet(self, fd, sock):
        """Read a correctly formatted packet.  Will raise exceptions if
        something fails.

        Wire format: a 4-byte overall length and a 2-byte routing length
        (both big-endian), followed by the routing header and then the
        optional data.  The overall length counts the 2-byte routing
        length field, the routing header and the data.

        Returns a (routing, data) tuple of bytes; data is None when the
        packet carries no payload.
        """
        lengths = self.getbytes(fd, sock, 6)
        overall_length, routing_length = struct.unpack(">IH", lengths)
        if overall_length < 2:
            raise MsgQReceiveError("overall_length < 2")
        overall_length -= 2
        if routing_length > overall_length:
            raise MsgQReceiveError("routing_length > overall_length")
        if routing_length == 0:
            raise MsgQReceiveError("routing_length == 0")
        data_length = overall_length - routing_length
        # probably need to sanity check lengths here...
        routing = self.getbytes(fd, sock, routing_length)
        if data_length > 0:
            data = self.getbytes(fd, sock, data_length)
        else:
            data = None
        return (routing, data)

    def process_packet(self, fd, sock):
        """Process one packet.

        A packet that cannot be read, or whose routing header cannot be
        decoded, causes the connection to be closed.
        """
        try:
            routing, data = self.read_packet(fd, sock)
        except MsgQReceiveError as err:
            logger.error(MSGQ_RECV_ERR, fd, err)
            self.kill_socket(fd, sock)
            return

        try:
            routingmsg = isc.cc.message.from_wire(routing)
        except Exception as err:
            # NOTE(review): this used to catch a name 'DecodeError' that
            # no import visible in this file binds, so a bad header would
            # surface as a NameError instead.  Any decoding failure on
            # client-supplied data is logged and the connection dropped.
            self.kill_socket(fd, sock)
            logger.error(MSGQ_HDR_DECODE_ERR, fd, err)
            return

        self.process_command(fd, sock, routingmsg, data)

    def process_command(self, fd, sock, routing, data):
        """Process a single command.  This will split out into one of the
        other functions.

        Headers that are not a dict, or that lack a known "type", are
        reported as invalid commands.
        """
        logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
        # The header comes from the client; do not assume its shape.
        cmd = routing.get("type") if isinstance(routing, dict) else None
        if cmd == 'send':
            self.process_command_send(sock, routing, data)
        elif cmd == 'subscribe':
            self.process_command_subscribe(sock, routing, data)
        elif cmd == 'unsubscribe':
            self.process_command_unsubscribe(sock, routing, data)
        elif cmd == 'getlname':
            self.process_command_getlname(sock, routing, data)
        elif cmd == 'ping':
            # Command for testing purposes
            self.process_command_ping(sock, routing, data)
        elif cmd == 'stop':
            self.stop()
        else:
            logger.error(MSGQ_INVALID_CMD, cmd)

    def preparemsg(self, env, msg=None):
        """Build the wire form of a message.

        env (routing envelope) and msg (payload) may each be a dict,
        which is encoded with isc.cc.message.to_wire(), or already
        encoded bytes.  The result is the 6-byte length header followed
        by the envelope and, if present and non-empty, the payload.
        """
        if isinstance(env, dict):
            env = isc.cc.message.to_wire(env)
        if isinstance(msg, dict):
            msg = isc.cc.message.to_wire(msg)
        # The overall length counts the 2-byte envelope length field too.
        length = 2 + len(env)
        if msg:
            length += len(msg)
        ret = struct.pack("!IH", length, len(env))
        ret += env
        if msg:
            ret += msg
        return ret

    def sendmsg(self, sock, env, msg=None):
        """Encode env and msg and send (or queue) them on sock."""
        self.send_prepared_msg(sock, self.preparemsg(env, msg))

    def __send_data(self, sock, data):
        """
        Send a piece of data to the given socket.
        Parameters:
        sock: The socket to send to
        data: The list of bytes to send
        Returns:
        An integer or None. If an integer (which can be 0), it signals
        the number of bytes sent. If None, the socket appears to have
        been closed on the other end, and it has been killed on this
        side too.
        """
        try:
            # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
            # on some OSes
            sock.setblocking(0)
            return sock.send(data)
        except socket.error as e:
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR):
                return 0
            elif e.errno in (errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS):
                logger.error(MSGQ_SEND_ERR, sock.fileno(),
                             errno.errorcode[e.errno])
                self.kill_socket(sock.fileno(), sock)
                return None
            else:
                raise
        finally:
            # And set it back again -- unless the socket was closed by
            # kill_socket() above; fileno() is -1 for a closed socket and
            # setblocking() on it would raise.
            if sock.fileno() != -1:
                sock.setblocking(1)

    def send_prepared_msg(self, sock, msg):
        """Send an already encoded message, queueing what cannot be sent.

        If data is already queued for the socket, the message is appended
        to that queue to preserve ordering.  A socket whose queue has not
        drained for more than 0.1 seconds (as measured by
        time.process_time()) is killed.
        """
        # Try to send the data, but only if there's nothing waiting
        fileno = sock.fileno()
        if fileno in self.sendbuffs:
            amount_sent = 0
        else:
            amount_sent = self.__send_data(sock, msg)
            if amount_sent is None:
                # Socket has been killed, drop the send
                return

        # Still something to send, add it to outgoing queue
        if amount_sent < len(msg):
            # time.clock() was removed in Python 3.8; process_time() is
            # its CPU-time replacement and keeps the Unix behaviour.
            now = time.process_time()
            # Append it to buffer (but check the data go away)
            if fileno in self.sendbuffs:
                (last_sent, buff) = self.sendbuffs[fileno]
                if now - last_sent > 0.1:
                    self.kill_socket(fileno, sock)
                    return
                buff += msg
            else:
                buff = msg[amount_sent:]
                last_sent = now
                # Start asking for write-availability notifications.
                if self.poller:
                    self.poller.register(fileno, select.POLLIN |
                                         select.POLLOUT)
                else:
                    self.add_kqueue_socket(sock, True)
            self.sendbuffs[fileno] = (last_sent, buff)

    def __process_write(self, fileno):
        """Flush as much queued data as possible to a writable socket."""
        # The socket may have been killed earlier in the same batch of
        # events; there is nothing left to write in that case.
        if fileno not in self.sendbuffs or fileno not in self.sockets:
            return
        # Try to send some data from the buffer
        (_, msg) = self.sendbuffs[fileno]
        sock = self.sockets[fileno]
        amount_sent = self.__send_data(sock, msg)
        if amount_sent is not None:
            # Keep the rest
            msg = msg[amount_sent:]
            if len(msg) == 0:
                # If there's no more, stop requesting for write availability
                if self.poller:
                    self.poller.register(fileno, select.POLLIN)
                else:
                    self.delete_kqueue_socket(sock, True)
                del self.sendbuffs[fileno]
            else:
                self.sendbuffs[fileno] = (time.process_time(), msg)

    def newlname(self):
        """Generate a unique connection identifier for this socket.
        This is done by using an increasing counter and the current
        time."""
        self.connection_counter += 1
        # '%x' only accepts integers on current Python versions, and both
        # values are floats here, so truncate them explicitly.
        return "%x_%x@%s" % (int(time.time()),
                             int(self.connection_counter),
                             self.hostname)

    def process_command_ping(self, sock, routing, data):
        """Answer a 'ping' with a 'pong' carrying the same payload."""
        self.sendmsg(sock, {"type": "pong"}, data)

    def process_command_getlname(self, sock, routing, data):
        """Tell the client the lname assigned to its connection."""
        lname = [k for k, v in self.lnames.items() if v == sock][0]
        self.sendmsg(sock, {"type": "getlname"}, {"lname": lname})

    def process_command_send(self, sock, routing, data):
        """Forward a message to its recipients.

        With "to" set to "*", the message goes to every subscriber of
        the (group, instance) pair, wildcard subscribers included;
        otherwise to the single connection whose lname equals "to".
        The sender never receives its own message.
        """
        # .get() so that a header missing a field is ignored (as the
        # checks below intend) rather than raising KeyError.
        group = routing.get("group")
        instance = routing.get("instance")
        to = routing.get("to")
        if group is None or instance is None:
            return  # ignore invalid packets entirely

        if to == "*":
            sockets = self.subs.find(group, instance)
        else:
            if to in self.lnames:
                sockets = [self.lnames[to]]
            else:
                return  # recipient doesn't exist

        msg = self.preparemsg(routing, data)

        if sock in sockets:
            sockets.remove(sock)
        for recipient in sockets:
            self.send_prepared_msg(recipient, msg)

    def process_command_subscribe(self, sock, routing, data):
        """Subscribe the sending socket to a (group, instance) pair."""
        group = routing.get("group")
        instance = routing.get("instance")
        if group is None or instance is None:
            return  # ignore invalid packets entirely
        self.subs.subscribe(group, instance, sock)

    def process_command_unsubscribe(self, sock, routing, data):
        """Unsubscribe the sending socket from a (group, instance) pair."""
        group = routing.get("group")
        instance = routing.get("instance")
        if group is None or instance is None:
            return  # ignore invalid packets entirely
        self.subs.unsubscribe(group, instance, sock)

    def run(self):
        """Process messages.  Forever.  Mostly."""
        self.running = True

        if self.poller:
            self.run_poller()
        else:
            self.run_kqueue()

    def run_poller(self):
        """Event loop used when poll is the notification mechanism."""
        while self.running:
            try:
                # poll() is called without a timeout, so it blocks until
                # an event arrives; self.running is re-checked after
                # each batch of events.
                events = self.poller.poll()
            except select.error as err:
                if err.args[0] == errno.EINTR:
                    events = []
                else:
                    logger.fatal(MSGQ_POLL_ERR, err)
                    break
            for (fd, event) in events:
                if fd == self.listen_socket.fileno():
                    self.process_accept()
                else:
                    if event & select.POLLOUT:
                        self.__process_write(fd)
                    elif event & select.POLLIN:
                        self.process_socket(fd)
                    else:
                        logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)

    def run_kqueue(self):
        """Event loop used when kqueue is the notification mechanism."""
        while self.running:
            # No timeout is passed: the second argument is the maximum
            # number of events fetched per call, so this blocks until at
            # least one event is ready.
            events = self.kqueue.control(None, 10)
            if not events:
                raise RuntimeError('serve: kqueue returned no events')

            for event in events:
                if event.ident == self.listen_socket.fileno():
                    self.process_accept()
                else:
                    if event.filter == select.KQ_FILTER_WRITE:
                        self.__process_write(event.ident)
                    if event.filter == select.KQ_FILTER_READ and \
                            event.data > 0:
                        self.process_socket(event.ident)
                    elif (event.flags & select.KQ_EV_EOF and
                          event.ident in self.sockets):
                        # The membership test covers a socket that was
                        # already killed while handling this event.
                        self.kill_socket(event.ident,
                                         self.sockets[event.ident])

    def stop(self):
        """Ask the event loop to exit after the current batch of events."""
        self.running = False

    def shutdown(self):
        """Stop the MsgQ master."""
        if self.verbose:
            sys.stdout.write("[b10-msgq] Stopping the server.\n")
        # listen_socket is still the False placeholder if setup() never ran.
        if self.listen_socket:
            self.listen_socket.close()
        if os.path.exists(self.socket_file):
            os.remove(self.socket_file)
# can signal handling and calling a destructor be done without a
# global variable?
msgq = None


def signal_handler(signal, frame):
    """Shut down the running MsgQ (if there is one) and exit with status 0.

    Meant to be installed with signal.signal(); both arguments are
    ignored.
    """
    running_queue = msgq
    if running_queue:
        running_queue.shutdown()
    sys.exit(0)
if __name__ == "__main__":
    def check_port(option, opt_str, value, parser):
        """OptionParser callback validating a port number.

        Stores the value in parser.values.msgq_port, or raises
        OptionValueError when it is outside 0-65535.
        """
        intval = int(value)
        if (intval < 0) or (intval > 65535):
            raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
        parser.values.msgq_port = intval

    # Parse any command-line options.
    parser = OptionParser(version=VERSION)
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="display more about what is going on")
    parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
                      type="string", default=None,
                      help="UNIX domain socket file the msgq daemon will use")
    (options, args) = parser.parse_args()

    # Init logging, according to the parameters.
    # FIXME: Do proper logger configuration, this is just a hack
    # This is #2582
    if options.verbose:
        severity = 'DEBUG'
    else:
        severity = 'INFO'
    isc.log.init("b10-msgq", buffer=False, severity=severity, debuglevel=99)

    signal.signal(signal.SIGTERM, signal_handler)

    # Announce startup.
    logger.debug(TRACE_START, MSGQ_START, VERSION)

    msgq = MsgQ(options.msgq_socket_file, options.verbose)

    # A failure to set up the listener is fatal: log it and exit non-zero.
    try:
        msgq.setup()
    except Exception as e:
        logger.fatal(MSGQ_START_FAIL, e)
        sys.exit(1)

    # Ctrl-C ends the event loop quietly; clean up in either case.
    try:
        msgq.run()
    except KeyboardInterrupt:
        pass

    msgq.shutdown()
|