123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452 |
- #!@PYTHON@
- # Copyright (C) 2010 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- """This code implements the msgq daemon."""
- import subprocess
- import signal
- import os
- import socket
- import sys
- import struct
- import errno
- import time
- import select
- import pprint
- import random
- from optparse import OptionParser, OptionValueError
- import isc.util.process
- import isc.cc
- isc.util.process.rename()
- # This is the version that gets displayed to the user.
- # The VERSION string consists of the module name, the module version
- # number, and the overall BIND 10 version number (set in configure.ac).
- VERSION = "b10-msgq 20100818 (BIND 10 @PACKAGE_VERSION@)"
- class MsgQReceiveError(Exception): pass
- class SubscriptionManager:
- def __init__(self):
- self.subscriptions = {}
- def subscribe(self, group, instance, socket):
- """Add a subscription."""
- target = ( group, instance )
- if target in self.subscriptions:
- print("[b10-msgq] Appending to existing target")
- if socket not in self.subscriptions[target]:
- self.subscriptions[target].append(socket)
- else:
- print("[b10-msgq] Creating new target")
- self.subscriptions[target] = [ socket ]
- def unsubscribe(self, group, instance, socket):
- """Remove the socket from the one specific subscription."""
- target = ( group, instance )
- if target in self.subscriptions:
- if socket in self.subscriptions[target]:
- self.subscriptions[target].remove(socket)
- def unsubscribe_all(self, socket):
- """Remove the socket from all subscriptions."""
- for socklist in self.subscriptions.values():
- if socket in socklist:
- socklist.remove(socket)
- def find_sub(self, group, instance):
- """Return an array of sockets which want this specific group,
- instance."""
- target = (group, instance)
- if target in self.subscriptions:
- return self.subscriptions[target]
- else:
- return []
- def find(self, group, instance):
- """Return an array of sockets who should get something sent to
- this group, instance pair. This includes wildcard subscriptions."""
- target = (group, instance)
- partone = self.find_sub(group, instance)
- parttwo = self.find_sub(group, "*")
- return list(set(partone + parttwo))
- class MsgQ:
- """Message Queue class."""
- # did we find a better way to do this?
- SOCKET_FILE = os.path.join("@localstatedir@",
- "@PACKAGE_NAME@",
- "msgq_socket").replace("${prefix}",
- "@prefix@")
-
- def __init__(self, socket_file=None, verbose=False):
- """Initialize the MsgQ master.
-
- The socket_file specifies the path to the UNIX domain socket
- that the msgq process listens on. If it is None, the
- environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
- is not set, it will default to
- @localstatedir@/@PACKAGE_NAME@/msg_socket.
- If verbose is True, then the MsgQ reports
- what it is doing.
- """
- if socket_file is None:
- if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
- self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
- else:
- self.socket_file = self.SOCKET_FILE
- else:
- self.socket_file = socket_file
- self.verbose = verbose
- self.poller = None
- self.kqueue = None
- self.runnable = False
- self.listen_socket = False
- self.sockets = {}
- self.connection_counter = random.random()
- self.hostname = socket.gethostname()
- self.subs = SubscriptionManager()
- self.lnames = {}
- def setup_poller(self):
- """Set up the poll thing. Internal function."""
- try:
- self.poller = select.poll()
- except AttributeError:
- self.kqueue = select.kqueue()
-
- def add_kqueue_socket(self, socket):
- event = select.kevent(socket.fileno(),
- select.KQ_FILTER_READ,
- select.KQ_EV_ADD | select.KQ_EV_ENABLE)
- self.kqueue.control([event], 0)
- def setup_listener(self):
- """Set up the listener socket. Internal function."""
- if self.verbose:
- sys.stdout.write("[b10-msgq] Setting up socket at %s\n" %
- self.socket_file)
- self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-
- if os.path.exists(self.socket_file):
- os.remove(self.socket_file)
- try:
- self.listen_socket.bind(self.socket_file)
- self.listen_socket.listen(1024)
- except Exception as e:
- # remove the file again if something goes wrong
- # (note this is a catch-all, but we reraise it)
- if os.path.exists(self.socket_file):
- os.remove(self.socket_file)
- raise e
- if self.poller:
- self.poller.register(self.listen_socket, select.POLLIN)
- else:
- self.add_kqueue_socket(self.listen_socket)
- def setup(self):
- """Configure listener socket, polling, etc.
- Raises a socket.error if the socket_file cannot be
- created.
- """
- self.setup_poller()
- self.setup_listener()
- if self.verbose:
- sys.stdout.write("[b10-msgq] Listening\n")
-
- self.runnable = True
- def process_accept(self):
- """Process an accept on the listening socket."""
- newsocket, ipaddr = self.listen_socket.accept()
- # TODO: When we have logging, we might want
- # to add a debug message here that a new connection
- # was made
- self.sockets[newsocket.fileno()] = newsocket
- lname = self.newlname()
- self.lnames[lname] = newsocket
- if self.poller:
- self.poller.register(newsocket, select.POLLIN)
- else:
- self.add_kqueue_socket(newsocket)
- def process_socket(self, fd):
- """Process a read on a socket."""
- sock = self.sockets[fd]
- if sock == None:
- sys.stderr.write("[b10-msgq] Got read on Strange Socket fd %d\n" % fd)
- return
- # sys.stderr.write("[b10-msgq] Got read on fd %d\n" %fd)
- self.process_packet(fd, sock)
- def kill_socket(self, fd, sock):
- """Fully close down the socket."""
- if self.poller:
- self.poller.unregister(sock)
- self.subs.unsubscribe_all(sock)
- lname = [ k for k, v in self.lnames.items() if v == sock ][0]
- del self.lnames[lname]
- sock.close()
- self.sockets[fd] = None
- sys.stderr.write("[b10-msgq] Closing socket fd %d\n" % fd)
- def getbytes(self, fd, sock, length):
- """Get exactly the requested bytes, or raise an exception if
- EOF."""
- received = b''
- while len(received) < length:
- try:
- data = sock.recv(length - len(received))
- except socket.error:
- raise MsgQReceiveError(socket.error)
- if len(data) == 0:
- raise MsgQReceiveError("EOF")
- received += data
- return received
- def read_packet(self, fd, sock):
- """Read a correctly formatted packet. Will raise exceptions if
- something fails."""
- lengths = self.getbytes(fd, sock, 6)
- overall_length, routing_length = struct.unpack(">IH", lengths)
- if overall_length < 2:
- raise MsgQReceiveError("overall_length < 2")
- overall_length -= 2
- if routing_length > overall_length:
- raise MsgQReceiveError("routing_length > overall_length")
- if routing_length == 0:
- raise MsgQReceiveError("routing_length == 0")
- data_length = overall_length - routing_length
- # probably need to sanity check lengths here...
- routing = self.getbytes(fd, sock, routing_length)
- if data_length > 0:
- data = self.getbytes(fd, sock, data_length)
- else:
- data = None
- return (routing, data)
- def process_packet(self, fd, sock):
- """Process one packet."""
- try:
- routing, data = self.read_packet(fd, sock)
- except MsgQReceiveError as err:
- self.kill_socket(fd, sock)
- sys.stderr.write("[b10-msgq] Receive error: %s\n" % err)
- return
- try:
- routingmsg = isc.cc.message.from_wire(routing)
- except DecodeError as err:
- self.kill_socket(fd, sock)
- sys.stderr.write("[b10-msgq] Routing decode error: %s\n" % err)
- return
- # sys.stdout.write("\t" + pprint.pformat(routingmsg) + "\n")
- # sys.stdout.write("\t" + pprint.pformat(data) + "\n")
- self.process_command(fd, sock, routingmsg, data)
- def process_command(self, fd, sock, routing, data):
- """Process a single command. This will split out into one of the
- other functions."""
- # TODO: A print statement got removed here (one that prints the
- # routing envelope). When we have logging with multiple levels,
- # we might want to re-add that on a high debug verbosity.
- cmd = routing["type"]
- if cmd == 'send':
- self.process_command_send(sock, routing, data)
- elif cmd == 'subscribe':
- self.process_command_subscribe(sock, routing, data)
- elif cmd == 'unsubscribe':
- self.process_command_unsubscribe(sock, routing, data)
- elif cmd == 'getlname':
- self.process_command_getlname(sock, routing, data)
- else:
- sys.stderr.write("[b10-msgq] Invalid command: %s\n" % cmd)
- def preparemsg(self, env, msg = None):
- if type(env) == dict:
- env = isc.cc.message.to_wire(env)
- if type(msg) == dict:
- msg = isc.cc.message.to_wire(msg)
- length = 2 + len(env);
- if msg:
- length += len(msg)
- ret = struct.pack("!IH", length, len(env))
- ret += env
- if msg:
- ret += msg
- return ret
- def sendmsg(self, sock, env, msg = None):
- sock.send(self.preparemsg(env, msg))
- def send_prepared_msg(self, sock, msg):
- sock.send(msg)
- def newlname(self):
- """Generate a unique connection identifier for this socket.
- This is done by using an increasing counter and the current
- time."""
- self.connection_counter += 1
- return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
- def process_command_getlname(self, sock, routing, data):
- lname = [ k for k, v in self.lnames.items() if v == sock ][0]
- self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
- def process_command_send(self, sock, routing, data):
- group = routing["group"]
- instance = routing["instance"]
- to = routing["to"]
- if group == None or instance == None:
- return # ignore invalid packets entirely
- if to == "*":
- sockets = self.subs.find(group, instance)
- else:
- if to in self.lnames:
- sockets = [ self.lnames[to] ]
- else:
- return # recipient doesn't exist
- msg = self.preparemsg(routing, data)
- if sock in sockets:
- sockets.remove(sock)
- for socket in sockets:
- self.send_prepared_msg(socket, msg)
- def process_command_subscribe(self, sock, routing, data):
- group = routing["group"]
- instance = routing["instance"]
- if group == None or instance == None:
- return # ignore invalid packets entirely
- self.subs.subscribe(group, instance, sock)
- def process_command_unsubscribe(self, sock, routing, data):
- group = routing["group"]
- instance = routing["instance"]
- if group == None or instance == None:
- return # ignore invalid packets entirely
- self.subs.unsubscribe(group, instance, sock)
- def run(self):
- """Process messages. Forever. Mostly."""
-
- if self.poller:
- self.run_poller()
- else:
- self.run_kqueue()
-
- def run_poller(self):
- while True:
- try:
- events = self.poller.poll()
- except select.error as err:
- if err.args[0] == errno.EINTR:
- events = []
- else:
- sys.stderr.write("[b10-msgq] Error with poll(): %s\n" % err)
- break
- for (fd, event) in events:
- if fd == self.listen_socket.fileno():
- self.process_accept()
- else:
- self.process_socket(fd)
- def run_kqueue(self):
- while True:
- events = self.kqueue.control(None, 10)
- if not events:
- raise RuntimeError('serve: kqueue returned no events')
-
- for event in events:
- if event.ident == self.listen_socket.fileno():
- self.process_accept()
- else:
- if event.flags & select.KQ_FILTER_READ and event.data > 0:
- self.process_socket(event.ident)
- elif event.flags & select.KQ_EV_EOF:
- self.kill_socket(event.ident, self.sockets[event.ident])
- def shutdown(self):
- """Stop the MsgQ master."""
- if self.verbose:
- sys.stdout.write("[b10-msgq] Stopping the server.\n")
- self.listen_socket.close()
- if os.path.exists(self.socket_file):
- os.remove(self.socket_file)
- # can signal handling and calling a destructor be done without a
- # global variable?
- msgq = None
- def signal_handler(signal, frame):
- if msgq:
- msgq.shutdown()
- sys.exit(0)
- if __name__ == "__main__":
- def check_port(option, opt_str, value, parser):
- """Function to insure that the port we are passed is actually
- a valid port number. Used by OptionParser() on startup."""
- intval = int(value)
- if (intval < 0) or (intval > 65535):
- raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
- parser.values.msgq_port = intval
- # Parse any command-line options.
- parser = OptionParser(version=VERSION)
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
- help="display more about what is going on")
- parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
- type="string", default=None,
- help="UNIX domain socket file the msgq daemon will use")
- (options, args) = parser.parse_args()
- signal.signal(signal.SIGTERM, signal_handler)
- # Announce startup.
- if options.verbose:
- sys.stdout.write("[b10-msgq] %s\n" % VERSION)
- msgq = MsgQ(options.msgq_socket_file, options.verbose)
- setup_result = msgq.setup()
- if setup_result:
- sys.stderr.write("[b10-msgq] Error on startup: %s\n" % setup_result)
- sys.exit(1)
- try:
- msgq.run()
- except KeyboardInterrupt:
- pass
- msgq.shutdown()
|