Browse Source

Merge #2582

Let the message queue connect to itself and handle config updates and commands.
No commands or configuration specific to Msgq does not exist yet, but it
handles the generic logging config.
Michal 'vorner' Vaner 12 years ago
parent
commit
ced31d8c5a

+ 4 - 1
src/bin/msgq/Makefile.am

@@ -4,13 +4,16 @@ pkglibexecdir = $(libexecdir)/@PACKAGE@
 
 pkglibexec_SCRIPTS = b10-msgq
 
+b10_msgqdir = $(pkgdatadir)
+b10_msgq_DATA = msgq.spec
+
 CLEANFILES = b10-msgq msgq.pyc
 CLEANFILES += $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.py
 CLEANFILES += $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.pyc
 
 man_MANS = b10-msgq.8
 DISTCLEANFILES = $(man_MANS)
-EXTRA_DIST = $(man_MANS) msgq.xml msgq_messages.mes
+EXTRA_DIST = $(man_MANS) msgq.xml msgq_messages.mes msgq.spec
 
 nodist_pylogmessage_PYTHON = $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.py
 pylogmessagedir = $(pyexecdir)/isc/log_messages/

+ 203 - 44
src/bin/msgq/msgq.py.in

@@ -29,6 +29,8 @@ import errno
 import time
 import select
 import random
+import threading
+import isc.config.ccsession
 from optparse import OptionParser, OptionValueError
 import isc.util.process
 import isc.log
@@ -37,7 +39,15 @@ from isc.log_messages.msgq_messages import *
 import isc.cc
 
 isc.util.process.rename()
+
+isc.log.init("b10-msgq", buffer=True)
+# Logger that is used in the actual msgq handling - startup, shutdown and the
+# poller thread.
 logger = isc.log.Logger("msgq")
+# A separate copy for the master/config thread when the poller thread runs.
+# We use a separate instance, since the logger itself doesn't have to be
+# thread safe.
+config_logger = isc.log.Logger("msgq")
 TRACE_START = logger.DBGLVL_START_SHUT
 TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
 TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
@@ -47,11 +57,31 @@ TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
 # number, and the overall BIND 10 version number (set in configure.ac).
 VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
 
+# If B10_FROM_BUILD is set in the environment, we use data files
+# from a directory relative to that, otherwise we use the ones
+# installed on the system
+if "B10_FROM_BUILD" in os.environ:
+    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/msgq"
+else:
+    PREFIX = "@prefix@"
+    DATAROOTDIR = "@datarootdir@"
+    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
+SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
+
 class MsgQReceiveError(Exception): pass
 
 class SubscriptionManager:
-    def __init__(self):
+    def __init__(self, cfgmgr_ready):
+        """
+        Initialize the subscription manager.
+        parameters:
+        * cfgmgr_ready: A callable object run once the config manager
+            subscribes. This is a hackish solution, but we can't read
+            the configuration sooner.
+        """
         self.subscriptions = {}
+        self.__cfgmgr_ready = cfgmgr_ready
+        self.__cfgmgr_ready_called = False
 
     def subscribe(self, group, instance, socket):
         """Add a subscription."""
@@ -63,6 +93,10 @@ class SubscriptionManager:
         else:
             logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
             self.subscriptions[target] = [ socket ]
+        if group == "ConfigManager" and not self.__cfgmgr_ready_called:
+            logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
+            self.__cfgmgr_ready_called = True
+            self.__cfgmgr_ready()
 
     def unsubscribe(self, group, instance, socket):
         """Remove the socket from the one specific subscription."""
@@ -130,10 +164,52 @@ class MsgQ:
         self.sockets = {}
         self.connection_counter = random.random()
         self.hostname = socket.gethostname()
-        self.subs = SubscriptionManager()
+        self.subs = SubscriptionManager(self.cfgmgr_ready)
         self.lnames = {}
         self.sendbuffs = {}
         self.running = False
+        self.__cfgmgr_ready = None
+        self.__cfgmgr_ready_cond = threading.Condition()
+        # A lock used when the message queue does anything more complicated.
+        # It is mostly a safety measure, the threads doing so should be mostly
+        # independent, and the one with config session should be read only,
+        # but with threads, one never knows. We use threads for concurrency,
+        # not for performance, so we use wide lock scopes to be on the safe
+        # side.
+        self.__lock = threading.Lock()
+
+    def cfgmgr_ready(self, ready=True):
+        """Notify that the config manager is either subscribed, or
+           that the msgq is shutting down and it won't connect, but
+           anybody waiting for it should stop anyway.
+
+           The ready parameter signifies if the config manager is subscribed.
+
+           This method can be called multiple times, but second and any
+           following call is simply ignored. This means the "abort" version
+           of the call can be used on any stop unconditionally, even when
+           the config manager already connected.
+        """
+        with self.__cfgmgr_ready_cond:
+            if self.__cfgmgr_ready is not None:
+                # This is a second call to this method. In that case it does
+                # nothing.
+                return
+            self.__cfgmgr_ready = ready
+            self.__cfgmgr_ready_cond.notify_all()
+
+    def wait_cfgmgr(self):
+        """Wait for msgq to subscribe.
+
+           When this returns, the config manager is either subscribed, or
+           msgq gave up waiting for it. Success is signified by the return
+           value.
+        """
+        with self.__cfgmgr_ready_cond:
+            # Wait until it either aborts or subscribes
+            while self.__cfgmgr_ready is None:
+                self.__cfgmgr_ready_cond.wait()
+            return self.__cfgmgr_ready
 
     def setup_poller(self):
         """Set up the poll thing.  Internal function."""
@@ -143,7 +219,7 @@ class MsgQ:
             self.poller = select.poll()
 
     def add_kqueue_socket(self, socket, write_filter=False):
-        """Add a kquque filter for a socket.  By default the read
+        """Add a kqueue filter for a socket.  By default the read
         filter is used; if write_filter is set to True, the write
         filter is used.  We use a boolean value instead of a specific
         filter constant, because kqueue filter values do not seem to
@@ -191,6 +267,20 @@ class MsgQ:
         else:
             self.add_kqueue_socket(self.listen_socket)
 
+    def setup_signalsock(self):
+        """Create a socket pair used to signal when we want to finish.
+           Using a socket is easy and thread/signal safe way to signal
+           the termination.
+        """
+        # The __poller_sock will be the end in the poller. When it is
+        # closed, we should shut down.
+        (self.__poller_sock, self.__control_sock) = socket.socketpair()
+
+        if self.poller:
+            self.poller.register(self.__poller_sock, select.POLLIN)
+        else:
+            self.add_kqueue_socket(self.__poller_sock)
+
     def setup(self):
         """Configure listener socket, polling, etc.
            Raises a socket.error if the socket_file cannot be
@@ -198,6 +288,7 @@ class MsgQ:
         """
 
         self.setup_poller()
+        self.setup_signalsock()
         self.setup_listener()
 
         logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
@@ -493,16 +584,21 @@ class MsgQ:
                 else:
                     logger.fatal(MSGQ_POLL_ERR, err)
                     break
-            for (fd, event) in events:
-                if fd == self.listen_socket.fileno():
-                    self.process_accept()
-                else:
-                    if event & select.POLLOUT:
-                        self.__process_write(fd)
-                    elif event & select.POLLIN:
-                        self.process_socket(fd)
+            with self.__lock:
+                for (fd, event) in events:
+                    if fd == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif fd == self.__poller_sock.fileno():
+                        # If it's the signal socket, we should terminate now.
+                        self.running = False
+                        break
                     else:
-                        logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
+                        if event & select.POLLOUT:
+                            self.__process_write(fd)
+                        elif event & select.POLLIN:
+                            self.process_socket(fd)
+                        else:
+                            logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
 
     def run_kqueue(self):
         while self.running:
@@ -512,38 +608,83 @@ class MsgQ:
             if not events:
                 raise RuntimeError('serve: kqueue returned no events')
 
-            for event in events:
-                if event.ident == self.listen_socket.fileno():
-                    self.process_accept()
-                else:
-                    if event.filter == select.KQ_FILTER_WRITE:
-                        self.__process_write(event.ident)
-                    if event.filter == select.KQ_FILTER_READ and \
-                            event.data > 0:
-                        self.process_socket(event.ident)
-                    elif event.flags & select.KQ_EV_EOF:
-                        self.kill_socket(event.ident,
-                                         self.sockets[event.ident])
+            with self.__lock:
+                for event in events:
+                    if event.ident == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif event.ident == self.__poller_sock.fileno():
+                        # If it's the signal socket, we should terminate now.
+                        self.running = False
+                        break;
+                    else:
+                        if event.filter == select.KQ_FILTER_WRITE:
+                            self.__process_write(event.ident)
+                        if event.filter == select.KQ_FILTER_READ and \
+                                event.data > 0:
+                            self.process_socket(event.ident)
+                        elif event.flags & select.KQ_EV_EOF:
+                            self.kill_socket(event.ident,
+                                             self.sockets[event.ident])
 
     def stop(self):
-        self.running = False
+        # Signal it should terminate.
+        self.__control_sock.close()
+        self.__control_sock = None
+        # Abort anything waiting on the condition, just to make sure it's not
+        # blocked forever
+        self.cfgmgr_ready(False)
+
+    def cleanup_signalsock(self):
+        """Close the signal sockets. We could do it directly in shutdown,
+           but this part is reused in tests.
+        """
+        if self.__poller_sock:
+            self.__poller_sock.close()
+            self.__poller_sock = None
+        if self.__control_sock:
+            self.__control_sock.close()
+            self.__control_sock = None
 
     def shutdown(self):
         """Stop the MsgQ master."""
-        if self.verbose:
-            sys.stdout.write("[b10-msgq] Stopping the server.\n")
+        logger.debug(TRACE_START, MSGQ_SHUTDOWN)
         self.listen_socket.close()
+        self.cleanup_signalsock()
         if os.path.exists(self.socket_file):
             os.remove(self.socket_file)
 
-# can signal handling and calling a destructor be done without a
-# global variable?
-msgq = None
+    def config_handler(self, new_config):
+        """The configuration handler (run in a separate thread).
+           Not tested, currently effectively empty.
+        """
+        config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)
+
+        with self.__lock:
+            if not self.running:
+                return
+
+            # TODO: Any config handlig goes here.
+
+            return isc.config.create_answer(0)
+
+    def command_handler(self, command, args):
+        """The command handler (run in a separate thread).
+           Not tested, currently effectively empty.
+        """
+        config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
+
+        with self.__lock:
+            if not self.running:
+                return
+
+            # TODO: Any commands go here
 
-def signal_handler(signal, frame):
+            config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
+            return isc.config.create_answer(1, 'unknown command: ' + command)
+
+def signal_handler(msgq, signal, frame):
     if msgq:
-        msgq.shutdown()
-    sys.exit(0)
+        msgq.stop()
 
 if __name__ == "__main__":
     def check_port(option, opt_str, value, parser):
@@ -556,6 +697,7 @@ if __name__ == "__main__":
 
     # Parse any command-line options.
     parser = OptionParser(version=VERSION)
+    # TODO: Should we remove the option?
     parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                       help="display more about what is going on")
     parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
@@ -563,29 +705,46 @@ if __name__ == "__main__":
                       help="UNIX domain socket file the msgq daemon will use")
     (options, args) = parser.parse_args()
 
-    # Init logging, according to the parameters.
-    # FIXME: Do proper logger configuration, this is just a hack
-    # This is #2582
-    sev = 'INFO'
-    if options.verbose:
-        sev = 'DEBUG'
-    isc.log.init("b10-msgq", buffer=False, severity=sev, debuglevel=99)
-
-    signal.signal(signal.SIGTERM, signal_handler)
-
     # Announce startup.
     logger.debug(TRACE_START, MSGQ_START, VERSION)
 
     msgq = MsgQ(options.msgq_socket_file, options.verbose)
 
+    signal.signal(signal.SIGTERM,
+                  lambda signal, frame: signal_handler(msgq, signal, frame))
+
     try:
         msgq.setup()
     except Exception as e:
         logger.fatal(MSGQ_START_FAIL, e)
         sys.exit(1)
 
+    # We run the processing in a separate thread. This is because we want to
+    # connect to the msgq ourself. But the cc library is unfortunately blocking
+    # in many places and waiting for the processing part to answer, it would
+    # deadlock.
+    poller_thread = threading.Thread(target=msgq.run)
+    poller_thread.daemon = True
     try:
-        msgq.run()
+        poller_thread.start()
+        if msgq.wait_cfgmgr():
+            # Once we get the config manager, we can read our own config.
+            session = isc.config.ModuleCCSession(SPECFILE_LOCATION,
+                                                 msgq.config_handler,
+                                                 msgq.command_handler,
+                                                 None, True,
+                                                 msgq.socket_file)
+            session.start()
+            # And we create a thread that'll just wait for commands and
+            # handle them. We don't terminate the thread, we set it to
+            # daemon. Once the main thread terminates, it'll just die.
+            def run_session():
+                while True:
+                    session.check_command(False)
+            background_thread = threading.Thread(target=run_session)
+            background_thread.daemon = True
+            background_thread.start()
+        poller_thread.join()
     except KeyboardInterrupt:
         pass
 

+ 8 - 0
src/bin/msgq/msgq.spec

@@ -0,0 +1,8 @@
+{
+  "module_spec": {
+    "module_name": "Msgq",
+    "module_description": "The message queue",
+    "config_data": [],
+    "commands": []
+  }
+}

+ 18 - 0
src/bin/msgq/msgq_messages.mes

@@ -19,6 +19,21 @@
 # <topsrcdir>/tools/reorder_message_file.py to make sure the
 # messages are in the correct order.
 
+% MSGQ_CFGMGR_SUBSCRIBED The config manager subscribed to message queue
+This is a debug message. The message queue has little bit of special handling
+for the configuration manager. This special handling is happening now.
+
+% MSGQ_COMMAND Running command %1 with arguments %2
+Debug message. The message queue received a command and it is running it.
+
+% MSGQ_COMMAND_UNKNOWN Unknown command '%1'
+The message queue received a command from other module, but it doesn't
+recognize it. This is probably either a coding error or inconsistency between
+the message queue version and version of the module.
+
+% MSGQ_CONFIG_DATA Received configuration update for the msgq: %1
+Debug message. The message queue received a configuration update, handling it.
+
 % MSGQ_HDR_DECODE_ERR Error decoding header received from socket %1: %2
 The socket with mentioned file descriptor sent a packet. However, it was not
 possible to decode the routing header of the packet. The packet is ignored.
@@ -69,6 +84,9 @@ incompatible version of a module and message queue daemon.
 There was a low-level error when sending data to a socket. The error is logged
 and the corresponding socket is dropped.
 
+% MSGQ_SHUTDOWN Stopping Msgq
+Debug message. The message queue is shutting down.
+
 % MSGQ_SOCK_CLOSE Closing socket fd %1
 Debug message. Closing the mentioned socket.
 

+ 107 - 4
src/bin/msgq/tests/msgq_test.py

@@ -19,7 +19,12 @@ import isc.log
 
 class TestSubscriptionManager(unittest.TestCase):
     def setUp(self):
-        self.sm = SubscriptionManager()
+        self.__cfgmgr_ready_called = 0
+        self.sm = SubscriptionManager(self.cfgmgr_ready)
+
+    def cfgmgr_ready(self):
+        # Called one more time
+        self.__cfgmgr_ready_called += 1
 
     def test_subscription_add_delete_manager(self):
         self.sm.subscribe("a", "*", 'sock1')
@@ -101,7 +106,7 @@ class TestSubscriptionManager(unittest.TestCase):
         try:
             msgq.setup()
             self.assertTrue(os.path.exists(socket_file))
-            msgq.shutdown();
+            msgq.shutdown()
             self.assertFalse(os.path.exists(socket_file))
         except socket.error:
             # ok, the install path doesn't exist at all,
@@ -115,6 +120,25 @@ class TestSubscriptionManager(unittest.TestCase):
     def test_open_socket_bad(self):
         msgq = MsgQ("/does/not/exist")
         self.assertRaises(socket.error, msgq.setup)
+        # But we can clean up after that.
+        msgq.shutdown()
+
+    def test_subscribe_cfgmgr(self):
+        """Test special handling of the config manager. Once it subscribes,
+           the message queue needs to connect and read the config. But not
+           before and only once.
+        """
+        self.assertEqual(0, self.__cfgmgr_ready_called)
+        # Not called when something else subscribes
+        self.sm.subscribe('SomethingElse', '*', 's1')
+        self.assertEqual(0, self.__cfgmgr_ready_called)
+        # Called whenever the config manager subscribes
+        self.sm.subscribe('ConfigManager', '*', 's2')
+        self.assertEqual(1, self.__cfgmgr_ready_called)
+        # But not called again when it subscribes again (should not
+        # happen in practice, but we make sure anyway)
+        self.sm.subscribe('ConfigManager', '*', 's3')
+        self.assertEqual(1, self.__cfgmgr_ready_called)
 
 class DummySocket:
     """
@@ -194,7 +218,6 @@ class MsgQThread(threading.Thread):
     def stop(self):
         self.msgq_.stop()
 
-
 class SendNonblock(unittest.TestCase):
     """
     Tests that the whole thing will not get blocked if someone does not read.
@@ -282,8 +305,10 @@ class SendNonblock(unittest.TestCase):
             if queue_pid == 0:
                 signal.alarm(120)
                 msgq.setup_poller()
+                msgq.setup_signalsock()
                 msgq.register_socket(queue)
                 msgq.run()
+                msgq.cleanup_signalsock()
             else:
                 try:
                     def killall(signum, frame):
@@ -357,6 +382,7 @@ class SendNonblock(unittest.TestCase):
         # Don't need a listen_socket
         msgq.listen_socket = DummySocket
         msgq.setup_poller()
+        msgq.setup_signalsock()
         msgq.register_socket(write)
         msgq.register_socket(control_write)
         # Queue the message for sending
@@ -384,6 +410,10 @@ class SendNonblock(unittest.TestCase):
         # Fail the test if it didn't stop
         self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
 
+        # Clean up some internals of msgq (usually called as part of
+        # shutdown, but we skip that one here)
+        msgq.cleanup_signalsock()
+
         # Check the exception from the thread, if any
         # First, if we didn't expect it; reraise it (to make test fail and
         # show the stacktrace for debugging)
@@ -456,8 +486,81 @@ class SendNonblock(unittest.TestCase):
         self.do_send_with_send_error(3, sockerr, False, sockerr)
         self.do_send_with_send_error(23, sockerr, False, sockerr)
 
+class ThreadTests(unittest.TestCase):
+    """Test various things around thread synchronization."""
+
+    def setUp(self):
+        self.__msgq = MsgQ()
+        self.__abort_wait = False
+        self.__result = None
+        self.__notify_thread = threading.Thread(target=self.__notify)
+        self.__wait_thread = threading.Thread(target=self.__wait)
+        # Make sure the threads are killed if left behind by the test.
+        self.__notify_thread.daemon = True
+        self.__wait_thread.daemon = True
+
+    def __notify(self):
+        """Call the cfgmgr_ready."""
+        if self.__abort_wait:
+            self.__msgq.cfgmgr_ready(False)
+        else:
+            self.__msgq.cfgmgr_ready()
+
+    def __wait(self):
+        """Wait for config manager and store the result."""
+        self.__result = self.__msgq.wait_cfgmgr()
+
+    def test_wait_cfgmgr(self):
+        """One thread signals the config manager subscribed, the other
+           waits for it. We then check it terminated correctly.
+        """
+        self.__notify_thread.start()
+        self.__wait_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertTrue(self.__result)
+
+    def test_wait_cfgmgr_2(self):
+        """Same as test_wait_cfgmgr, but starting the threads in reverse order
+           (the result should be the same).
+        """
+        self.__wait_thread.start()
+        self.__notify_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertTrue(self.__result)
+
+    def test_wait_abort(self):
+        """Similar to test_wait_cfgmgr, but the config manager is never
+           subscribed and it is aborted.
+        """
+        self.__abort_wait = True
+        self.__wait_thread.start()
+        self.__notify_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertIsNotNone(self.__result)
+        self.assertFalse(self.__result)
+
+    def __check_ready_and_abort(self):
+        """Check that when we first say the config manager is ready and then
+           try to abort, it uses the first result.
+        """
+        self.__msgq.cfgmgr_ready()
+        self.__msgq.cfgmgr_ready(False)
+        self.__result = self.__msgq.wait_cfgmgr()
+
+    def test_ready_and_abort(self):
+        """Perform the __check_ready_and_abort test, but in a separate thread,
+           so in case something goes wrong with the synchronisation and it
+           deadlocks, the test will terminate anyway.
+        """
+        test_thread = threading.Thread(target=self.__check_ready_and_abort)
+        test_thread.daemon = True
+        test_thread.start()
+        test_thread.join(60)
+        self.assertTrue(self.__result)
 
 if __name__ == '__main__':
-    isc.log.init("b10-msgq")
     isc.log.resetUnitTestRootLogger()
     unittest.main()

+ 1 - 0
src/lib/python/isc/config/cfgmgr.py

@@ -234,6 +234,7 @@ class ConfigManager:
 
     def notify_boss(self):
         """Notifies the Boss module that the Config Manager is running"""
+        # TODO: Use a real, broadcast notification here.
         self.cc.group_sendmsg({"running": "ConfigManager"}, "Boss")
 
     def set_module_spec(self, spec):

+ 18 - 0
tests/lettuce/features/msgq.feature

@@ -0,0 +1,18 @@
+Feature: Message queue tests
+    Tests for the message queue daemon.
+
+    Scenario: logging
+        # We check the message queue logs.
+        Given I have bind10 running with configuration default.config
+        And wait for bind10 stderr message BIND10_STARTED_CC
+        And wait for bind10 stderr message MSGQ_START
+        And wait for bind10 stderr message MSGQ_LISTENER_STARTED
+        And wait for bind10 stderr message MSGQ_CFGMGR_SUBSCRIBED
+        And wait for bind10 stderr message CMDCTL_STARTED
+
+        # Check it handles configuration. The configuration is invalid,
+        # but it should get there anyway and we abuse it.
+        # TODO: Once it has any kind of real command or configuration
+        # value, use that instead.
+        Then set bind10 configuration Msgq to {"nonsense": 1}
+        And wait for bind10 stderr message MSGQ_CONFIG_DATA