Browse Source

[2398] Add second control socket pair to test msgq

And remove internal timeouts in msgq again; actions are now only triggered by available data, as in the original
Jelte Jansen 12 years ago
parent
commit
8e6846495d
2 changed files with 21 additions and 11 deletions
  1. 2 6
      src/bin/msgq/msgq.py.in
  2. 19 5
      src/bin/msgq/tests/msgq_test.py

+ 2 - 6
src/bin/msgq/msgq.py.in

@@ -493,9 +493,7 @@ class MsgQ:
             try:
             try:
                 # Poll with a timeout so that every once in a while,
                 # Poll with a timeout so that every once in a while,
                 # the loop checks for self.running.
                 # the loop checks for self.running.
-                # Timeout set to 2 seconds so as not to block too long,
-                # but also not cause too many loop cycles
-                events = self.poller.poll(2000)
+                events = self.poller.poll()
             except select.error as err:
             except select.error as err:
                 if err.args[0] == errno.EINTR:
                 if err.args[0] == errno.EINTR:
                     events = []
                     events = []
@@ -517,9 +515,7 @@ class MsgQ:
         while self.running:
         while self.running:
             # Check with a timeout so that every once in a while,
             # Check with a timeout so that every once in a while,
             # the loop checks for self.running.
             # the loop checks for self.running.
-            # Timeout set to 2 seconds so as not to block too long,
-            # but also not cause too many loop cycles
-            events = self.kqueue.control(None, 10, 2)
+            events = self.kqueue.control(None, 10)
             if not events:
             if not events:
                 raise RuntimeError('serve: kqueue returned no events')
                 raise RuntimeError('serve: kqueue returned no events')
 
 

+ 19 - 5
src/bin/msgq/tests/msgq_test.py

@@ -323,15 +323,23 @@ class SendNonblock(unittest.TestCase):
             data = data + data
             data = data + data
         self.send_many(data)
         self.send_many(data)
 
 
-    def do_send(self, write, read, expect_arrive=True,
-                expect_send_exception=None):
+    def do_send(self, write, read, control_write, control_read,
+                expect_arrive=True, expect_send_exception=None):
         """
         """
         Makes a msgq object that is talking to itself,
         Makes a msgq object that is talking to itself,
         run it in a separate thread so we can use and
         run it in a separate thread so we can use and
         test run().
         test run().
+        It is given two sets of connected sockets; write/read, and
+        control_write/control_read. The former may be throwing errors
+        and mangle data to test msgq. The second is mainly used to
+        send msgq the stop command.
+        (Note that the terms 'read' and 'write' are from the msgq
+        point of view, so the test itself writes to 'control_read')
         Parameters:
         Parameters:
         write: a socket that is used to send the data to
         write: a socket that is used to send the data to
         read: a socket that is used to read the data from
         read: a socket that is used to read the data from
+        control_write: a second socket for communication with msgq
+        control_read: a second socket for communication with msgq
         expect_arrive: if True, the read socket is read from, and the data
         expect_arrive: if True, the read socket is read from, and the data
                        that is read is expected to be the same as the data
                        that is read is expected to be the same as the data
                        that has been sent to the write socket.
                        that has been sent to the write socket.
@@ -348,6 +356,7 @@ class SendNonblock(unittest.TestCase):
         msgq.listen_socket = DummySocket
         msgq.listen_socket = DummySocket
         msgq.setup_poller()
         msgq.setup_poller()
         msgq.register_socket(write)
         msgq.register_socket(write)
+        msgq.register_socket(control_write)
         # Queue the message for sending
         # Queue the message for sending
         msgq.sendmsg(write, env, msg)
         msgq.sendmsg(write, env, msg)
 
 
@@ -366,14 +375,16 @@ class SendNonblock(unittest.TestCase):
             # still be working, so a stop command should also
             # still be working, so a stop command should also
             # be processed correctly
             # be processed correctly
             msg = msgq.preparemsg({"type" : "stop"})
             msg = msgq.preparemsg({"type" : "stop"})
-            read.sendall(msg)
+            control_read.sendall(msg)
         else:
         else:
             # OK, then bluntly call stop itself
             # OK, then bluntly call stop itself
             # First give it a chance to handle any remaining events.
             # First give it a chance to handle any remaining events.
             # 1 second arbitrarily chosen to hopefully be long enough
             # 1 second arbitrarily chosen to hopefully be long enough
             # yet not bog down the tests too much.
             # yet not bog down the tests too much.
-            msgq_thread.join(1.0)
+            #msgq_thread.join(1.0)
             # If it didn't crash, stop it now.
             # If it didn't crash, stop it now.
+            msg = msgq.preparemsg({"type" : "stop"})
+            control_read.sendall(msg)
             msgq.stop()
             msgq.stop()
 
 
         # Wait for thread to stop if it hasn't already.
         # Wait for thread to stop if it hasn't already.
@@ -409,10 +420,13 @@ class SendNonblock(unittest.TestCase):
                                send_exception is raised by BadSocket.
                                send_exception is raised by BadSocket.
         """
         """
         (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
         (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        (control_write, control_read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
         badwrite = BadSocket(write, raise_on_send, send_exception)
         badwrite = BadSocket(write, raise_on_send, send_exception)
-        self.do_send(badwrite, read, expect_answer, expect_send_exception)
+        self.do_send(badwrite, read, control_write, control_read, expect_answer, expect_send_exception)
         write.close()
         write.close()
         read.close()
         read.close()
+        control_write.close()
+        control_read.close()
 
 
     def test_send_raise_recoverable(self):
     def test_send_raise_recoverable(self):
         """
         """