Browse Source

[2922] Prevent a race condition on start

There was a short time in which other modules could be started before
MsgQ subscribed to itself. While this was not as interesting before, as
the only problem could be old configuration of logging being in use as
the other modules start, it now becomes more problematic in this branch,
as commands to list members (which would be issued at start-up) could
get lost.

Make the init wait for the full startup of MsgQ (by confirming the MsgQ
responds to a command, even by an error).
Michal 'vorner' Vaner 12 years ago
parent
commit
6ca9ce7636
2 changed files with 87 additions and 1 deletions
  1. 33 0
      src/bin/bind10/init.py.in
  2. 54 1
      src/bin/bind10/tests/init_test.py.in

+ 33 - 0
src/bin/bind10/init.py.in

@@ -514,6 +514,35 @@ class Init:
 
         return msgq_proc
 
+    def wait_msgq(self):
+        """
+            Wait for the message queue to fully start. It does so only after the
+            config manager connects to it. We know it is ready when it starts
+            answering commands.
+
+            We don't add a specific command for it here, an error response is
+            as good as positive one to know it is alive.
+        """
+        # We do 10 times shorter sleep here (since the start should be fast
+        # now), so we have 10 times more attempts.
+        time_remaining = self.wait_time * 10
+        retry = True
+        while time_remaining > 0 and retry:
+            try:
+                self.ccs.rpc_call('AreYouThere?', 'Msgq')
+                # We don't expect this to succeed. If it does, it's programmer error
+                raise Exception("Non-existing RPC call succeeded")
+            except isc.config.RPCRecipientMissing:
+                retry = True # Not there yet
+                time.sleep(0.1)
+                time_remaining -= 1
+            except isc.config.RPCError:
+                retry = False # It doesn't like the RPC, so it's alive now
+
+        if retry: # Still not started
+            raise ProcessStartError("Msgq didn't complete the second stage " +
+                                    "of startup")
+
     def start_cfgmgr(self):
         """
             Starts the configuration manager process
@@ -666,6 +695,10 @@ class Init:
         # inside the configurator.
         self.start_ccsession(self.c_channel_env)
 
+        # Make sure msgq is fully started before proceeding to the rest
+        # of the components.
+        self.wait_msgq()
+
         # Extract the parameters associated with Init.  This can only be
         # done after the CC Session is started.  Note that the logging
         # configuration may override the "-v" switch set on the command line.

+ 54 - 1
src/bin/bind10/tests/init_test.py.in

@@ -16,7 +16,8 @@
 # Most of the time, we omit the "init" for brevity. Sometimes,
 # we want to be explicit about what we do, like when hijacking a library
 # call used by the b10-init.
-from init import Init, ProcessInfo, parse_args, dump_pid, unlink_pid_file, _BASETIME
+from init import Init, ProcessInfo, parse_args, dump_pid, unlink_pid_file, \
+    _BASETIME, ProcessStartError
 import init
 
 # XXX: environment tests are currently disabled, due to the preprocessor
@@ -941,6 +942,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init.start_ccsession = lambda _: start_ccsession()
         # We need to return the original _read_bind10_config
         init._read_bind10_config = lambda: Init._read_bind10_config(init)
+        init.wait_msgq = lambda: None
         init.start_all_components()
         self.check_started(init, True, start_auth, start_resolver)
         self.check_environment_unchanged()
@@ -967,6 +969,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init = MockInit()
         self.check_preconditions(init)
 
+        init.wait_msgq = lambda: None
         init.start_all_components()
         init.runnable = True
         init.config_handler(self.construct_config(False, False))
@@ -1028,6 +1031,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init = MockInit()
         self.check_preconditions(init)
 
+        init.wait_msgq = lambda: None
         init.start_all_components()
 
         init.runnable = True
@@ -1066,6 +1070,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init = MockInit()
         self.check_preconditions(init)
 
+        init.wait_msgq = lambda: None
         init.start_all_components()
         init.config_handler(self.construct_config(False, False))
         self.check_started_dhcp(init, False, False)
@@ -1075,6 +1080,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
         init = MockInit()
         self.check_preconditions(init)
         # v6 only enabled
+        init.wait_msgq = lambda: None
         init.start_all_components()
         init.runnable = True
         init._Init_started = True
@@ -1347,6 +1353,7 @@ class TestInitComponents(unittest.TestCase):
         # Start it
         orig = init._component_configurator.startup
         init._component_configurator.startup = self.__unary_hook
+        init.wait_msgq = lambda: None
         init.start_all_components()
         init._component_configurator.startup = orig
         self.__check_core(self.__param)
@@ -1499,6 +1506,7 @@ class TestInitComponents(unittest.TestCase):
             pass
         init.ccs = CC()
         init.ccs.get_full_config = lambda: {'components': self.__compconfig}
+        init.wait_msgq = lambda: None
         init.start_all_components()
         self.__check_extended(self.__param)
 
@@ -1768,6 +1776,51 @@ class TestInitComponents(unittest.TestCase):
         # this is set by ProcessInfo.spawn()
         self.assertEqual(42147, pi.pid)
 
+    def test_wait_msgq(self):
+        """
+        Test we can wait for msgq to provide its own alias.
+
+        It is not available the first time, the second it is.
+        """
+        class RpcSession:
+            def __init__(self):
+                # Not yet called
+                self.called = 0
+
+            def rpc_call(self, command, recipient):
+                self.called += 1
+                if self.called == 1:
+                    raise isc.config.RPCRecipientMissing("Not yet")
+                elif self.called == 2:
+                    raise isc.config.RPCError(1, "What?")
+                else:
+                    raise Exception("Called too many times")
+
+        init = MockInitSimple()
+        init.wait_time = 1
+        init.ccs = RpcSession()
+        init.wait_msgq()
+        self.assertEqual(2, init.ccs.called)
+
+    def test_wait_msgq_fail(self):
+        """
+        Test the wait_msgq fails in case the msgq does not appear
+        after so many attempts.
+        """
+        class RpcSession:
+            def __init__(self):
+                self.called = 0
+
+            def rpc_call(self, command, recipient):
+                self.called += 1
+                raise isc.config.RPCRecipientMissing("Not yet")
+
+        init = MockInitSimple()
+        init.wait_time = 1
+        init.ccs = RpcSession()
+        self.assertRaises(ProcessStartError, init.wait_msgq)
+        self.assertEqual(10, init.ccs.called)
+
     def test_start_cfgmgr(self):
         '''Test that b10-cfgmgr is started.'''
         class DummySession():