Browse Source

Merge #2922

Add commands to list MsgQ's users and group subscriptions.

Notify about subscriptions and unsubscriptions.
Michal 'vorner' Vaner 12 years ago
parent
commit
696f1e0129

+ 74 - 25
src/bin/bind10/init.py.in

@@ -89,7 +89,8 @@ logger = isc.log.Logger("init")
 DBG_PROCESS = logger.DBGLVL_TRACE_BASIC
 DBG_COMMANDS = logger.DBGLVL_TRACE_DETAIL
 
-# Messages sent over the unix domain socket to indicate if it is followed by a real socket
+# Messages sent over the unix domain socket to indicate if it is followed by a
+# real socket
 CREATOR_SOCKET_OK = b"1\n"
 CREATOR_SOCKET_UNAVAILABLE = b"0\n"
 
@@ -200,7 +201,8 @@ class Init:
                  verbose=False, nokill=False, setuid=None, setgid=None,
                  username=None, cmdctl_port=None, wait_time=10):
         """
-            Initialize the Init of BIND. This is a singleton (only one can run).
+            Initialize the Init of BIND. This is a singleton (only one can
+            run).
 
             The msgq_socket_file specifies the UNIX domain socket file that the
             msgq process listens on.  If verbose is True, then b10-init reports
@@ -223,12 +225,13 @@ class Init:
         self.component_config = {}
         # Some time in future, it may happen that a single component has
         # multple processes (like a pipeline-like component). If so happens,
-        # name "components" may be inappropriate. But as the code isn't probably
-        # completely ready for it, we leave it at components for now. We also
-        # want to support multiple instances of a single component. If it turns
-        # out that we'll have a single component with multiple same processes
-        # or if we start multiple components with the same configuration (we do
-        # this now, but it might change) is an open question.
+        # name "components" may be inappropriate. But as the code isn't
+        # probably completely ready for it, we leave it at components for
+        # now. We also want to support multiple instances of a single
+        # component. If it turns out that we'll have a single component with
+        # multiple same processes or if we start multiple components with the
+        # same configuration (we do this now, but it might change) is an open
+        # question.
         self.components = {}
         # Simply list of components that died and need to wait for a
         # restart. Components manage their own restart schedule now
@@ -351,7 +354,8 @@ class Init:
 
     def command_handler(self, command, args):
         logger.debug(DBG_COMMANDS, BIND10_RECEIVED_COMMAND, command)
-        answer = isc.config.ccsession.create_answer(1, "command not implemented")
+        answer = isc.config.ccsession.create_answer(1,
+                                                    "command not implemented")
         if type(command) != str:
             answer = isc.config.ccsession.create_answer(1, "bad command")
         else:
@@ -440,7 +444,8 @@ class Init:
         if pid is None:
             logger.debug(DBG_PROCESS, BIND10_STARTED_PROCESS, self.curproc)
         else:
-            logger.debug(DBG_PROCESS, BIND10_STARTED_PROCESS_PID, self.curproc, pid)
+            logger.debug(DBG_PROCESS, BIND10_STARTED_PROCESS_PID, self.curproc,
+                         pid)
 
     def process_running(self, msg, who):
         """
@@ -499,7 +504,8 @@ class Init:
                 if msgq_proc.process:
                     msgq_proc.process.kill()
                 logger.error(BIND10_CONNECTING_TO_CC_FAIL)
-                raise CChannelConnectError("Unable to connect to c-channel after 5 seconds")
+                raise CChannelConnectError("Unable to connect to c-channel " +
+                                           "after 5 seconds")
 
             # try to connect, and if we can't wait a short while
             try:
@@ -507,13 +513,43 @@ class Init:
             except isc.cc.session.SessionError:
                 time.sleep(0.1)
 
-        # Subscribe to the message queue.  The only messages we expect to receive
-        # on this channel are once relating to process startup.
+        # Subscribe to the message queue.  The only messages we expect to
+        # receive on this channel are once relating to process startup.
         if self.cc_session is not None:
             self.cc_session.group_subscribe("Init")
 
         return msgq_proc
 
+    def wait_msgq(self):
+        """
+            Wait for the message queue to fully start. It does so only after
+            the config manager connects to it. We know it is ready when it
+            starts answering commands.
+
+            We don't add a specific command for it here, an error response is
+            as good as positive one to know it is alive.
+        """
+        # We do 10 times shorter sleep here (since the start should be fast
+        # now), so we have 10 times more attempts.
+        time_remaining = self.wait_time * 10
+        retry = True
+        while time_remaining > 0 and retry:
+            try:
+                self.ccs.rpc_call('AreYouThere?', 'Msgq')
+                # We don't expect this to succeed. If it does, it's programmer
+                # error
+                raise Exception("Non-existing RPC call succeeded")
+            except isc.config.RPCRecipientMissing:
+                retry = True # Not there yet
+                time.sleep(0.1)
+                time_remaining -= 1
+            except isc.config.RPCError:
+                retry = False # It doesn't like the RPC, so it's alive now
+
+        if retry: # Still not started
+            raise ProcessStartError("Msgq didn't complete the second stage " +
+                                    "of startup")
+
     def start_cfgmgr(self):
         """
             Starts the configuration manager process
@@ -536,14 +572,16 @@ class Init:
         # time to wait can be set on the command line.
         time_remaining = self.wait_time
         msg, env = self.cc_session.group_recvmsg()
-        while time_remaining > 0 and not self.process_running(msg, "ConfigManager"):
+        while time_remaining > 0 and not self.process_running(msg,
+                                                              "ConfigManager"):
             logger.debug(DBG_PROCESS, BIND10_WAIT_CFGMGR)
             time.sleep(1)
             time_remaining = time_remaining - 1
             msg, env = self.cc_session.group_recvmsg()
 
         if not self.process_running(msg, "ConfigManager"):
-            raise ProcessStartError("Configuration manager process has not started")
+            raise ProcessStartError("Configuration manager process has not " +
+                                    "started")
 
         return bind_cfgd
 
@@ -567,7 +605,8 @@ class Init:
 
     # A couple of utility methods for starting processes...
 
-    def start_process(self, name, args, c_channel_env, port=None, address=None):
+    def start_process(self, name, args, c_channel_env, port=None,
+                      address=None):
         """
             Given a set of command arguments, start the process and output
             appropriate log messages.  If the start is successful, the process
@@ -612,9 +651,9 @@ class Init:
 
     # The next few methods start up the rest of the BIND-10 processes.
     # Although many of these methods are little more than a call to
-    # start_simple, they are retained (a) for testing reasons and (b) as a place
-    # where modifications can be made if the process start-up sequence changes
-    # for a given process.
+    # start_simple, they are retained (a) for testing reasons and (b) as a
+    # place where modifications can be made if the process start-up sequence
+    # changes for a given process.
 
     def start_auth(self):
         """
@@ -666,6 +705,10 @@ class Init:
         # inside the configurator.
         self.start_ccsession(self.c_channel_env)
 
+        # Make sure msgq is fully started before proceeding to the rest
+        # of the components.
+        self.wait_msgq()
+
         # Extract the parameters associated with Init.  This can only be
         # done after the CC Session is started.  Note that the logging
         # configuration may override the "-v" switch set on the command line.
@@ -689,7 +732,8 @@ class Init:
         try:
             self.cc_session = isc.cc.Session(self.msgq_socket_file)
             logger.fatal(BIND10_MSGQ_ALREADY_RUNNING)
-            return "b10-msgq already running, or socket file not cleaned , cannot start"
+            return "b10-msgq already running, or socket file not cleaned , " +\
+                "cannot start"
         except isc.cc.session.SessionError:
             # this is the case we want, where the msgq is not running
             pass
@@ -948,8 +992,8 @@ class Init:
 
     def set_creator(self, creator):
         """
-        Registeres a socket creator into the b10-init. The socket creator is not
-        used directly, but through a cache. The cache is created in this
+        Registeres a socket creator into the b10-init. The socket creator is
+        not used directly, but through a cache. The cache is created in this
         method.
 
         If called more than once, it raises a ValueError.
@@ -1121,9 +1165,12 @@ def parse_args(args=sys.argv[1:], Parser=OptionParser):
     parser = Parser(version=VERSION)
     parser.add_option("-m", "--msgq-socket-file", dest="msgq_socket_file",
                       type="string", default=None,
-                      help="UNIX domain socket file the b10-msgq daemon will use")
+                      help="UNIX domain socket file the b10-msgq daemon " +
+                      "will use")
     parser.add_option("-i", "--no-kill", action="store_true", dest="nokill",
-                      default=False, help="do not send SIGTERM and SIGKILL signals to modules during shutdown")
+                      default=False,
+                      help="do not send SIGTERM and SIGKILL signals to " +
+                      "modules during shutdown")
     parser.add_option("-u", "--user", dest="user", type="string", default=None,
                       help="Change user after startup (must run as root)")
     parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
@@ -1147,7 +1194,9 @@ def parse_args(args=sys.argv[1:], Parser=OptionParser):
                       default=None,
                       help="file to dump the PID of the BIND 10 process")
     parser.add_option("-w", "--wait", dest="wait_time", type="int",
-                      default=10, help="Time (in seconds) to wait for config manager to start up")
+                      default=10,
+                      help="Time (in seconds) to wait for config manager to "
+                      "start up")
 
     (options, args) = parser.parse_args(args)
 

+ 54 - 1
src/bin/bind10/tests/init_test.py.in

@@ -16,7 +16,8 @@
 # Most of the time, we omit the "init" for brevity. Sometimes,
 # we want to be explicit about what we do, like when hijacking a library
 # call used by the b10-init.
-from init import Init, ProcessInfo, parse_args, dump_pid, unlink_pid_file, _BASETIME
+from init import Init, ProcessInfo, parse_args, dump_pid, unlink_pid_file, \
+    _BASETIME
 import init
 
 # XXX: environment tests are currently disabled, due to the preprocessor
@@ -941,6 +942,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init.start_ccsession = lambda _: start_ccsession()
         # We need to return the original _read_bind10_config
         init._read_bind10_config = lambda: Init._read_bind10_config(init)
+        init.wait_msgq = lambda: None
         init.start_all_components()
         self.check_started(init, True, start_auth, start_resolver)
         self.check_environment_unchanged()
@@ -967,6 +969,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init = MockInit()
         self.check_preconditions(init)
 
+        init.wait_msgq = lambda: None
         init.start_all_components()
         init.runnable = True
         init.config_handler(self.construct_config(False, False))
@@ -1028,6 +1031,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init = MockInit()
         self.check_preconditions(init)
 
+        init.wait_msgq = lambda: None
         init.start_all_components()
 
         init.runnable = True
@@ -1066,6 +1070,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init = MockInit()
         self.check_preconditions(init)
 
+        init.wait_msgq = lambda: None
         init.start_all_components()
         init.config_handler(self.construct_config(False, False))
         self.check_started_dhcp(init, False, False)
@@ -1075,6 +1080,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init = MockInit()
         self.check_preconditions(init)
         # v6 only enabled
+        init.wait_msgq = lambda: None
         init.start_all_components()
         init.runnable = True
         init._Init_started = True
@@ -1347,6 +1353,7 @@ class TestInitComponents(unittest.TestCase):
         # Start it
         orig = init._component_configurator.startup
         init._component_configurator.startup = self.__unary_hook
+        init.wait_msgq = lambda: None
         init.start_all_components()
         init._component_configurator.startup = orig
         self.__check_core(self.__param)
@@ -1499,6 +1506,7 @@ class TestInitComponents(unittest.TestCase):
             pass
         init.ccs = CC()
         init.ccs.get_full_config = lambda: {'components': self.__compconfig}
+        init.wait_msgq = lambda: None
         init.start_all_components()
         self.__check_extended(self.__param)
 
@@ -1768,6 +1776,51 @@ class TestInitComponents(unittest.TestCase):
         # this is set by ProcessInfo.spawn()
         self.assertEqual(42147, pi.pid)
 
+    def test_wait_msgq(self):
+        """
+        Test we can wait for msgq to provide its own alias.
+
+        It is not available the first time, the second it is.
+        """
+        class RpcSession:
+            def __init__(self):
+                # Not yet called
+                self.called = 0
+
+            def rpc_call(self, command, recipient):
+                self.called += 1
+                if self.called == 1:
+                    raise isc.config.RPCRecipientMissing("Not yet")
+                elif self.called == 2:
+                    raise isc.config.RPCError(1, "What?")
+                else:
+                    raise Exception("Called too many times")
+
+        init = MockInitSimple()
+        init.wait_time = 1
+        init.ccs = RpcSession()
+        init.wait_msgq()
+        self.assertEqual(2, init.ccs.called)
+
+    def test_wait_msgq_fail(self):
+        """
+        Test the wait_msgq fails in case the msgq does not appear
+        after so many attempts.
+        """
+        class RpcSession:
+            def __init__(self):
+                self.called = 0
+
+            def rpc_call(self, command, recipient):
+                self.called += 1
+                raise isc.config.RPCRecipientMissing("Not yet")
+
+        b10init = MockInitSimple()
+        b10init.wait_time = 1
+        b10init.ccs = RpcSession()
+        self.assertRaises(init.ProcessStartError, b10init.wait_msgq)
+        self.assertEqual(10, b10init.ccs.called)
+
     def test_start_cfgmgr(self):
         '''Test that b10-cfgmgr is started.'''
         class DummySession():

+ 0 - 1
src/bin/cmdctl/cmdctl.py.in

@@ -36,7 +36,6 @@ import re
 import ssl, socket
 import isc
 import pprint
-import select
 import csv
 import random
 import time

+ 98 - 15
src/bin/msgq/msgq.py.in

@@ -61,12 +61,14 @@ VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
 # If B10_FROM_BUILD is set in the environment, we use data files
 # from a directory relative to that, otherwise we use the ones
 # installed on the system
-if "B10_FROM_BUILD" in os.environ:
-    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/msgq"
+if "B10_FROM_SOURCE" in os.environ:
+    SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/msgq"
 else:
     PREFIX = "@prefix@"
     DATAROOTDIR = "@datarootdir@"
-    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
+    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}",
+                                                  DATAROOTDIR). \
+                                                  replace("${prefix}", PREFIX)
 SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
 
 class MsgQReceiveError(Exception): pass
@@ -122,12 +124,17 @@ class SubscriptionManager:
         if target in self.subscriptions:
             if socket in self.subscriptions[target]:
                 self.subscriptions[target].remove(socket)
+                return True
+        return False
 
     def unsubscribe_all(self, socket):
         """Remove the socket from all subscriptions."""
-        for socklist in self.subscriptions.values():
+        removed_from = []
+        for subs, socklist in self.subscriptions.items():
             if socket in socklist:
                 socklist.remove(socket)
+                removed_from.append(subs)
+        return removed_from
 
     def find_sub(self, group, instance):
         """Return an array of sockets which want this specific group,
@@ -184,6 +191,7 @@ class MsgQ:
         self.hostname = socket.gethostname()
         self.subs = SubscriptionManager(self.cfgmgr_ready)
         self.lnames = {}
+        self.fd_to_lname = {}
         self.sendbuffs = {}
         self.running = False
         self.__cfgmgr_ready = None
@@ -195,6 +203,33 @@ class MsgQ:
         # not for performance, so we use wide lock scopes to be on the safe
         # side.
         self.__lock = threading.Lock()
+        self._session = None
+
+    def members_notify(self, event, params):
+        """
+        Thin wrapper around ccs's notify. Send a notification about change
+        of some list that can be requested by the members command.
+
+        The event is one of:
+        - connected (client connected to MsgQ)
+        - disconected (client disconnected from MsgQ)
+        - subscribed (client subscribed to a group)
+        - unsubscribed (client unsubscribed from a group)
+
+        The params is dict containing:
+        - client: The lname of the client in question.
+        - group (for 'subscribed' and 'unsubscribed' events):
+          The group the client subscribed or unsubscribed from.
+
+        The notification occurs after the event, so client a subscribing for
+        notifications will get a notification about its own subscription, but
+        will not get a notification when it unsubscribes.
+        """
+        # Due to the interaction between threads (and fear it might influence
+        # sending stuff), we test this method in msgq_run_test, instead of
+        # mocking the ccs.
+        if self._session: # Don't send before we have started up
+            self._session.notify('cc_members', event, params)
 
     def cfgmgr_ready(self, ready=True):
         """Notify that the config manager is either subscribed, or
@@ -323,11 +358,13 @@ class MsgQ:
 
     def register_socket(self, newsocket):
         """
-        Internal function to insert a socket. Used by process_accept and some tests.
+        Internal function to insert a socket. Used by process_accept and some
+        tests.
         """
         self.sockets[newsocket.fileno()] = newsocket
         lname = self.newlname()
         self.lnames[lname] = newsocket
+        self.fd_to_lname[newsocket.fileno()] = lname
 
         logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
                      lname)
@@ -337,6 +374,8 @@ class MsgQ:
         else:
             self.add_kqueue_socket(newsocket)
 
+        self.members_notify('connected', {'client': lname})
+
     def kill_socket(self, fd, sock):
         """Fully close down the socket."""
         # Unregister events on the socket.  Note that we don't have to do
@@ -345,14 +384,23 @@ class MsgQ:
         if self.poller:
             self.poller.unregister(sock)
 
-        self.subs.unsubscribe_all(sock)
-        lname = [ k for k, v in self.lnames.items() if v == sock ][0]
+        unsubscribed_from = self.subs.unsubscribe_all(sock)
+        lname = self.fd_to_lname[fd]
+        del self.fd_to_lname[fd]
         del self.lnames[lname]
         sock.close()
         del self.sockets[fd]
         if fd in self.sendbuffs:
             del self.sendbuffs[fd]
         logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
+        # Filter out just the groups.
+        unsubscribed_from_groups = set(map(lambda x: x[0], unsubscribed_from))
+        for group in unsubscribed_from_groups:
+            self.members_notify('unsubscribed', {
+                                    'client': lname,
+                                    'group': group
+                                })
+        self.members_notify('disconnected', {'client': lname})
 
     def __getbytes(self, fd, sock, length, continued):
         """Get exactly the requested bytes, or raise an exception if
@@ -567,7 +615,8 @@ class MsgQ:
         This is done by using an increasing counter and the current
         time."""
         self.connection_counter += 1
-        return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
+        return "%x_%x@%s" % (time.time(), self.connection_counter,
+                             self.hostname)
 
     def process_command_ping(self, sock, routing, data):
         self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_PONG }, data)
@@ -644,13 +693,25 @@ class MsgQ:
         if group == None or instance == None:
             return  # ignore invalid packets entirely
         self.subs.subscribe(group, instance, sock)
+        lname = self.fd_to_lname[sock.fileno()]
+        self.members_notify('subscribed',
+                            {
+                                'client': lname,
+                                'group': group
+                            })
 
     def process_command_unsubscribe(self, sock, routing, data):
         group = routing[CC_HEADER_GROUP]
         instance = routing[CC_HEADER_INSTANCE]
         if group == None or instance == None:
             return  # ignore invalid packets entirely
-        self.subs.unsubscribe(group, instance, sock)
+        if self.subs.unsubscribe(group, instance, sock):
+            lname = self.fd_to_lname[sock.fileno()]
+            self.members_notify('unsubscribed',
+                                {
+                                    'client': lname,
+                                    'group': group
+                                })
 
     def run(self):
         """Process messages.  Forever.  Mostly."""
@@ -795,16 +856,27 @@ class MsgQ:
             return isc.config.create_answer(0)
 
     def command_handler(self, command, args):
-        """The command handler (run in a separate thread).
-           Not tested, currently effectively empty.
-        """
+        """The command handler (run in a separate thread)."""
         config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
 
         with self.__lock:
             if not self.running:
                 return
 
-            # TODO: Any commands go here
+            # TODO: Who does validation? The ModuleCCSession or must we?
+
+            if command == 'members':
+                # List all members of MsgQ or of a group.
+                if args is None:
+                    args = {}
+                group = args.get('group')
+                if group:
+                    return isc.config.create_answer(0,
+                        list(map(lambda sock: self.fd_to_lname[sock.fileno()],
+                                 self.subs.find(group, ''))))
+                else:
+                    return isc.config.create_answer(0,
+                                                    list(self.lnames.keys()))
 
             config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
             return isc.config.create_answer(1, 'unknown command: ' + command)
@@ -819,7 +891,8 @@ if __name__ == "__main__":
         a valid port number. Used by OptionParser() on startup."""
         intval = int(value)
         if (intval < 0) or (intval > 65535):
-            raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
+            raise OptionValueError("%s requires a port number (0-65535)" %
+                                   opt_str)
         parser.values.msgq_port = intval
 
     # Parse any command-line options.
@@ -861,13 +934,23 @@ if __name__ == "__main__":
                                                  msgq.command_handler,
                                                  None, True,
                                                  msgq.socket_file)
+            msgq._session = session
             session.start()
             # And we create a thread that'll just wait for commands and
             # handle them. We don't terminate the thread, we set it to
             # daemon. Once the main thread terminates, it'll just die.
             def run_session():
                 while True:
-                    session.check_command(False)
+                    # As the check_command has internal mutex that is shared
+                    # with sending part (which includes notify). So we don't
+                    # want to hold it long-term and block using select.
+                    fileno = session.get_socket().fileno()
+                    try:
+                        (reads, _, _) = select.select([fileno], [], [])
+                    except select.error as se:
+                        if se.args[0] != errno.EINTR:
+                            raise
+                    session.check_command(True)
             background_thread = threading.Thread(target=run_session)
             background_thread.daemon = True
             background_thread.start()

+ 13 - 1
src/bin/msgq/msgq.spec

@@ -3,6 +3,18 @@
     "module_name": "Msgq",
     "module_description": "The message queue",
     "config_data": [],
-    "commands": []
+    "commands": [
+      {
+        "command_name": "members",
+        "command_description": "Provide the list of members of a group or of the whole MsgQ if no group is given.",
+        "command_args": [
+          {
+            "item_name": "group",
+            "item_optional": true,
+            "item_type": "string"
+          }
+        ]
+      }
+    ]
   }
 }

+ 56 - 0
src/bin/msgq/tests/msgq_run_test.py

@@ -272,6 +272,62 @@ class MsgqRunTest(unittest.TestCase):
             conn.close()
             conn = new
 
+    def test_notifications(self):
+        """
+        Check that the MsgQ is actually sending notifications about events.
+        We create a socket, subscribe the socket itself and see it receives
+        it's own notification.
+
+        Testing all the places where notifications happen is task for the
+        common unit tests in msgq_test.py.
+
+        The test is here, because there might be some trouble with multiple
+        threads in msgq (see the note about locking on the module CC session
+        when sending message from one thread and listening for commands in the
+        other) which would be hard to test using pure unit tests. Testing
+        runnig whole msgq tests that implicitly.
+        """
+        conn = self.__get_connection()
+        # Activate the session, pretend to be the config manager.
+        conn.group_subscribe('ConfigManager')
+        # Answer request for logging config
+        (msg, env) = conn.group_recvmsg(nonblock=False)
+        self.assertEqual({'command': ['get_config',
+                                      {'module_name': 'Logging'}]},
+                         msg)
+        conn.group_reply(env, {'result': [0, {}]})
+        # It sends its spec.
+        (msg, env) = conn.group_recvmsg(nonblock=False)
+        self.assertEqual('module_spec', msg['command'][0])
+        conn.group_reply(env, {'result': [0]})
+        # It asks for its own config
+        (msg, env) = conn.group_recvmsg(nonblock=False)
+        self.assertEqual({'command': ['get_config',
+                                      {'module_name': 'Msgq'}]},
+                         msg)
+        conn.group_reply(env, {'result': [0, {}]})
+        # Synchronization - make sure the session is running before
+        # we continue, so we get the notification. Similar synchronisation
+        # as in b10-init, but we don't have full ccsession here, so we
+        # do so manually.
+        synchronised = False
+        attempts = 100
+        while not synchronised and attempts > 0:
+            time.sleep(0.1)
+            seq = conn.group_sendmsg({'command': ['Are you running?']},
+                                     'Msgq', want_answer=True)
+            msg = conn.group_recvmsg(nonblock=False, seq=seq)
+            synchronised = msg[0] != -1
+            attempts -= 1
+        self.assertTrue(synchronised)
+        # The actual test
+        conn.group_subscribe('notifications/cc_members')
+        (msg, env) = conn.group_recvmsg(nonblock=False)
+        self.assertEqual({'notification': ['subscribed', {
+            'client': conn.lname,
+            'group': 'notifications/cc_members'
+        }]}, msg)
+
 if __name__ == '__main__':
     isc.log.init("msgq-tests")
     isc.log.resetUnitTestRootLogger()

+ 176 - 6
src/bin/msgq/tests/msgq_test.py

@@ -63,8 +63,11 @@ class TestSubscriptionManager(unittest.TestCase):
         socks = [ 's1', 's2', 's3', 's4', 's5' ]
         for s in socks:
             self.sm.subscribe("a", "*", s)
-        self.sm.unsubscribe("a", "*", 's3')
-        self.assertEqual(self.sm.find_sub("a", "*"), [ 's1', 's2', 's4', 's5' ])
+        self.assertTrue(self.sm.unsubscribe("a", "*", 's3'))
+        # Unsubscribe from group it is not in
+        self.assertFalse(self.sm.unsubscribe("a", "*", 's42'))
+        self.assertEqual(self.sm.find_sub("a", "*"),
+                         [ 's1', 's2', 's4', 's5' ])
 
     def test_unsubscribe_all(self):
         self.sm.subscribe('g1', 'i1', 's1')
@@ -75,7 +78,9 @@ class TestSubscriptionManager(unittest.TestCase):
         self.sm.subscribe('g2', 'i1', 's2')
         self.sm.subscribe('g2', 'i2', 's1')
         self.sm.subscribe('g2', 'i2', 's2')
-        self.sm.unsubscribe_all('s1')
+        self.assertEqual(set([('g1', 'i1'), ('g1', 'i2'), ('g2', 'i1'),
+                              ('g2', 'i2')]),
+                         set(self.sm.unsubscribe_all('s1')))
         self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ])
         self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ])
         self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ])
@@ -178,6 +183,157 @@ class MsgQTest(unittest.TestCase):
         data = json.loads(msg[6 + header_len:].decode('utf-8'))
         return (header, data)
 
+    def test_unknown_command(self):
+        """
+        Test the command handler returns error when the command is unknown.
+        """
+        # Fake we are running, to disable test workarounds
+        self.__msgq.running = True
+        self.assertEqual({'result': [1, "unknown command: unknown"]},
+                         self.__msgq.command_handler('unknown', {}))
+
+    def test_get_members(self):
+        """
+        Test getting members of a group or of all connected clients.
+        """
+        # Push two dummy "clients" into msgq (the ugly way, by directly
+        # tweaking relevant data structures).
+        class Sock:
+            def __init__(self, fileno):
+                self.fileno = lambda: fileno
+        self.__msgq.lnames['first'] = Sock(1)
+        self.__msgq.lnames['second'] = Sock(2)
+        self.__msgq.fd_to_lname[1] = 'first'
+        self.__msgq.fd_to_lname[2] = 'second'
+        # Subscribe them to some groups
+        self.__msgq.process_command_subscribe(self.__msgq.lnames['first'],
+                                              {'group': 'G1', 'instance': '*'},
+                                              None)
+        self.__msgq.process_command_subscribe(self.__msgq.lnames['second'],
+                                              {'group': 'G1', 'instance': '*'},
+                                              None)
+        self.__msgq.process_command_subscribe(self.__msgq.lnames['second'],
+                                              {'group': 'G2', 'instance': '*'},
+                                              None)
+        # Now query content of some groups through the command handler.
+        self.__msgq.running = True # Enable the command handler
+        def check_both(result):
+            """
+            Check the result is successful one and it contains both lnames (in
+            any order).
+            """
+            array = result['result'][1]
+            self.assertEqual(set(['first', 'second']), set(array))
+            self.assertEqual({'result': [0, array]}, result)
+            # Make sure the result can be encoded as JSON
+            # (there seems to be types that look like a list but JSON choks
+            # on them)
+            json.dumps(result)
+        # Members of the G1 and G2
+        self.assertEqual({'result': [0, ['second']]},
+                         self.__msgq.command_handler('members',
+                                                     {'group': 'G2'}))
+        check_both(self.__msgq.command_handler('members', {'group': 'G1'}))
+        # We pretend that all the possible groups exist, just that most
+        # of them are empty. So requesting for Empty is request for an empty
+        # group and should not fail.
+        self.assertEqual({'result': [0, []]},
+                         self.__msgq.command_handler('members',
+                                                     {'group': 'Empty'}))
+        # Without the name of the group, we just get all the clients.
+        check_both(self.__msgq.command_handler('members', {}))
+        # Omitting the parameters completely in such case is OK
+        check_both(self.__msgq.command_handler('members', None))
+
+    def notifications_setup(self):
+        """
+        Common setup of some notifications tests. Mock several things.
+        """
+        # Mock the method to send notifications (we don't really want
+        # to send them now, just see they'd be sent).
+        # Mock the poller, as we don't need it at all (and we don't have
+        # real socket to give it now).
+        notifications = []
+        def send_notification(event, params):
+            notifications.append((event, params))
+        class FakePoller:
+            def register(self, socket, mode):
+                pass
+            def unregister(self, sock):
+                pass
+        self.__msgq.members_notify = send_notification
+        self.__msgq.poller = FakePoller()
+
+        # Create a socket
+        class Sock:
+            def __init__(self, fileno):
+                self.fileno = lambda: fileno
+            def close(self):
+                pass
+        sock = Sock(1)
+        return notifications, sock
+
+    def test_notifies(self):
+        """
+        Test the message queue sends notifications about connecting,
+        disconnecting and subscription changes.
+        """
+        notifications, sock = self.notifications_setup()
+
+        # We should notify about new cliend when we register it
+        self.__msgq.register_socket(sock)
+        lname = self.__msgq.fd_to_lname[1] # Steal the lname
+        self.assertEqual([('connected', {'client': lname})], notifications)
+        notifications.clear()
+
+        # A notification should happen for a subscription to a group
+        self.__msgq.process_command_subscribe(sock, {'group': 'G',
+                                                     'instance': '*'},
+                                              None)
+        self.assertEqual([('subscribed', {'client': lname, 'group': 'G'})],
+                         notifications)
+        notifications.clear()
+
+        # As well for unsubscription
+        self.__msgq.process_command_unsubscribe(sock, {'group': 'G',
+                                                       'instance': '*'},
+                                                None)
+        self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'})],
+                         notifications)
+        notifications.clear()
+
+        # Unsubscription from a group it isn't subscribed to
+        self.__msgq.process_command_unsubscribe(sock, {'group': 'H',
+                                                       'instance': '*'},
+                                                None)
+        self.assertEqual([], notifications)
+
+        # And, finally, for removal of client
+        self.__msgq.kill_socket(sock.fileno(), sock)
+        self.assertEqual([('disconnected', {'client': lname})], notifications)
+
+    def test_notifies_implicit_kill(self):
+        """
+        Test that the unsubscription notifications are sent before the socket
+        is dropped, even in case it does not unsubscribe explicitly.
+        """
+        notifications, sock = self.notifications_setup()
+
+        # Register and subscribe. Notifications for these are in above test.
+        self.__msgq.register_socket(sock)
+        lname = self.__msgq.fd_to_lname[1] # Steal the lname
+        self.__msgq.process_command_subscribe(sock, {'group': 'G',
+                                                     'instance': '*'},
+                                              None)
+        notifications.clear()
+
+        self.__msgq.kill_socket(sock.fileno(), sock)
+        # Now, the notification for unsubscribe should be first, second for
+        # the disconnection.
+        self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'}),
+                          ('disconnected', {'client': lname})
+                         ], notifications)
+
     def test_undeliverable_errors(self):
         """
         Send several packets through the MsgQ and check it generates
@@ -412,12 +568,17 @@ class SendNonblock(unittest.TestCase):
         The write end is put into the message queue, so we can check it.
         It returns (msgq, read_end, write_end). It is expected the sockets
         are closed by the caller afterwards.
+
+        Also check the sockets are registered correctly (eg. internal data
+        structures are there for them).
         '''
         msgq = MsgQ()
         # We do only partial setup, so we don't create the listening socket
         msgq.setup_poller()
         (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
         msgq.register_socket(write)
+        self.assertEqual(1, len(msgq.lnames))
+        self.assertEqual(write, msgq.lnames[msgq.fd_to_lname[write.fileno()]])
         return (msgq, read, write)
 
     def infinite_sender(self, sender):
@@ -437,8 +598,15 @@ class SendNonblock(unittest.TestCase):
         # Explicitly close temporary socket pair as the Python
         # interpreter expects it.  It may not be 100% exception safe,
         # but since this is only for tests we prefer brevity.
+        # Actually, the write end is often closed by the sender.
+        if write.fileno() != -1:
+            # Some of the senders passed here kill the socket internally.
+            # So kill it only if not yet done so. If the socket is closed,
+            # it gets -1 as fileno().
+            msgq.kill_socket(write.fileno(), write)
+        self.assertFalse(msgq.lnames)
+        self.assertFalse(msgq.fd_to_lname)
         read.close()
-        write.close()
 
     def test_infinite_sendmsg(self):
         """
@@ -640,9 +808,11 @@ class SendNonblock(unittest.TestCase):
                                send_exception is raised by BadSocket.
         """
         (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
-        (control_write, control_read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        (control_write, control_read) = socket.socketpair(socket.AF_UNIX,
+                                                          socket.SOCK_STREAM)
         badwrite = BadSocket(write, raise_on_send, send_exception)
-        self.do_send(badwrite, read, control_write, control_read, expect_answer, expect_send_exception)
+        self.do_send(badwrite, read, control_write, control_read,
+                     expect_answer, expect_send_exception)
         write.close()
         read.close()
         control_write.close()

+ 1 - 0
src/bin/zonemgr/zonemgr.py.in

@@ -691,6 +691,7 @@ class Zonemgr:
         try:
             while not self._shutdown_event.is_set():
                 fileno = self._module_cc.get_socket().fileno()
+                reads = []
                 # Wait with select() until there is something to read,
                 # and then read it using a non-blocking read
                 # This may or may not be relevant data for this loop,