Browse Source

[2398] Introduce stop mechanism in msgqM

Introduced a lowlevel mechanism into msgq to stop it (other than ye olde SIGTERM); a lowlevel message 'stop' will cause it to stop listening and shut down. Needed to add timeouts to poll() and control() for that, but chose them relatively high (their loop should simply reloop if they time out)

This allows the threaded msgq testing code to be more reliable (and not having to daemonize and arbitrarily kill threads except in the case of major failures, in which case the test itself will fail)
Jelte Jansen 12 years ago
parent
commit
0ce073c9aa
2 changed files with 56 additions and 16 deletions
  1. 23 5
      src/bin/msgq/msgq.py.in
  2. 33 11
      src/bin/msgq/tests/msgq_test.py

+ 23 - 5
src/bin/msgq/msgq.py.in

@@ -127,6 +127,7 @@ class MsgQ:
         self.subs = SubscriptionManager()
         self.lnames = {}
         self.sendbuffs = {}
+        self.running = False
 
     def setup_poller(self):
         """Set up the poll thing.  Internal function."""
@@ -238,6 +239,7 @@ class MsgQ:
         self.subs.unsubscribe_all(sock)
         lname = [ k for k, v in self.lnames.items() if v == sock ][0]
         del self.lnames[lname]
+        sock.shutdown(socket.SHUT_RDWR)
         sock.close()
         del self.sockets[fd]
         if fd in self.sendbuffs:
@@ -315,6 +317,8 @@ class MsgQ:
         elif cmd == 'ping':
             # Command for testing purposes
             self.process_command_ping(sock, routing, data)
+        elif cmd == 'stop':
+            self.stop()
         else:
             sys.stderr.write("[b10-msgq] Invalid command: %s\n" % cmd)
 
@@ -469,6 +473,7 @@ class MsgQ:
 
     def run(self):
         """Process messages.  Forever.  Mostly."""
+        self.running = True
 
         if self.poller:
             self.run_poller()
@@ -476,9 +481,13 @@ class MsgQ:
             self.run_kqueue()
 
     def run_poller(self):
-        while True:
+        while self.running:
             try:
-                events = self.poller.poll()
+                # Poll with a timeout so that every once in a while,
+                # the loop checks for self.running.
+                # Timeout set to 2 seconds so as not to block too long,
+                # but also not cause too many loop cycles
+                events = self.poller.poll(2000)
             except select.error as err:
                 if err.args[0] == errno.EINTR:
                     events = []
@@ -491,12 +500,18 @@ class MsgQ:
                 else:
                     if event & select.POLLOUT:
                         self.__process_write(fd)
-                    if event & select.POLLIN:
+                    elif event & select.POLLIN:
                         self.process_socket(fd)
+                    else:
+                        print("[XX] UNKNOWN EVENT")
 
     def run_kqueue(self):
-        while True:
-            events = self.kqueue.control(None, 10)
+        while self.running:
+            # Check with a timeout so that every once in a while,
+            # the loop checks for self.running.
+            # Timeout set to 2 seconds so as not to block too long,
+            # but also not cause too many loop cycles
+            events = self.kqueue.control(None, 10, 2)
             if not events:
                 raise RuntimeError('serve: kqueue returned no events')
 
@@ -513,6 +528,9 @@ class MsgQ:
                         self.kill_socket(event.ident,
                                          self.sockets[event.ident])
 
+    def stop(self):
+        self.running = False
+
     def shutdown(self):
         """Stop the MsgQ master."""
         if self.verbose:

+ 33 - 11
src/bin/msgq/tests/msgq_test.py

@@ -180,16 +180,19 @@ class MsgQThread(threading.Thread):
     def __init__(self, msgq):
         threading.Thread.__init__(self)
         self.msgq_ = msgq
-        self.stop = False
         self.caught_exception = None
+        self.lock = threading.Lock()
 
     def run(self):
         try:
-            while not self.stop:
-                self.msgq_.run()
+            self.msgq_.run()
         except Exception as exc:
+            # Store the exception to make the test fail if necessary
             self.caught_exception = exc
 
+    def stop(self):
+        self.msgq_.stop()
+
 
 class SendNonblock(unittest.TestCase):
     """
@@ -352,7 +355,6 @@ class SendNonblock(unittest.TestCase):
         # Run it in a thread
         msgq_thread = MsgQThread(msgq)
         # If we're done, just kill it
-        msgq_thread.daemon = True
         msgq_thread.start()
 
         if expect_arrive:
@@ -361,12 +363,35 @@ class SendNonblock(unittest.TestCase):
             self.assertEqual(env, recv_env)
             self.assertEqual(msg, recv_msg)
 
-        # Give it a chance to stop, if it doesn't, no problem, it'll
-        # die when the program does
-        msgq_thread.join(0.2)
+            # expect_arrive also suggests everything should
+            # still be working, so a stop command should also
+            # be processed correctly
+            msg = msgq.preparemsg({"type" : "stop"})
+            read.sendall(msg)
+        else:
+            # OK, then bluntly call stop itself
+            # First give it a chance to handle any remaining events.
+            # 1 second arbitrarily chosen to hopefully be long enough
+            # yet not bog down the tests too much.
+            msgq_thread.join(1.0)
+            msgq.stop()
+
+        # Wait for thread to stop if it hasn't already
+        # Put in a (long) timeout; the thread *should* stop, but if it
+        # does not, we don't want the test to hang forever
+        msgq_thread.join(60)
+        # Fail the test if it didn't stop
+        self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
 
         # Check the exception from the thread, if any
-        self.assertEqual(expect_send_exception, msgq_thread.caught_exception)
+        # First, if we didn't expect it; reraise it (to make test fail and
+        # show the stacktrace for debugging)
+        if expect_send_exception is None:
+            if msgq_thread.caught_exception is not None:
+                raise msgq_thread.caught_exception
+        else:
+            # If we *did* expect it, fail it there was none
+            self.assertIsNotNone(msgq_thread.caught_exception)
 
     def do_send_with_send_error(self, raise_on_send, send_exception,
                                 expect_answer=True,
@@ -384,9 +409,6 @@ class SendNonblock(unittest.TestCase):
                                send_exception is raised by BadSocket.
         """
         (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
-        # prevent the test from hanging if something goes wrong
-        read.settimeout(0.2)
-        write.settimeout(0.2)
         badwrite = BadSocket(write, raise_on_send, send_exception)
         self.do_send(badwrite, read, expect_answer, expect_send_exception)
         write.close()