|
@@ -1,886 +0,0 @@
|
|
-#!@PYTHON@
|
|
|
|
-
|
|
|
|
-# Copyright (C) 2010 Internet Systems Consortium.
|
|
|
|
-#
|
|
|
|
-# Permission to use, copy, modify, and distribute this software for any
|
|
|
|
-# purpose with or without fee is hereby granted, provided that the above
|
|
|
|
-# copyright notice and this permission notice appear in all copies.
|
|
|
|
-#
|
|
|
|
-# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
|
|
|
|
-# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
|
|
|
|
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
|
|
|
|
-# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
|
|
|
|
-# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
|
|
|
|
-# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
|
|
|
-# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
|
|
|
|
-# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
-
|
|
|
|
-import sys; sys.path.append ('@@PYTHONPATH@@')
|
|
|
|
-
|
|
|
|
-"""This code implements the msgq daemon."""
|
|
|
|
-
|
|
|
|
-import subprocess
|
|
|
|
-import signal
|
|
|
|
-import os
|
|
|
|
-import socket
|
|
|
|
-import sys
|
|
|
|
-import struct
|
|
|
|
-import errno
|
|
|
|
-import time
|
|
|
|
-import select
|
|
|
|
-import random
|
|
|
|
-import threading
|
|
|
|
-import isc.config.ccsession
|
|
|
|
-from optparse import OptionParser, OptionValueError
|
|
|
|
-import isc.util.process
|
|
|
|
-import isc.util.traceback_handler
|
|
|
|
-from isc.cc.proto_defs import *
|
|
|
|
-import isc.log
|
|
|
|
-from isc.log_messages.msgq_messages import *
|
|
|
|
-
|
|
|
|
-import isc.cc
|
|
|
|
-
|
|
|
|
-isc.util.process.rename()
|
|
|
|
-
|
|
|
|
-isc.log.init("b10-msgq", buffer=True)
|
|
|
|
-# Logger that is used in the actual msgq handling - startup, shutdown and the
|
|
|
|
-# poller thread.
|
|
|
|
-logger = isc.log.Logger("msgq")
|
|
|
|
-# A separate copy for the master/config thread when the poller thread runs.
|
|
|
|
-# We use a separate instance, since the logger itself doesn't have to be
|
|
|
|
-# thread safe.
|
|
|
|
-config_logger = isc.log.Logger("msgq")
|
|
|
|
-TRACE_START = logger.DBGLVL_START_SHUT
|
|
|
|
-TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
|
|
|
|
-TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
|
|
|
|
-
|
|
|
|
-# This is the version that gets displayed to the user.
|
|
|
|
-# The VERSION string consists of the module name, the module version
|
|
|
|
-# number, and the overall BIND 10 version number (set in configure.ac).
|
|
|
|
-VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
|
|
|
|
-
|
|
|
|
-# If B10_FROM_BUILD is set in the environment, we use data files
|
|
|
|
-# from a directory relative to that, otherwise we use the ones
|
|
|
|
-# installed on the system
|
|
|
|
-if "B10_FROM_SOURCE" in os.environ:
|
|
|
|
- SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/msgq"
|
|
|
|
-else:
|
|
|
|
- PREFIX = "@prefix@"
|
|
|
|
- DATAROOTDIR = "@datarootdir@"
|
|
|
|
- SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}",
|
|
|
|
- DATAROOTDIR). \
|
|
|
|
- replace("${prefix}", PREFIX)
|
|
|
|
-SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
|
|
|
|
-
|
|
|
|
-class MsgQReceiveError(Exception): pass
|
|
|
|
-
|
|
|
|
-class MsgQRunningError(Exception): pass
|
|
|
|
-
|
|
|
|
-class MsgQCloseOnReceive(Exception):
|
|
|
|
- """Exception raised when reading data from a socket results in 'shutdown'.
|
|
|
|
-
|
|
|
|
- This happens when msgq received 0-length data. This class holds whether
|
|
|
|
- it happens in the middle of reading (i.e. after reading some) via
|
|
|
|
- partial_read parameter, which is set to True if and only if so.
|
|
|
|
- This will be used by an upper layer catching the exception to distinguish
|
|
|
|
- the severity of the event.
|
|
|
|
-
|
|
|
|
- """
|
|
|
|
- def __init__(self, reason, partial_read):
|
|
|
|
- self.partial_read = partial_read
|
|
|
|
- self.__reason = reason
|
|
|
|
-
|
|
|
|
- def __str__(self):
|
|
|
|
- return self.__reason
|
|
|
|
-
|
|
|
|
-class SubscriptionManager:
|
|
|
|
- def __init__(self, cfgmgr_ready):
|
|
|
|
- """
|
|
|
|
- Initialize the subscription manager.
|
|
|
|
- parameters:
|
|
|
|
- * cfgmgr_ready: A callable object run once the config manager
|
|
|
|
- subscribes. This is a hackish solution, but we can't read
|
|
|
|
- the configuration sooner.
|
|
|
|
- """
|
|
|
|
- self.subscriptions = {}
|
|
|
|
- self.__cfgmgr_ready = cfgmgr_ready
|
|
|
|
- self.__cfgmgr_ready_called = False
|
|
|
|
-
|
|
|
|
- def subscribe(self, group, instance, socket):
|
|
|
|
- """Add a subscription."""
|
|
|
|
- target = ( group, instance )
|
|
|
|
- if target in self.subscriptions:
|
|
|
|
- logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance)
|
|
|
|
- if socket not in self.subscriptions[target]:
|
|
|
|
- self.subscriptions[target].append(socket)
|
|
|
|
- else:
|
|
|
|
- logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
|
|
|
|
- self.subscriptions[target] = [ socket ]
|
|
|
|
- if group == "ConfigManager" and not self.__cfgmgr_ready_called:
|
|
|
|
- logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
|
|
|
|
- self.__cfgmgr_ready_called = True
|
|
|
|
- self.__cfgmgr_ready()
|
|
|
|
-
|
|
|
|
- def unsubscribe(self, group, instance, socket):
|
|
|
|
- """Remove the socket from the one specific subscription."""
|
|
|
|
- target = ( group, instance )
|
|
|
|
- if target in self.subscriptions:
|
|
|
|
- if socket in self.subscriptions[target]:
|
|
|
|
- self.subscriptions[target].remove(socket)
|
|
|
|
- return True
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- def unsubscribe_all(self, socket):
|
|
|
|
- """Remove the socket from all subscriptions."""
|
|
|
|
- removed_from = []
|
|
|
|
- for subs, socklist in self.subscriptions.items():
|
|
|
|
- if socket in socklist:
|
|
|
|
- socklist.remove(socket)
|
|
|
|
- removed_from.append(subs)
|
|
|
|
- return removed_from
|
|
|
|
-
|
|
|
|
- def find_sub(self, group, instance):
|
|
|
|
- """Return an array of sockets which want this specific group,
|
|
|
|
- instance."""
|
|
|
|
- target = (group, instance)
|
|
|
|
- if target in self.subscriptions:
|
|
|
|
- return self.subscriptions[target]
|
|
|
|
- else:
|
|
|
|
- return []
|
|
|
|
-
|
|
|
|
- def find(self, group, instance):
|
|
|
|
- """Return an array of sockets who should get something sent to
|
|
|
|
- this group, instance pair. This includes wildcard subscriptions."""
|
|
|
|
- target = (group, instance)
|
|
|
|
- partone = self.find_sub(group, instance)
|
|
|
|
- parttwo = self.find_sub(group, CC_INSTANCE_WILDCARD)
|
|
|
|
- return list(set(partone + parttwo))
|
|
|
|
-
|
|
|
|
-class MsgQ:
|
|
|
|
- """Message Queue class."""
|
|
|
|
- # did we find a better way to do this?
|
|
|
|
- SOCKET_FILE = os.path.join("@localstatedir@",
|
|
|
|
- "@PACKAGE_NAME@",
|
|
|
|
- "msgq_socket").replace("${prefix}",
|
|
|
|
- "@prefix@")
|
|
|
|
-
|
|
|
|
- def __init__(self, socket_file=None, verbose=False):
|
|
|
|
- """Initialize the MsgQ master.
|
|
|
|
-
|
|
|
|
- The socket_file specifies the path to the UNIX domain socket
|
|
|
|
- that the msgq process listens on. If it is None, the
|
|
|
|
- environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
|
|
|
|
- is not set, it will default to
|
|
|
|
- @localstatedir@/@PACKAGE_NAME@/msg_socket.
|
|
|
|
- If verbose is True, then the MsgQ reports
|
|
|
|
- what it is doing.
|
|
|
|
- """
|
|
|
|
-
|
|
|
|
- if socket_file is None:
|
|
|
|
- if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
|
|
|
|
- self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
|
|
|
|
- else:
|
|
|
|
- self.socket_file = self.SOCKET_FILE
|
|
|
|
- else:
|
|
|
|
- self.socket_file = socket_file
|
|
|
|
-
|
|
|
|
- self.verbose = verbose
|
|
|
|
- self.runnable = False
|
|
|
|
- self.listen_socket = False
|
|
|
|
- self.sockets = {}
|
|
|
|
- self.connection_counter = random.random()
|
|
|
|
- self.hostname = socket.gethostname()
|
|
|
|
- self.subs = SubscriptionManager(self.cfgmgr_ready)
|
|
|
|
- self.lnames = {}
|
|
|
|
- self.fd_to_lname = {}
|
|
|
|
- self.sendbuffs = {}
|
|
|
|
- self.running = False
|
|
|
|
- self.__cfgmgr_ready = None
|
|
|
|
- self.__cfgmgr_ready_cond = threading.Condition()
|
|
|
|
- # A lock used when the message queue does anything more complicated.
|
|
|
|
- # It is mostly a safety measure, the threads doing so should be mostly
|
|
|
|
- # independent, and the one with config session should be read only,
|
|
|
|
- # but with threads, one never knows. We use threads for concurrency,
|
|
|
|
- # not for performance, so we use wide lock scopes to be on the safe
|
|
|
|
- # side.
|
|
|
|
- self.__lock = threading.Lock()
|
|
|
|
- self._session = None
|
|
|
|
- self.__poller_sock = None
|
|
|
|
-
|
|
|
|
- def members_notify(self, event, params):
|
|
|
|
- """
|
|
|
|
- Thin wrapper around ccs's notify. Send a notification about change
|
|
|
|
- of some list that can be requested by the members command.
|
|
|
|
-
|
|
|
|
- The event is one of:
|
|
|
|
- - connected (client connected to MsgQ)
|
|
|
|
- - disconected (client disconnected from MsgQ)
|
|
|
|
- - subscribed (client subscribed to a group)
|
|
|
|
- - unsubscribed (client unsubscribed from a group)
|
|
|
|
-
|
|
|
|
- The params is dict containing:
|
|
|
|
- - client: The lname of the client in question.
|
|
|
|
- - group (for 'subscribed' and 'unsubscribed' events):
|
|
|
|
- The group the client subscribed or unsubscribed from.
|
|
|
|
-
|
|
|
|
- The notification occurs after the event, so client a subscribing for
|
|
|
|
- notifications will get a notification about its own subscription, but
|
|
|
|
- will not get a notification when it unsubscribes.
|
|
|
|
- """
|
|
|
|
- # Due to the interaction between threads (and fear it might influence
|
|
|
|
- # sending stuff), we test this method in msgq_run_test, instead of
|
|
|
|
- # mocking the ccs.
|
|
|
|
- if self._session: # Don't send before we have started up
|
|
|
|
- self._session.notify('cc_members', event, params)
|
|
|
|
-
|
|
|
|
- def cfgmgr_ready(self, ready=True):
|
|
|
|
- """Notify that the config manager is either subscribed, or
|
|
|
|
- that the msgq is shutting down and it won't connect, but
|
|
|
|
- anybody waiting for it should stop anyway.
|
|
|
|
-
|
|
|
|
- The ready parameter signifies if the config manager is subscribed.
|
|
|
|
-
|
|
|
|
- This method can be called multiple times, but second and any
|
|
|
|
- following call is simply ignored. This means the "abort" version
|
|
|
|
- of the call can be used on any stop unconditionally, even when
|
|
|
|
- the config manager already connected.
|
|
|
|
- """
|
|
|
|
- with self.__cfgmgr_ready_cond:
|
|
|
|
- if self.__cfgmgr_ready is not None:
|
|
|
|
- # This is a second call to this method. In that case it does
|
|
|
|
- # nothing.
|
|
|
|
- return
|
|
|
|
- self.__cfgmgr_ready = ready
|
|
|
|
- self.__cfgmgr_ready_cond.notify_all()
|
|
|
|
-
|
|
|
|
- def wait_cfgmgr(self):
|
|
|
|
- """Wait for msgq to subscribe.
|
|
|
|
-
|
|
|
|
- When this returns, the config manager is either subscribed, or
|
|
|
|
- msgq gave up waiting for it. Success is signified by the return
|
|
|
|
- value.
|
|
|
|
- """
|
|
|
|
- with self.__cfgmgr_ready_cond:
|
|
|
|
- # Wait until it either aborts or subscribes
|
|
|
|
- while self.__cfgmgr_ready is None:
|
|
|
|
- self.__cfgmgr_ready_cond.wait()
|
|
|
|
- return self.__cfgmgr_ready
|
|
|
|
-
|
|
|
|
- def setup_listener(self):
|
|
|
|
- """Set up the listener socket. Internal function."""
|
|
|
|
- logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)
|
|
|
|
-
|
|
|
|
- if os.path.exists(self.socket_file):
|
|
|
|
- # Rather than just blindly removing the socket file, attempt to
|
|
|
|
- # connect to the existing socket to see if there is an existing
|
|
|
|
- # msgq running. Only if that fails do we remove the file and
|
|
|
|
- # attempt to create a new socket.
|
|
|
|
- existing_msgq = None
|
|
|
|
- try:
|
|
|
|
- existing_msgq = isc.cc.Session(self.socket_file)
|
|
|
|
- except isc.cc.session.SessionError:
|
|
|
|
- existing_msgq = None
|
|
|
|
-
|
|
|
|
- if existing_msgq:
|
|
|
|
- existing_msgq.close()
|
|
|
|
- logger.fatal(MSGQ_ALREADY_RUNNING)
|
|
|
|
- raise MsgQRunningError("b10-msgq already running")
|
|
|
|
-
|
|
|
|
- os.remove(self.socket_file)
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
- self.listen_socket.bind(self.socket_file)
|
|
|
|
- self.listen_socket.listen(1024)
|
|
|
|
- except Exception as e:
|
|
|
|
- # remove the file again if something goes wrong
|
|
|
|
- # (note this is a catch-all, but we reraise it)
|
|
|
|
- if os.path.exists(self.socket_file):
|
|
|
|
- os.remove(self.socket_file)
|
|
|
|
- self.listen_socket.close()
|
|
|
|
- logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
|
|
|
|
- raise e
|
|
|
|
-
|
|
|
|
- def setup_signalsock(self):
|
|
|
|
- """Create a socket pair used to signal when we want to finish.
|
|
|
|
- Using a socket is easy and thread/signal safe way to signal
|
|
|
|
- the termination.
|
|
|
|
- """
|
|
|
|
- # The __poller_sock will be the end in the poller. When it is
|
|
|
|
- # closed, we should shut down.
|
|
|
|
- (self.__poller_sock, self.__control_sock) = socket.socketpair()
|
|
|
|
-
|
|
|
|
- def setup(self):
|
|
|
|
- """Configure listener socket, polling, etc.
|
|
|
|
- Raises a socket.error if the socket_file cannot be
|
|
|
|
- created.
|
|
|
|
- """
|
|
|
|
-
|
|
|
|
- self.setup_signalsock()
|
|
|
|
- self.setup_listener()
|
|
|
|
-
|
|
|
|
- logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
|
|
|
|
-
|
|
|
|
- self.runnable = True
|
|
|
|
-
|
|
|
|
- def process_accept(self):
|
|
|
|
- """Process an accept on the listening socket."""
|
|
|
|
- newsocket, ipaddr = self.listen_socket.accept()
|
|
|
|
- # TODO: When we have logging, we might want
|
|
|
|
- # to add a debug message here that a new connection
|
|
|
|
- # was made
|
|
|
|
- self.register_socket(newsocket)
|
|
|
|
-
|
|
|
|
- def register_socket(self, newsocket):
|
|
|
|
- """
|
|
|
|
- Internal function to insert a socket. Used by process_accept and some
|
|
|
|
- tests.
|
|
|
|
- """
|
|
|
|
- self.sockets[newsocket.fileno()] = newsocket
|
|
|
|
- lname = self.newlname()
|
|
|
|
- self.lnames[lname] = newsocket
|
|
|
|
- self.fd_to_lname[newsocket.fileno()] = lname
|
|
|
|
-
|
|
|
|
- logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
|
|
|
|
- lname)
|
|
|
|
-
|
|
|
|
- self.members_notify('connected', {'client': lname})
|
|
|
|
-
|
|
|
|
- def kill_socket(self, fd, sock):
|
|
|
|
- """Fully close down the socket."""
|
|
|
|
-
|
|
|
|
- unsubscribed_from = self.subs.unsubscribe_all(sock)
|
|
|
|
- lname = self.fd_to_lname[fd]
|
|
|
|
- del self.fd_to_lname[fd]
|
|
|
|
- del self.lnames[lname]
|
|
|
|
- sock.close()
|
|
|
|
- del self.sockets[fd]
|
|
|
|
- if fd in self.sendbuffs:
|
|
|
|
- del self.sendbuffs[fd]
|
|
|
|
- logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
|
|
|
|
- # Filter out just the groups.
|
|
|
|
- unsubscribed_from_groups = set(map(lambda x: x[0], unsubscribed_from))
|
|
|
|
- for group in unsubscribed_from_groups:
|
|
|
|
- self.members_notify('unsubscribed', {
|
|
|
|
- 'client': lname,
|
|
|
|
- 'group': group
|
|
|
|
- })
|
|
|
|
- self.members_notify('disconnected', {'client': lname})
|
|
|
|
-
|
|
|
|
- def __getbytes(self, fd, sock, length, continued):
|
|
|
|
- """Get exactly the requested bytes, or raise an exception if
|
|
|
|
- EOF.
|
|
|
|
-
|
|
|
|
- continued is set to True if this method is called to complete
|
|
|
|
- already read data.
|
|
|
|
- """
|
|
|
|
- received = b''
|
|
|
|
- while len(received) < length:
|
|
|
|
- try:
|
|
|
|
- data = sock.recv(length - len(received))
|
|
|
|
-
|
|
|
|
- except socket.error as err:
|
|
|
|
- # This case includes ECONNRESET, which seems to happen when
|
|
|
|
- # the remote client has closed its socket at some subtle
|
|
|
|
- # timing (it should normally result in receiving empty data).
|
|
|
|
- # Since we didn't figure out how exactly that could happen,
|
|
|
|
- # we treat it just like other really-unexpected socket errors.
|
|
|
|
- raise MsgQReceiveError(str(err))
|
|
|
|
- if len(data) == 0:
|
|
|
|
- raise MsgQCloseOnReceive("EOF", continued)
|
|
|
|
- received += data
|
|
|
|
- continued = True
|
|
|
|
- return received
|
|
|
|
-
|
|
|
|
- def read_packet(self, fd, sock):
|
|
|
|
- """Read a correctly formatted packet. Will raise exceptions if
|
|
|
|
- something fails."""
|
|
|
|
- lengths = self.__getbytes(fd, sock, 6, False)
|
|
|
|
- overall_length, routing_length = struct.unpack(">IH", lengths)
|
|
|
|
- if overall_length < 2:
|
|
|
|
- raise MsgQReceiveError("overall_length < 2")
|
|
|
|
- overall_length -= 2
|
|
|
|
- if routing_length > overall_length:
|
|
|
|
- raise MsgQReceiveError("routing_length > overall_length")
|
|
|
|
- if routing_length == 0:
|
|
|
|
- raise MsgQReceiveError("routing_length == 0")
|
|
|
|
- data_length = overall_length - routing_length
|
|
|
|
- # probably need to sanity check lengths here...
|
|
|
|
- routing = self.__getbytes(fd, sock, routing_length, True)
|
|
|
|
- if data_length > 0:
|
|
|
|
- data = self.__getbytes(fd, sock, data_length, True)
|
|
|
|
- else:
|
|
|
|
- data = None
|
|
|
|
- return (routing, data)
|
|
|
|
-
|
|
|
|
- def process_packet(self, fd, sock):
|
|
|
|
- """Process one packet."""
|
|
|
|
- try:
|
|
|
|
- routing, data = self.read_packet(fd, sock)
|
|
|
|
- except (MsgQReceiveError, MsgQCloseOnReceive) as err:
|
|
|
|
- # If it's MsgQCloseOnReceive and that happens without reading
|
|
|
|
- # any data, it basically means the remote client has closed the
|
|
|
|
- # socket, so we log it as debug information. Otherwise, it's
|
|
|
|
- # a somewhat unexpected event, so we consider it an "error".
|
|
|
|
- if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
|
|
|
|
- logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
|
|
|
|
- else:
|
|
|
|
- logger.error(MSGQ_RECV_ERROR, fd, err)
|
|
|
|
- self.kill_socket(fd, sock)
|
|
|
|
- return
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- routingmsg = isc.cc.message.from_wire(routing)
|
|
|
|
- except DecodeError as err:
|
|
|
|
- self.kill_socket(fd, sock)
|
|
|
|
- logger.error(MSGQ_HDR_DECODE_ERROR, fd, err)
|
|
|
|
- return
|
|
|
|
-
|
|
|
|
- self.process_command(fd, sock, routingmsg, data)
|
|
|
|
-
|
|
|
|
- def process_command(self, fd, sock, routing, data):
|
|
|
|
- """Process a single command. This will split out into one of the
|
|
|
|
- other functions."""
|
|
|
|
- logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
|
|
|
|
- cmd = routing[CC_HEADER_TYPE]
|
|
|
|
- if cmd == CC_COMMAND_SEND:
|
|
|
|
- self.process_command_send(sock, routing, data)
|
|
|
|
- elif cmd == CC_COMMAND_SUBSCRIBE:
|
|
|
|
- self.process_command_subscribe(sock, routing, data)
|
|
|
|
- elif cmd == CC_COMMAND_UNSUBSCRIBE:
|
|
|
|
- self.process_command_unsubscribe(sock, routing, data)
|
|
|
|
- elif cmd == CC_COMMAND_GET_LNAME:
|
|
|
|
- self.process_command_getlname(sock, routing, data)
|
|
|
|
- elif cmd == CC_COMMAND_PING:
|
|
|
|
- # Command for testing purposes
|
|
|
|
- self.process_command_ping(sock, routing, data)
|
|
|
|
- elif cmd == CC_COMMAND_STOP:
|
|
|
|
- self.stop()
|
|
|
|
- else:
|
|
|
|
- logger.error(MSGQ_INVALID_CMD, cmd)
|
|
|
|
-
|
|
|
|
- def preparemsg(self, env, msg = None):
|
|
|
|
- if type(env) == dict:
|
|
|
|
- env = isc.cc.message.to_wire(env)
|
|
|
|
- if type(msg) == dict:
|
|
|
|
- msg = isc.cc.message.to_wire(msg)
|
|
|
|
- length = 2 + len(env);
|
|
|
|
- if msg:
|
|
|
|
- length += len(msg)
|
|
|
|
- ret = struct.pack("!IH", length, len(env))
|
|
|
|
- ret += env
|
|
|
|
- if msg:
|
|
|
|
- ret += msg
|
|
|
|
- return ret
|
|
|
|
-
|
|
|
|
- def sendmsg(self, sock, env, msg = None):
|
|
|
|
- self.send_prepared_msg(sock, self.preparemsg(env, msg))
|
|
|
|
-
|
|
|
|
- def _send_data(self, sock, data):
|
|
|
|
- """
|
|
|
|
- Send a piece of data to the given socket. This method is
|
|
|
|
- essentially "private" to MsgQ, but defined as if it were "protected"
|
|
|
|
- for easier access from tests.
|
|
|
|
-
|
|
|
|
- Parameters:
|
|
|
|
- sock: The socket to send to
|
|
|
|
- data: The list of bytes to send
|
|
|
|
- Returns:
|
|
|
|
- An integer or None. If an integer (which can be 0), it signals
|
|
|
|
- the number of bytes sent. If None, the socket appears to have
|
|
|
|
- been closed on the other end, and it has been killed on this
|
|
|
|
- side too.
|
|
|
|
- """
|
|
|
|
- try:
|
|
|
|
- # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
|
|
|
|
- # on some OSes
|
|
|
|
- sock.setblocking(0)
|
|
|
|
- return sock.send(data)
|
|
|
|
- except socket.error as e:
|
|
|
|
- if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
|
|
|
|
- return 0
|
|
|
|
- elif e.errno in [ errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS ]:
|
|
|
|
- # EPIPE happens if the remote module has terminated by the time
|
|
|
|
- # of this send; its severity can vary, but in many cases it
|
|
|
|
- # shouldn't be critical, so we log it separately as a warning.
|
|
|
|
- if e.errno == errno.EPIPE:
|
|
|
|
- logger.warn(MSGQ_CLOSE_ON_SEND, sock.fileno())
|
|
|
|
- else:
|
|
|
|
- logger.error(MSGQ_SEND_ERROR, sock.fileno(),
|
|
|
|
- errno.errorcode[e.errno])
|
|
|
|
- self.kill_socket(sock.fileno(), sock)
|
|
|
|
- return None
|
|
|
|
- else:
|
|
|
|
- raise e
|
|
|
|
- finally:
|
|
|
|
- # And set it back again
|
|
|
|
- sock.setblocking(1)
|
|
|
|
-
|
|
|
|
- def send_prepared_msg(self, sock, msg):
|
|
|
|
- '''
|
|
|
|
- Add a message to the queue. If there's nothing waiting, try
|
|
|
|
- to send it right away.
|
|
|
|
-
|
|
|
|
- Return if the socket is still alive. It can return false if the
|
|
|
|
- socket dies (for example due to EPIPE in the attempt to send).
|
|
|
|
- Returning true does not guarantee the message will be delivered,
|
|
|
|
- but returning false means it won't.
|
|
|
|
- '''
|
|
|
|
- # Try to send the data, but only if there's nothing waiting
|
|
|
|
- fileno = sock.fileno()
|
|
|
|
- if fileno in self.sendbuffs:
|
|
|
|
- amount_sent = 0
|
|
|
|
- else:
|
|
|
|
- amount_sent = self._send_data(sock, msg)
|
|
|
|
- if amount_sent is None:
|
|
|
|
- # Socket has been killed, drop the send
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
- # Still something to send, add it to outgoing queue
|
|
|
|
- if amount_sent < len(msg):
|
|
|
|
- now = time.clock()
|
|
|
|
- # Append it to buffer (but check the data go away)
|
|
|
|
- if fileno in self.sendbuffs:
|
|
|
|
- (last_sent, buff) = self.sendbuffs[fileno]
|
|
|
|
- tdelta = now - last_sent
|
|
|
|
- if tdelta > 0.1:
|
|
|
|
- logger.error(MSGQ_SOCKET_TIMEOUT_ERROR, fileno, tdelta)
|
|
|
|
- self.kill_socket(fileno, sock)
|
|
|
|
- return False
|
|
|
|
- buff += msg
|
|
|
|
- else:
|
|
|
|
- buff = msg[amount_sent:]
|
|
|
|
- last_sent = now
|
|
|
|
- self.sendbuffs[fileno] = (last_sent, buff)
|
|
|
|
- return True
|
|
|
|
-
|
|
|
|
- def _process_write(self, fileno):
|
|
|
|
- # Try to send some data from the buffer
|
|
|
|
- (_, msg) = self.sendbuffs[fileno]
|
|
|
|
- sock = self.sockets[fileno]
|
|
|
|
- amount_sent = self._send_data(sock, msg)
|
|
|
|
- if amount_sent is not None:
|
|
|
|
- # Keep the rest
|
|
|
|
- msg = msg[amount_sent:]
|
|
|
|
- if len(msg) == 0:
|
|
|
|
- # If there's no more, stop requesting for write availability
|
|
|
|
- del self.sendbuffs[fileno]
|
|
|
|
- else:
|
|
|
|
- self.sendbuffs[fileno] = (time.clock(), msg)
|
|
|
|
-
|
|
|
|
- def newlname(self):
|
|
|
|
- """Generate a unique connection identifier for this socket.
|
|
|
|
- This is done by using an increasing counter and the current
|
|
|
|
- time."""
|
|
|
|
- self.connection_counter += 1
|
|
|
|
- return "%x_%x@%s" % (time.time(), self.connection_counter,
|
|
|
|
- self.hostname)
|
|
|
|
-
|
|
|
|
- def process_command_ping(self, sock, routing, data):
|
|
|
|
- self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_PONG }, data)
|
|
|
|
-
|
|
|
|
- def process_command_getlname(self, sock, routing, data):
|
|
|
|
- lname = [ k for k, v in self.lnames.items() if v == sock ][0]
|
|
|
|
- self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_GET_LNAME },
|
|
|
|
- { CC_PAYLOAD_LNAME : lname })
|
|
|
|
-
|
|
|
|
- def process_command_send(self, sock, routing, data):
|
|
|
|
- group = routing[CC_HEADER_GROUP]
|
|
|
|
- instance = routing[CC_HEADER_INSTANCE]
|
|
|
|
- to = routing[CC_HEADER_TO]
|
|
|
|
- if group == None or instance == None:
|
|
|
|
- # FIXME: Should we log them instead?
|
|
|
|
- return # ignore invalid packets entirely
|
|
|
|
-
|
|
|
|
- if to == CC_TO_WILDCARD:
|
|
|
|
- sockets = self.subs.find(group, instance)
|
|
|
|
- else:
|
|
|
|
- if to in self.lnames:
|
|
|
|
- sockets = [ self.lnames[to] ]
|
|
|
|
- else:
|
|
|
|
- sockets = []
|
|
|
|
-
|
|
|
|
- msg = self.preparemsg(routing, data)
|
|
|
|
-
|
|
|
|
- if sock in sockets:
|
|
|
|
- # Don't bounce to self
|
|
|
|
- sockets.remove(sock)
|
|
|
|
-
|
|
|
|
- has_recipient = False
|
|
|
|
- for socket in sockets:
|
|
|
|
- if self.send_prepared_msg(socket, msg):
|
|
|
|
- has_recipient = True
|
|
|
|
- if not has_recipient and routing.get(CC_HEADER_WANT_ANSWER) and \
|
|
|
|
- CC_HEADER_REPLY not in routing:
|
|
|
|
- # We have no recipients. But the sender insists on a reply
|
|
|
|
- # (and the message isn't a reply itself). We need to send
|
|
|
|
- # an error to satisfy the request, since there's nobody
|
|
|
|
- # else who can.
|
|
|
|
- #
|
|
|
|
- # We omit the replies on purpose. The recipient might generate
|
|
|
|
- # the response by copying and mangling the header of incoming
|
|
|
|
- # message (just like we do below) and would include the want_answer
|
|
|
|
- # by accident. And we want to avoid loops of errors. Also, it
|
|
|
|
- # is unclear if the knowledge of undeliverable reply would be
|
|
|
|
- # of any use to the sender, and it should be much rarer situation.
|
|
|
|
-
|
|
|
|
- # The real errors would be positive, 1 most probably. We use
|
|
|
|
- # negative errors for delivery errors to distinguish them a
|
|
|
|
- # little. We probably should have a way to provide more data
|
|
|
|
- # in the error message.
|
|
|
|
- payload = isc.config.ccsession.create_answer(CC_REPLY_NO_RECPT,
|
|
|
|
- "No such recipient")
|
|
|
|
- # We create the header based on the current one. But we don't
|
|
|
|
- # want to mangle it for the caller, so we get a copy. A shallow
|
|
|
|
- # one should be enough, we modify the dict only.
|
|
|
|
- header = routing.copy()
|
|
|
|
- header[CC_HEADER_REPLY] = routing[CC_HEADER_SEQ]
|
|
|
|
- # Dummy lname not assigned to clients
|
|
|
|
- header[CC_HEADER_FROM] = "msgq"
|
|
|
|
- header[CC_HEADER_TO] = routing[CC_HEADER_FROM]
|
|
|
|
- # We keep the seq as it is. We don't need to track the message
|
|
|
|
- # and we will not confuse the sender. The sender would use an
|
|
|
|
- # unique id for each message, so we won't return one twice to it.
|
|
|
|
- errmsg = self.preparemsg(header, payload)
|
|
|
|
- # Send it back.
|
|
|
|
- self.send_prepared_msg(sock, errmsg)
|
|
|
|
-
|
|
|
|
- def process_command_subscribe(self, sock, routing, data):
|
|
|
|
- group = routing[CC_HEADER_GROUP]
|
|
|
|
- instance = routing[CC_HEADER_INSTANCE]
|
|
|
|
- if group == None or instance == None:
|
|
|
|
- return # ignore invalid packets entirely
|
|
|
|
- self.subs.subscribe(group, instance, sock)
|
|
|
|
- lname = self.fd_to_lname[sock.fileno()]
|
|
|
|
- self.members_notify('subscribed',
|
|
|
|
- {
|
|
|
|
- 'client': lname,
|
|
|
|
- 'group': group
|
|
|
|
- })
|
|
|
|
-
|
|
|
|
- def process_command_unsubscribe(self, sock, routing, data):
|
|
|
|
- group = routing[CC_HEADER_GROUP]
|
|
|
|
- instance = routing[CC_HEADER_INSTANCE]
|
|
|
|
- if group == None or instance == None:
|
|
|
|
- return # ignore invalid packets entirely
|
|
|
|
- if self.subs.unsubscribe(group, instance, sock):
|
|
|
|
- lname = self.fd_to_lname[sock.fileno()]
|
|
|
|
- self.members_notify('unsubscribed',
|
|
|
|
- {
|
|
|
|
- 'client': lname,
|
|
|
|
- 'group': group
|
|
|
|
- })
|
|
|
|
-
|
|
|
|
- def run(self):
|
|
|
|
- """Process messages. Forever. Mostly."""
|
|
|
|
- self.running = True
|
|
|
|
-
|
|
|
|
- self.run_select()
|
|
|
|
-
|
|
|
|
- def run_select(self):
|
|
|
|
- while self.running:
|
|
|
|
- reads = list(self.fd_to_lname.keys())
|
|
|
|
- if self.listen_socket.fileno() != -1: # Skip in tests
|
|
|
|
- reads.append(self.listen_socket.fileno())
|
|
|
|
- if self.__poller_sock and self.__poller_sock.fileno() != -1:
|
|
|
|
- reads.append(self.__poller_sock.fileno())
|
|
|
|
- writes = list(self.sendbuffs.keys())
|
|
|
|
- (read_ready, write_ready) = ([], [])
|
|
|
|
- try:
|
|
|
|
- (read_ready, write_ready, _) = select.select(reads, writes,
|
|
|
|
- []);
|
|
|
|
- except select.error as err:
|
|
|
|
- if err.args[0] == errno.EINTR:
|
|
|
|
- continue # Just try it again if interrupted.
|
|
|
|
- else:
|
|
|
|
- logger.fatal(MSGQ_SELECT_ERROR, err)
|
|
|
|
- break
|
|
|
|
- with self.__lock:
|
|
|
|
- write_ready = set(write_ready)
|
|
|
|
- for fd in read_ready:
|
|
|
|
- # Do only one operation per loop iteration on the given fd.
|
|
|
|
- # It could be possible to perform both, but it may have
|
|
|
|
- # undesired side effects in special situations (like, if the
|
|
|
|
- # read closes the socket).
|
|
|
|
- if fd in write_ready:
|
|
|
|
- write_ready.remove(fd)
|
|
|
|
- if fd == self.listen_socket.fileno():
|
|
|
|
- self.process_accept()
|
|
|
|
- elif self.__poller_sock and fd == \
|
|
|
|
- self.__poller_sock.fileno():
|
|
|
|
- # The signal socket. We should terminate now.
|
|
|
|
- self.running = False
|
|
|
|
- break
|
|
|
|
- else:
|
|
|
|
- self.process_packet(fd, self.sockets[fd])
|
|
|
|
- for fd in write_ready:
|
|
|
|
- self._process_write(fd)
|
|
|
|
-
|
|
|
|
- def stop(self):
|
|
|
|
- # Signal it should terminate.
|
|
|
|
- self.__control_sock.close()
|
|
|
|
- self.__control_sock = None
|
|
|
|
- # Abort anything waiting on the condition, just to make sure it's not
|
|
|
|
- # blocked forever
|
|
|
|
- self.cfgmgr_ready(False)
|
|
|
|
-
|
|
|
|
- def cleanup_signalsock(self):
|
|
|
|
- """Close the signal sockets. We could do it directly in shutdown,
|
|
|
|
- but this part is reused in tests.
|
|
|
|
- """
|
|
|
|
- if self.__poller_sock:
|
|
|
|
- self.__poller_sock.close()
|
|
|
|
- self.__poller_sock = None
|
|
|
|
- if self.__control_sock:
|
|
|
|
- self.__control_sock.close()
|
|
|
|
- self.__control_sock = None
|
|
|
|
-
|
|
|
|
- def shutdown(self):
|
|
|
|
- """Stop the MsgQ master."""
|
|
|
|
- logger.debug(TRACE_START, MSGQ_SHUTDOWN)
|
|
|
|
- self.listen_socket.close()
|
|
|
|
- self.cleanup_signalsock()
|
|
|
|
- # Close all the sockets too. In real life, there should be none now,
|
|
|
|
- # as Msgq should be the last one. But some tests don't adhere to this
|
|
|
|
- # and create a new Msgq for each test, which led to huge socket leaks.
|
|
|
|
- # Some other threads put some other things in instead of sockets, so
|
|
|
|
- # we catch whatever exceptions there we can. This should be safe,
|
|
|
|
- # because in real operation, we will terminate now anyway, implicitly
|
|
|
|
- # closing anything anyway.
|
|
|
|
- for sock in self.sockets.values():
|
|
|
|
- try:
|
|
|
|
- sock.close()
|
|
|
|
- except Exception:
|
|
|
|
- pass
|
|
|
|
- if os.path.exists(self.socket_file):
|
|
|
|
- os.remove(self.socket_file)
|
|
|
|
-
|
|
|
|
- def config_handler(self, new_config):
|
|
|
|
- """The configuration handler (run in a separate thread).
|
|
|
|
- Not tested, currently effectively empty.
|
|
|
|
- """
|
|
|
|
- config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)
|
|
|
|
-
|
|
|
|
- with self.__lock:
|
|
|
|
- if not self.running:
|
|
|
|
- return
|
|
|
|
-
|
|
|
|
- # TODO: Any config handling goes here.
|
|
|
|
-
|
|
|
|
- return isc.config.create_answer(0)
|
|
|
|
-
|
|
|
|
- def command_handler(self, command, args):
|
|
|
|
- """The command handler (run in a separate thread)."""
|
|
|
|
- config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
|
|
|
|
-
|
|
|
|
- with self.__lock:
|
|
|
|
- if not self.running:
|
|
|
|
- return
|
|
|
|
-
|
|
|
|
- # TODO: Who does validation? The ModuleCCSession or must we?
|
|
|
|
-
|
|
|
|
- if command == 'members':
|
|
|
|
- # List all members of MsgQ or of a group.
|
|
|
|
- if args is None:
|
|
|
|
- args = {}
|
|
|
|
- group = args.get('group')
|
|
|
|
- if group:
|
|
|
|
- return isc.config.create_answer(0,
|
|
|
|
- list(map(lambda sock: self.fd_to_lname[sock.fileno()],
|
|
|
|
- self.subs.find(group, ''))))
|
|
|
|
- else:
|
|
|
|
- return isc.config.create_answer(0,
|
|
|
|
- list(self.lnames.keys()))
|
|
|
|
-
|
|
|
|
- config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
|
|
|
|
- return isc.config.create_answer(1, 'unknown command: ' + command)
|
|
|
|
-
|
|
|
|
-def signal_handler(msgq, signal, frame):
|
|
|
|
- if msgq:
|
|
|
|
- msgq.stop()
|
|
|
|
-
|
|
|
|
-def main():
|
|
|
|
- def check_port(option, opt_str, value, parser):
|
|
|
|
- """Function to insure that the port we are passed is actually
|
|
|
|
- a valid port number. Used by OptionParser() on startup."""
|
|
|
|
- intval = int(value)
|
|
|
|
- if (intval < 0) or (intval > 65535):
|
|
|
|
- raise OptionValueError("%s requires a port number (0-65535)" %
|
|
|
|
- opt_str)
|
|
|
|
- parser.values.msgq_port = intval
|
|
|
|
-
|
|
|
|
- # Parse any command-line options.
|
|
|
|
- parser = OptionParser(version=VERSION)
|
|
|
|
- # TODO: Should we remove the option?
|
|
|
|
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
|
|
|
|
- help="display more about what is going on")
|
|
|
|
- parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
|
|
|
|
- type="string", default=None,
|
|
|
|
- help="UNIX domain socket file the msgq daemon will use")
|
|
|
|
- (options, args) = parser.parse_args()
|
|
|
|
-
|
|
|
|
- # Announce startup.
|
|
|
|
- logger.debug(TRACE_START, MSGQ_START, VERSION)
|
|
|
|
-
|
|
|
|
- msgq = MsgQ(options.msgq_socket_file, options.verbose)
|
|
|
|
-
|
|
|
|
- signal.signal(signal.SIGTERM,
|
|
|
|
- lambda signal, frame: signal_handler(msgq, signal, frame))
|
|
|
|
-
|
|
|
|
- # Ignore SIGPIPE. We handle errors writing to children using try in the
|
|
|
|
- # appropriate places and the delivery of a signal is very heavy handed.
|
|
|
|
- # Windows doesn't support SIGPIPE so don't try it there.
|
|
|
|
- if not sys.platform.startswith('win'):
|
|
|
|
- signal.signal(signal.SIGPIPE, signal.SIG_IGN)
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- msgq.setup()
|
|
|
|
- except Exception as e:
|
|
|
|
- logger.fatal(MSGQ_START_FAIL, e)
|
|
|
|
- sys.exit(1)
|
|
|
|
-
|
|
|
|
- # We run the processing in a separate thread. This is because we want to
|
|
|
|
- # connect to the msgq ourself. But the cc library is unfortunately blocking
|
|
|
|
- # in many places and waiting for the processing part to answer, it would
|
|
|
|
- # deadlock.
|
|
|
|
- poller_thread = threading.Thread(target=msgq.run)
|
|
|
|
- poller_thread.daemon = True
|
|
|
|
- try:
|
|
|
|
- poller_thread.start()
|
|
|
|
- if msgq.wait_cfgmgr():
|
|
|
|
- # Once we get the config manager, we can read our own config.
|
|
|
|
- session = isc.config.ModuleCCSession(SPECFILE_LOCATION,
|
|
|
|
- msgq.config_handler,
|
|
|
|
- msgq.command_handler,
|
|
|
|
- None, True,
|
|
|
|
- msgq.socket_file)
|
|
|
|
- msgq._session = session
|
|
|
|
- session.start()
|
|
|
|
- # And we create a thread that'll just wait for commands and
|
|
|
|
- # handle them. We don't terminate the thread, we set it to
|
|
|
|
- # daemon. Once the main thread terminates, it'll just die.
|
|
|
|
- def run_session():
|
|
|
|
- while True:
|
|
|
|
- # As the check_command has internal mutex that is shared
|
|
|
|
- # with sending part (which includes notify). So we don't
|
|
|
|
- # want to hold it long-term and block using select.
|
|
|
|
- fileno = session.get_socket().fileno()
|
|
|
|
- try:
|
|
|
|
- (reads, _, _) = select.select([fileno], [], [])
|
|
|
|
- except select.error as se:
|
|
|
|
- if se.args[0] != errno.EINTR:
|
|
|
|
- raise
|
|
|
|
- session.check_command(True)
|
|
|
|
- background_thread = threading.Thread(target=run_session)
|
|
|
|
- background_thread.daemon = True
|
|
|
|
- background_thread.start()
|
|
|
|
- poller_thread.join()
|
|
|
|
- except KeyboardInterrupt:
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
- msgq.shutdown()
|
|
|
|
-
|
|
|
|
- logger.info(MSGQ_EXITING)
|
|
|
|
-
|
|
|
|
-if __name__ == "__main__":
|
|
|
|
- isc.util.traceback_handler.traceback_handler(main)
|
|
|