Browse Source

[2582] Synchronisation when the config manager connects

Michal 'vorner' Vaner 12 years ago
parent
commit
275a72e95d
2 changed files with 111 additions and 3 deletions
  1. 37 2
      src/bin/msgq/msgq.py.in
  2. 74 1
      src/bin/msgq/tests/msgq_test.py

+ 37 - 2
src/bin/msgq/msgq.py.in

@@ -154,9 +154,41 @@ class MsgQ:
         self.lnames = {}
         self.sendbuffs = {}
         self.running = False
+        self.__cfgmgr_ready = None
+        self.__cfgmgr_ready_cond = threading.Condition()
 
-    def cfgmgr_ready(self):
-        pass
+    def cfgmgr_ready(self, ready=True):
+        """Notify that the config manager is either subscribed, or
+           that the msgq is shutting down and it won't connect, but
+           anybody waiting for it should stop anyway.
+
+           The ready parameter signifies if the config manager is subscribed.
+
+           This method can be called multiple times, but second and any
+           following call is simply ignored. This means the "abort" version
+           of the call can be used on any stop unconditionally, even when
+           the config manager already connected.
+        """
+        with self.__cfgmgr_ready_cond:
+            if self.__cfgmgr_ready is not None:
+                # This is a second call to this method. In that case it does
+                # nothing.
+                return
+            self.__cfgmgr_ready = ready
+            self.__cfgmgr_ready_cond.notify_all()
+
+    def wait_cfgmgr(self):
+        """Wait for msgq to subscribe.
+
+           When this returs, the config manager is either subscribed, or
+           msgq gave up waiting for it. Success is signified by the return
+           value.
+        """
+        with self.__cfgmgr_ready_cond:
+            # Wait until it either aborts or subscribes
+            while self.__cfgmgr_ready is None:
+                self.__cfgmgr_ready_cond.wait()
+            return self.__cfgmgr_ready
 
     def setup_poller(self):
         """Set up the poll thing.  Internal function."""
@@ -575,6 +607,9 @@ class MsgQ:
         # Signal it should terminate.
         self.__control_sock.close()
         self.__control_sock = None
+        # Abort anything waiting on the condition, just to make sure it's not
+        # blocked forever
+        self.cfgmgr_ready(False)
 
     def cleanup_signalsock(self):
         """Close the signal sockets. We could do it directly in shutdown,

+ 74 - 1
src/bin/msgq/tests/msgq_test.py

@@ -218,7 +218,6 @@ class MsgQThread(threading.Thread):
     def stop(self):
         self.msgq_.stop()
 
-
 class SendNonblock(unittest.TestCase):
     """
     Tests that the whole thing will not get blocked if someone does not read.
@@ -487,6 +486,80 @@ class SendNonblock(unittest.TestCase):
         self.do_send_with_send_error(3, sockerr, False, sockerr)
         self.do_send_with_send_error(23, sockerr, False, sockerr)
 
+class ThreadTests(unittest.TestCase):
+    """Test various things around thread synchronization."""
+
+    def setUp(self):
+        self.__msgq = MsgQ()
+        self.__abort_wait = False
+        self.__result = None
+        self.__notify_thread = threading.Thread(target=self.__notify)
+        self.__wait_thread = threading.Thread(target=self.__wait)
+        # Make sure the threads are killed if left behind by the test.
+        self.__notify_thread.daemon = True
+        self.__wait_thread.daemon = True
+
+    def __notify(self):
+        """Call the cfgmgr_ready."""
+        if self.__abort_wait:
+            self.__msgq.cfgmgr_ready(False)
+        else:
+            self.__msgq.cfgmgr_ready()
+
+    def __wait(self):
+        """Wait for config manager and store the result."""
+        self.__result = self.__msgq.wait_cfgmgr()
+
+    def test_wait_cfgmgr(self):
+        """One thread signals the config manager subscribed, the other
+           waits for it. We then check it terminated correctly.
+        """
+        self.__notify_thread.start()
+        self.__wait_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertTrue(self.__result)
+
+    def test_wait_cfgmgr_2(self):
+        """Same as test_wait_cfgmgr, but starting the threads in reverse order
+           (the result should be the same).
+        """
+        self.__wait_thread.start()
+        self.__notify_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertTrue(self.__result)
+
+    def test_wait_abort(self):
+        """Similar to test_wait_cfgmgr, but the config manager is never
+           subscribed and it is aborted.
+        """
+        self.__abort_wait = True
+        self.__wait_thread.start()
+        self.__notify_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertIsNotNone(self.__result)
+        self.assertFalse(self.__result)
+
+    def __check_ready_and_abort(self):
+        """Check that when we first say the config manager is ready and then
+           try to abort, it uses the first result.
+        """
+        self.__msgq.cfgmgr_ready()
+        self.__msgq.cfgmgr_ready(False)
+        self.__result = self.__msgq.wait_cfgmgr()
+
+    def test_ready_and_abort(self):
+        """Perform the __check_ready_and_abort test, but in a separate thread,
+           so in case something goes wrong with the synchronisation and it
+           deadlocks, the test will terminate anyway.
+        """
+        test_thread = threading.Thread(target=self.__check_ready_and_abort)
+        test_thread.daemon = True
+        test_thread.start()
+        test_thread.join(60)
+        self.assertTrue(self.__result)
 
 if __name__ == '__main__':
     isc.log.init("b10-msgq")