123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829 |
- #!@PYTHON@
- # Copyright (C) 2010 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- """This code implements the msgq daemon."""
- import subprocess
- import signal
- import os
- import socket
- import sys
- import struct
- import errno
- import time
- import select
- import random
- import threading
- import isc.config.ccsession
- from optparse import OptionParser, OptionValueError
- import isc.util.process
- import isc.log
- from isc.log_messages.msgq_messages import *
- import isc.cc
- isc.util.process.rename()
- isc.log.init("b10-msgq", buffer=True)
- # Logger that is used in the actual msgq handling - startup, shutdown and the
- # poller thread.
- logger = isc.log.Logger("msgq")
- # A separate copy for the master/config thread when the poller thread runs.
- # We use a separate instance, since the logger itself doesn't have to be
- # thread safe.
- config_logger = isc.log.Logger("msgq")
- TRACE_START = logger.DBGLVL_START_SHUT
- TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
- TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
- # This is the version that gets displayed to the user.
- # The VERSION string consists of the module name, the module version
- # number, and the overall BIND 10 version number (set in configure.ac).
- VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
- # If B10_FROM_BUILD is set in the environment, we use data files
- # from a directory relative to that, otherwise we use the ones
- # installed on the system
- if "B10_FROM_BUILD" in os.environ:
- SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/msgq"
- else:
- PREFIX = "@prefix@"
- DATAROOTDIR = "@datarootdir@"
- SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
- SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
- class MsgQReceiveError(Exception): pass
- class MsgQCloseOnReceive(Exception):
- """Exception raised when reading data from a socket results in 'shutdown'.
- This happens when msgq received 0-length data. This class holds whether
- it happens in the middle of reading (i.e. after reading some) via
- partial_read parameter, which is set to True if and only if so.
- This will be used by an upper layer catching the exception to distinguish
- the severity of the event.
- """
- def __init__(self, reason, partial_read):
- self.partial_read = partial_read
- self.__reason = reason
- def __str__(self):
- return self.__reason
- class SubscriptionManager:
- def __init__(self, cfgmgr_ready):
- """
- Initialize the subscription manager.
- parameters:
- * cfgmgr_ready: A callable object run once the config manager
- subscribes. This is a hackish solution, but we can't read
- the configuration sooner.
- """
- self.subscriptions = {}
- self.__cfgmgr_ready = cfgmgr_ready
- self.__cfgmgr_ready_called = False
- def subscribe(self, group, instance, socket):
- """Add a subscription."""
- target = ( group, instance )
- if target in self.subscriptions:
- logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance)
- if socket not in self.subscriptions[target]:
- self.subscriptions[target].append(socket)
- else:
- logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
- self.subscriptions[target] = [ socket ]
- if group == "ConfigManager" and not self.__cfgmgr_ready_called:
- logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
- self.__cfgmgr_ready_called = True
- self.__cfgmgr_ready()
- def unsubscribe(self, group, instance, socket):
- """Remove the socket from the one specific subscription."""
- target = ( group, instance )
- if target in self.subscriptions:
- if socket in self.subscriptions[target]:
- self.subscriptions[target].remove(socket)
- def unsubscribe_all(self, socket):
- """Remove the socket from all subscriptions."""
- for socklist in self.subscriptions.values():
- if socket in socklist:
- socklist.remove(socket)
- def find_sub(self, group, instance):
- """Return an array of sockets which want this specific group,
- instance."""
- target = (group, instance)
- if target in self.subscriptions:
- return self.subscriptions[target]
- else:
- return []
- def find(self, group, instance):
- """Return an array of sockets who should get something sent to
- this group, instance pair. This includes wildcard subscriptions."""
- target = (group, instance)
- partone = self.find_sub(group, instance)
- parttwo = self.find_sub(group, "*")
- return list(set(partone + parttwo))
- class MsgQ:
- """Message Queue class."""
- # did we find a better way to do this?
- SOCKET_FILE = os.path.join("@localstatedir@",
- "@PACKAGE_NAME@",
- "msgq_socket").replace("${prefix}",
- "@prefix@")
- def __init__(self, socket_file=None, verbose=False):
- """Initialize the MsgQ master.
- The socket_file specifies the path to the UNIX domain socket
- that the msgq process listens on. If it is None, the
- environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
- is not set, it will default to
- @localstatedir@/@PACKAGE_NAME@/msg_socket.
- If verbose is True, then the MsgQ reports
- what it is doing.
- """
- if socket_file is None:
- if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
- self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
- else:
- self.socket_file = self.SOCKET_FILE
- else:
- self.socket_file = socket_file
- self.verbose = verbose
- self.poller = None
- self.kqueue = None
- self.runnable = False
- self.listen_socket = False
- self.sockets = {}
- self.connection_counter = random.random()
- self.hostname = socket.gethostname()
- self.subs = SubscriptionManager(self.cfgmgr_ready)
- self.lnames = {}
- self.sendbuffs = {}
- self.running = False
- self.__cfgmgr_ready = None
- self.__cfgmgr_ready_cond = threading.Condition()
- # A lock used when the message queue does anything more complicated.
- # It is mostly a safety measure, the threads doing so should be mostly
- # independent, and the one with config session should be read only,
- # but with threads, one never knows. We use threads for concurrency,
- # not for performance, so we use wide lock scopes to be on the safe
- # side.
- self.__lock = threading.Lock()
- def cfgmgr_ready(self, ready=True):
- """Notify that the config manager is either subscribed, or
- that the msgq is shutting down and it won't connect, but
- anybody waiting for it should stop anyway.
- The ready parameter signifies if the config manager is subscribed.
- This method can be called multiple times, but second and any
- following call is simply ignored. This means the "abort" version
- of the call can be used on any stop unconditionally, even when
- the config manager already connected.
- """
- with self.__cfgmgr_ready_cond:
- if self.__cfgmgr_ready is not None:
- # This is a second call to this method. In that case it does
- # nothing.
- return
- self.__cfgmgr_ready = ready
- self.__cfgmgr_ready_cond.notify_all()
- def wait_cfgmgr(self):
- """Wait for msgq to subscribe.
- When this returns, the config manager is either subscribed, or
- msgq gave up waiting for it. Success is signified by the return
- value.
- """
- with self.__cfgmgr_ready_cond:
- # Wait until it either aborts or subscribes
- while self.__cfgmgr_ready is None:
- self.__cfgmgr_ready_cond.wait()
- return self.__cfgmgr_ready
- def setup_poller(self):
- """Set up the poll thing. Internal function."""
- try:
- self.kqueue = select.kqueue()
- except AttributeError:
- self.poller = select.poll()
- def add_kqueue_socket(self, socket, write_filter=False):
- """Add a kqueue filter for a socket. By default the read
- filter is used; if write_filter is set to True, the write
- filter is used. We use a boolean value instead of a specific
- filter constant, because kqueue filter values do not seem to
- be defined on some systems. The use of boolean makes the
- interface restrictive because there are other filters, but this
- method is mostly only for our internal use, so it should be
- acceptable at least for now."""
- filter_type = select.KQ_FILTER_WRITE if write_filter else \
- select.KQ_FILTER_READ
- event = select.kevent(socket.fileno(), filter_type,
- select.KQ_EV_ADD | select.KQ_EV_ENABLE)
- self.kqueue.control([event], 0)
- def delete_kqueue_socket(self, socket, write_filter=False):
- """Delete a kqueue filter for socket. See add_kqueue_socket()
- for the semantics and notes about write_filter."""
- filter_type = select.KQ_FILTER_WRITE if write_filter else \
- select.KQ_FILTER_READ
- event = select.kevent(socket.fileno(), filter_type,
- select.KQ_EV_DELETE)
- self.kqueue.control([event], 0)
- def setup_listener(self):
- """Set up the listener socket. Internal function."""
- logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)
- self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- if os.path.exists(self.socket_file):
- os.remove(self.socket_file)
- try:
- self.listen_socket.bind(self.socket_file)
- self.listen_socket.listen(1024)
- except Exception as e:
- # remove the file again if something goes wrong
- # (note this is a catch-all, but we reraise it)
- if os.path.exists(self.socket_file):
- os.remove(self.socket_file)
- self.listen_socket.close()
- logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
- raise e
- if self.poller:
- self.poller.register(self.listen_socket, select.POLLIN)
- else:
- self.add_kqueue_socket(self.listen_socket)
- def setup_signalsock(self):
- """Create a socket pair used to signal when we want to finish.
- Using a socket is easy and thread/signal safe way to signal
- the termination.
- """
- # The __poller_sock will be the end in the poller. When it is
- # closed, we should shut down.
- (self.__poller_sock, self.__control_sock) = socket.socketpair()
- if self.poller:
- self.poller.register(self.__poller_sock, select.POLLIN)
- else:
- self.add_kqueue_socket(self.__poller_sock)
- def setup(self):
- """Configure listener socket, polling, etc.
- Raises a socket.error if the socket_file cannot be
- created.
- """
- self.setup_poller()
- self.setup_signalsock()
- self.setup_listener()
- logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
- self.runnable = True
- def process_accept(self):
- """Process an accept on the listening socket."""
- newsocket, ipaddr = self.listen_socket.accept()
- # TODO: When we have logging, we might want
- # to add a debug message here that a new connection
- # was made
- self.register_socket(newsocket)
- def register_socket(self, newsocket):
- """
- Internal function to insert a socket. Used by process_accept and some tests.
- """
- self.sockets[newsocket.fileno()] = newsocket
- lname = self.newlname()
- self.lnames[lname] = newsocket
- logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
- lname)
- if self.poller:
- self.poller.register(newsocket, select.POLLIN)
- else:
- self.add_kqueue_socket(newsocket)
- def kill_socket(self, fd, sock):
- """Fully close down the socket."""
- # Unregister events on the socket. Note that we don't have to do
- # this for kqueue because the registered events are automatically
- # deleted when the corresponding socket is closed.
- if self.poller:
- self.poller.unregister(sock)
- self.subs.unsubscribe_all(sock)
- lname = [ k for k, v in self.lnames.items() if v == sock ][0]
- del self.lnames[lname]
- sock.close()
- del self.sockets[fd]
- if fd in self.sendbuffs:
- del self.sendbuffs[fd]
- logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
- def __getbytes(self, fd, sock, length, continued):
- """Get exactly the requested bytes, or raise an exception if
- EOF.
- continued is set to True if this method is called to complete
- already read data.
- """
- received = b''
- while len(received) < length:
- try:
- data = sock.recv(length - len(received))
- except socket.error as err:
- # This case includes ECONNRESET, which seems to happen when
- # the remote client has closed its socket at some subtle
- # timing (it should normally result in receiving empty data).
- # Since we didn't figure out how exactly that could happen,
- # we treat it just like other really-unexpected socket errors.
- raise MsgQReceiveError(str(err))
- if len(data) == 0:
- raise MsgQCloseOnReceive("EOF", continued)
- received += data
- continued = True
- return received
- def read_packet(self, fd, sock):
- """Read a correctly formatted packet. Will raise exceptions if
- something fails."""
- lengths = self.__getbytes(fd, sock, 6, False)
- overall_length, routing_length = struct.unpack(">IH", lengths)
- if overall_length < 2:
- raise MsgQReceiveError("overall_length < 2")
- overall_length -= 2
- if routing_length > overall_length:
- raise MsgQReceiveError("routing_length > overall_length")
- if routing_length == 0:
- raise MsgQReceiveError("routing_length == 0")
- data_length = overall_length - routing_length
- # probably need to sanity check lengths here...
- routing = self.__getbytes(fd, sock, routing_length, True)
- if data_length > 0:
- data = self.__getbytes(fd, sock, data_length, True)
- else:
- data = None
- return (routing, data)
- def process_packet(self, fd, sock):
- """Process one packet."""
- try:
- routing, data = self.read_packet(fd, sock)
- except (MsgQReceiveError, MsgQCloseOnReceive) as err:
- # If it's MsgQCloseOnReceive and that happens without reading
- # any data, it basically means the remote clinet has closed the
- # socket, so we log it as debug information. Otherwise, it's
- # a somewhat unexpected event, so we consider it an "error".
- if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
- logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
- else:
- logger.error(MSGQ_RECV_ERROR, fd, err)
- self.kill_socket(fd, sock)
- return
- try:
- routingmsg = isc.cc.message.from_wire(routing)
- except DecodeError as err:
- self.kill_socket(fd, sock)
- logger.error(MSGQ_HDR_DECODE_ERROR, fd, err)
- return
- self.process_command(fd, sock, routingmsg, data)
- def process_command(self, fd, sock, routing, data):
- """Process a single command. This will split out into one of the
- other functions."""
- logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
- cmd = routing["type"]
- if cmd == 'send':
- self.process_command_send(sock, routing, data)
- elif cmd == 'subscribe':
- self.process_command_subscribe(sock, routing, data)
- elif cmd == 'unsubscribe':
- self.process_command_unsubscribe(sock, routing, data)
- elif cmd == 'getlname':
- self.process_command_getlname(sock, routing, data)
- elif cmd == 'ping':
- # Command for testing purposes
- self.process_command_ping(sock, routing, data)
- elif cmd == 'stop':
- self.stop()
- else:
- logger.error(MSGQ_INVALID_CMD, cmd)
- def preparemsg(self, env, msg = None):
- if type(env) == dict:
- env = isc.cc.message.to_wire(env)
- if type(msg) == dict:
- msg = isc.cc.message.to_wire(msg)
- length = 2 + len(env);
- if msg:
- length += len(msg)
- ret = struct.pack("!IH", length, len(env))
- ret += env
- if msg:
- ret += msg
- return ret
- def sendmsg(self, sock, env, msg = None):
- self.send_prepared_msg(sock, self.preparemsg(env, msg))
- def _send_data(self, sock, data):
- """
- Send a piece of data to the given socket. This method is
- essentially "private" to MsgQ, but defined as if it were "protected"
- for easier access from tests.
- Parameters:
- sock: The socket to send to
- data: The list of bytes to send
- Returns:
- An integer or None. If an integer (which can be 0), it signals
- the number of bytes sent. If None, the socket appears to have
- been closed on the other end, and it has been killed on this
- side too.
- """
- try:
- # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
- # on some OSes
- sock.setblocking(0)
- return sock.send(data)
- except socket.error as e:
- if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
- return 0
- elif e.errno in [ errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS ]:
- # EPIPE happens if the remote module has terminated by the time
- # of this send; its severity can vary, but in many cases it
- # shouldn't be critical, so we log it separately as a warning.
- if e.errno == errno.EPIPE:
- logger.warn(MSGQ_CLOSE_ON_SEND, sock.fileno())
- else:
- logger.error(MSGQ_SEND_ERROR, sock.fileno(),
- errno.errorcode[e.errno])
- self.kill_socket(sock.fileno(), sock)
- return None
- else:
- raise e
- finally:
- # And set it back again
- sock.setblocking(1)
- def send_prepared_msg(self, sock, msg):
- # Try to send the data, but only if there's nothing waiting
- fileno = sock.fileno()
- if fileno in self.sendbuffs:
- amount_sent = 0
- else:
- amount_sent = self._send_data(sock, msg)
- if amount_sent is None:
- # Socket has been killed, drop the send
- return
- # Still something to send, add it to outgoing queue
- if amount_sent < len(msg):
- now = time.clock()
- # Append it to buffer (but check the data go away)
- if fileno in self.sendbuffs:
- (last_sent, buff) = self.sendbuffs[fileno]
- if now - last_sent > 0.1:
- self.kill_socket(fileno, sock)
- return
- buff += msg
- else:
- buff = msg[amount_sent:]
- last_sent = now
- if self.poller:
- self.poller.register(fileno, select.POLLIN |
- select.POLLOUT)
- else:
- self.add_kqueue_socket(sock, True)
- self.sendbuffs[fileno] = (last_sent, buff)
- def __process_write(self, fileno):
- # Try to send some data from the buffer
- (_, msg) = self.sendbuffs[fileno]
- sock = self.sockets[fileno]
- amount_sent = self._send_data(sock, msg)
- if amount_sent is not None:
- # Keep the rest
- msg = msg[amount_sent:]
- if len(msg) == 0:
- # If there's no more, stop requesting for write availability
- if self.poller:
- self.poller.register(fileno, select.POLLIN)
- else:
- self.delete_kqueue_socket(sock, True)
- del self.sendbuffs[fileno]
- else:
- self.sendbuffs[fileno] = (time.clock(), msg)
- def newlname(self):
- """Generate a unique connection identifier for this socket.
- This is done by using an increasing counter and the current
- time."""
- self.connection_counter += 1
- return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
- def process_command_ping(self, sock, routing, data):
- self.sendmsg(sock, { "type" : "pong" }, data)
- def process_command_getlname(self, sock, routing, data):
- lname = [ k for k, v in self.lnames.items() if v == sock ][0]
- self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
- def process_command_send(self, sock, routing, data):
- group = routing["group"]
- instance = routing["instance"]
- to = routing["to"]
- if group == None or instance == None:
- return # ignore invalid packets entirely
- if to == "*":
- sockets = self.subs.find(group, instance)
- else:
- if to in self.lnames:
- sockets = [ self.lnames[to] ]
- else:
- return # recipient doesn't exist
- msg = self.preparemsg(routing, data)
- if sock in sockets:
- sockets.remove(sock)
- for socket in sockets:
- self.send_prepared_msg(socket, msg)
- def process_command_subscribe(self, sock, routing, data):
- group = routing["group"]
- instance = routing["instance"]
- if group == None or instance == None:
- return # ignore invalid packets entirely
- self.subs.subscribe(group, instance, sock)
- def process_command_unsubscribe(self, sock, routing, data):
- group = routing["group"]
- instance = routing["instance"]
- if group == None or instance == None:
- return # ignore invalid packets entirely
- self.subs.unsubscribe(group, instance, sock)
- def run(self):
- """Process messages. Forever. Mostly."""
- self.running = True
- if self.poller:
- self.run_poller()
- else:
- self.run_kqueue()
- def run_poller(self):
- while self.running:
- try:
- # Poll with a timeout so that every once in a while,
- # the loop checks for self.running.
- events = self.poller.poll()
- except select.error as err:
- if err.args[0] == errno.EINTR:
- events = []
- else:
- logger.fatal(MSGQ_POLL_ERROR, err)
- break
- with self.__lock:
- for (fd, event) in events:
- if fd == self.listen_socket.fileno():
- self.process_accept()
- elif fd == self.__poller_sock.fileno():
- # If it's the signal socket, we should terminate now.
- self.running = False
- break
- else:
- writable = event & select.POLLOUT
- # Note: it may be okay to read data if available
- # immediately after write some, but due to unexpected
- # regression (see comments on the kqueue version below)
- # we restrict one operation per iteration for now.
- # In future we may clarify the point and enable the
- # "read/write" mode.
- readable = not writable and (event & select.POLLIN)
- if not writable and not readable:
- logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
- self._process_fd(fd, writable, readable, False)
- def run_kqueue(self):
- while self.running:
- # Check with a timeout so that every once in a while,
- # the loop checks for self.running.
- events = self.kqueue.control(None, 10)
- if not events:
- raise RuntimeError('serve: kqueue returned no events')
- with self.__lock:
- for event in events:
- if event.ident == self.listen_socket.fileno():
- self.process_accept()
- elif event.ident == self.__poller_sock.fileno():
- # If it's the signal socket, we should terminate now.
- self.running = False
- break;
- else:
- fd = event.ident
- writable = event.filter == select.KQ_FILTER_WRITE
- readable = (event.filter == select.KQ_FILTER_READ and
- event.data > 0)
- # It seems to break some of our test cases if we
- # immediately close the socket on EOF after reading
- # some data. It may be possible to avoid by tweaking
- # the test, but unless we can be sure we'll hold off.
- closed = (not readable and
- (event.flags & select.KQ_EV_EOF))
- self._process_fd(fd, writable, readable, closed)
- def _process_fd(self, fd, writable, readable, closed):
- '''Process a single FD: unified subroutine of run_kqueue/poller.
- closed can be True only in the case of kqueue. This is essentially
- private but is defined as if it were "protected" so it's callable
- from tests.
- '''
- # We need to check if FD is still in the sockets dict, because
- # it's possible that the socket has been "killed" while processing
- # other FDs; it's even possible it's killed within this method.
- if writable and fd in self.sockets:
- self.__process_write(fd)
- if readable and fd in self.sockets:
- self.process_packet(fd, self.sockets[fd])
- if closed and fd in self.sockets:
- self.kill_socket(fd, self.sockets[fd])
- def stop(self):
- # Signal it should terminate.
- self.__control_sock.close()
- self.__control_sock = None
- # Abort anything waiting on the condition, just to make sure it's not
- # blocked forever
- self.cfgmgr_ready(False)
- def cleanup_signalsock(self):
- """Close the signal sockets. We could do it directly in shutdown,
- but this part is reused in tests.
- """
- if self.__poller_sock:
- self.__poller_sock.close()
- self.__poller_sock = None
- if self.__control_sock:
- self.__control_sock.close()
- self.__control_sock = None
- def shutdown(self):
- """Stop the MsgQ master."""
- logger.debug(TRACE_START, MSGQ_SHUTDOWN)
- self.listen_socket.close()
- self.cleanup_signalsock()
- # Close all the sockets too. In real life, there should be none now,
- # as Msgq should be the last one. But some tests don't adhere to this
- # and create a new Msgq for each test, which led to huge socket leaks.
- # Some other threads put some other things in instead of sockets, so
- # we catch whatever exceptions there we can. This should be safe,
- # because in real operation, we will terminate now anyway, implicitly
- # closing anything anyway.
- for sock in self.sockets.values():
- try:
- sock.close()
- except Exception:
- pass
- if os.path.exists(self.socket_file):
- os.remove(self.socket_file)
- def config_handler(self, new_config):
- """The configuration handler (run in a separate thread).
- Not tested, currently effectively empty.
- """
- config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)
- with self.__lock:
- if not self.running:
- return
- # TODO: Any config handlig goes here.
- return isc.config.create_answer(0)
- def command_handler(self, command, args):
- """The command handler (run in a separate thread).
- Not tested, currently effectively empty.
- """
- config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
- with self.__lock:
- if not self.running:
- return
- # TODO: Any commands go here
- config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
- return isc.config.create_answer(1, 'unknown command: ' + command)
- def signal_handler(msgq, signal, frame):
- if msgq:
- msgq.stop()
- if __name__ == "__main__":
- def check_port(option, opt_str, value, parser):
- """Function to insure that the port we are passed is actually
- a valid port number. Used by OptionParser() on startup."""
- intval = int(value)
- if (intval < 0) or (intval > 65535):
- raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
- parser.values.msgq_port = intval
- # Parse any command-line options.
- parser = OptionParser(version=VERSION)
- # TODO: Should we remove the option?
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
- help="display more about what is going on")
- parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
- type="string", default=None,
- help="UNIX domain socket file the msgq daemon will use")
- (options, args) = parser.parse_args()
- # Announce startup.
- logger.debug(TRACE_START, MSGQ_START, VERSION)
- msgq = MsgQ(options.msgq_socket_file, options.verbose)
- signal.signal(signal.SIGTERM,
- lambda signal, frame: signal_handler(msgq, signal, frame))
- try:
- msgq.setup()
- except Exception as e:
- logger.fatal(MSGQ_START_FAIL, e)
- sys.exit(1)
- # We run the processing in a separate thread. This is because we want to
- # connect to the msgq ourself. But the cc library is unfortunately blocking
- # in many places and waiting for the processing part to answer, it would
- # deadlock.
- poller_thread = threading.Thread(target=msgq.run)
- poller_thread.daemon = True
- try:
- poller_thread.start()
- if msgq.wait_cfgmgr():
- # Once we get the config manager, we can read our own config.
- session = isc.config.ModuleCCSession(SPECFILE_LOCATION,
- msgq.config_handler,
- msgq.command_handler,
- None, True,
- msgq.socket_file)
- session.start()
- # And we create a thread that'll just wait for commands and
- # handle them. We don't terminate the thread, we set it to
- # daemon. Once the main thread terminates, it'll just die.
- def run_session():
- while True:
- session.check_command(False)
- background_thread = threading.Thread(target=run_session)
- background_thread.daemon = True
- background_thread.start()
- poller_thread.join()
- except KeyboardInterrupt:
- pass
- msgq.shutdown()
- logger.info(MSGQ_EXITING)
|