Browse Source

[2582] Run the MSGQ poller in separate thread

This will be needed in the following work. If we run in the same thread
and tried to connect to self by the cc library, it would deadlock, since
the library blocks waiting for answer.

Also, small cleanup of shutdown is included.

No functional change should be observable.
Michal 'vorner' Vaner 12 years ago
parent
commit
84a994f8ef
2 changed files with 64 additions and 5 deletions
  1. 54 4
      src/bin/msgq/msgq.py.in
  2. 10 1
      src/bin/msgq/tests/msgq_test.py

+ 54 - 4
src/bin/msgq/msgq.py.in

@@ -29,6 +29,7 @@ import errno
 import time
 import select
 import random
+import threading
 from optparse import OptionParser, OptionValueError
 import isc.util.process
 import isc.log
@@ -37,7 +38,13 @@ from isc.log_messages.msgq_messages import *
 import isc.cc
 
 isc.util.process.rename()
+# Logger that is used in the actual msgq handling - startup, shutdown and the
+# poller thread.
 logger = isc.log.Logger("msgq")
+# A separate copy for the master/config thread when the poller thread runs.
+# We use a separate instance, since the logger itself doesn't have to be
+# thread safe.
+config_logger = isc.log.Logger("msgq")
 TRACE_START = logger.DBGLVL_START_SHUT
 TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
 TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
@@ -191,6 +198,20 @@ class MsgQ:
         else:
             self.add_kqueue_socket(self.listen_socket)
 
+    def setup_signalsock(self):
+        """Create a socket pair used to signal when we want to finish.
+           Using a socket is easy and thread/signal safe way to signal
+           the termination.
+        """
+        # The __poller_sock will be the end in the poller. When it is
+        # closed, we should shut down.
+        (self.__poller_sock, self.__control_sock) = socket.socketpair()
+
+        if self.poller:
+            self.poller.register(self.__poller_sock, select.POLLIN)
+        else:
+            self.add_kqueue_socket(self.__poller_sock)
+
     def setup(self):
         """Configure listener socket, polling, etc.
            Raises a socket.error if the socket_file cannot be
@@ -198,6 +219,7 @@ class MsgQ:
         """
 
         self.setup_poller()
+        self.setup_signalsock()
         self.setup_listener()
 
         logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
@@ -496,6 +518,10 @@ class MsgQ:
             for (fd, event) in events:
                 if fd == self.listen_socket.fileno():
                     self.process_accept()
+                elif fd == self.__poller_sock.fileno():
+                    # If it's the signal socket, we should terminate now.
+                    self.running = False
+                    break
                 else:
                     if event & select.POLLOUT:
                         self.__process_write(fd)
@@ -515,6 +541,10 @@ class MsgQ:
             for event in events:
                 if event.ident == self.listen_socket.fileno():
                     self.process_accept()
+                elif event.ident == self.__poller_sock.fileno():
+                    # If it's the signal socket, we should terminate now.
+                    self.running = False
+                    break;
                 else:
                     if event.filter == select.KQ_FILTER_WRITE:
                         self.__process_write(event.ident)
@@ -526,12 +556,26 @@ class MsgQ:
                                          self.sockets[event.ident])
 
     def stop(self):
-        self.running = False
+        # Signal it should terminate.
+        self.__control_sock.close()
+        self.__control_sock = None
+
+    def cleanup_signalsock(self):
+        """Close the signal sockets. We could do it directly in shutdown,
+           but this part is reused in tests.
+        """
+        if self.__poller_sock:
+            self.__poller_sock.close()
+            self.__poller_sock = None
+        if self.__control_sock:
+            self.__control_sock.close()
+            self.__control_sock = None
 
     def shutdown(self):
         """Stop the MsgQ master."""
         logger.debug(TRACE_START, MSGQ_SHUTDOWN)
         self.listen_socket.close()
+        self.cleanup_signalsock()
         if os.path.exists(self.socket_file):
             os.remove(self.socket_file)
 
@@ -541,8 +585,7 @@ msgq = None
 
 def signal_handler(signal, frame):
     if msgq:
-        msgq.shutdown()
-    sys.exit(0)
+        msgq.stop()
 
 if __name__ == "__main__":
     def check_port(option, opt_str, value, parser):
@@ -583,8 +626,15 @@ if __name__ == "__main__":
         logger.fatal(MSGQ_START_FAIL, e)
         sys.exit(1)
 
+    # We run the processing in a separate thread. This is because we want to
+    # connect to the msgq ourself. But the cc library is unfortunately blocking
+    # in many places and waiting for the processing part to answer, it would
+    # deadlock.
+    poller_thread = threading.Thread(target=msgq.run)
+    poller_thread.daemon = True
     try:
-        msgq.run()
+        poller_thread.start()
+        poller_thread.join()
     except KeyboardInterrupt:
         pass
 

+ 10 - 1
src/bin/msgq/tests/msgq_test.py

@@ -101,7 +101,7 @@ class TestSubscriptionManager(unittest.TestCase):
         try:
             msgq.setup()
             self.assertTrue(os.path.exists(socket_file))
-            msgq.shutdown();
+            msgq.shutdown()
             self.assertFalse(os.path.exists(socket_file))
         except socket.error:
             # ok, the install path doesn't exist at all,
@@ -115,6 +115,8 @@ class TestSubscriptionManager(unittest.TestCase):
     def test_open_socket_bad(self):
         msgq = MsgQ("/does/not/exist")
         self.assertRaises(socket.error, msgq.setup)
+        # But we can clean up after that.
+        msgq.shutdown()
 
 class DummySocket:
     """
@@ -282,8 +284,10 @@ class SendNonblock(unittest.TestCase):
             if queue_pid == 0:
                 signal.alarm(120)
                 msgq.setup_poller()
+                msgq.setup_signalsock()
                 msgq.register_socket(queue)
                 msgq.run()
+                msgq.cleanup_signalsock()
             else:
                 try:
                     def killall(signum, frame):
@@ -357,6 +361,7 @@ class SendNonblock(unittest.TestCase):
         # Don't need a listen_socket
         msgq.listen_socket = DummySocket
         msgq.setup_poller()
+        msgq.setup_signalsock()
         msgq.register_socket(write)
         msgq.register_socket(control_write)
         # Queue the message for sending
@@ -384,6 +389,10 @@ class SendNonblock(unittest.TestCase):
         # Fail the test if it didn't stop
         self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
 
+        # Clean up some internals of msgq (usually called as part of
+        # shutdown, but we skip that one here)
+        msgq.cleanup_signalsock()
+
         # Check the exception from the thread, if any
         # First, if we didn't expect it; reraise it (to make test fail and
         # show the stacktrace for debugging)