Browse Source

Tests for non-blocking msgq

* Tests that try sending data and expect to time out.
* Test that tries to repeatedly send a command to msgq and expects it to
  be answered (to see that the sending does not only timeout, but works
  when the data are read as well).
Michal 'vorner' Vaner 14 years ago
parent
commit
a1210be107
2 changed files with 98 additions and 2 deletions
  1. 6 0
      src/bin/msgq/msgq.py.in
  2. 92 2
      src/bin/msgq/tests/msgq_test.py

+ 6 - 0
src/bin/msgq/msgq.py.in

@@ -187,6 +187,12 @@ class MsgQ:
         # TODO: When we have logging, we might want
         # to add a debug message here that a new connection
         # was made
+        self.register_socket(self, newsocket)
+
+    def register_socket(self, newsocket):
+        """
+        Internal function to insert a socket. Used by process_accept and some tests.
+        """
         self.sockets[newsocket.fileno()] = newsocket
         lname = self.newlname()
         self.lnames[lname] = newsocket

+ 92 - 2
src/bin/msgq/tests/msgq_test.py

@@ -3,10 +3,12 @@ from msgq import SubscriptionManager, MsgQ
 import unittest
 import os
 import socket
+import signal
+import sys
 
 #
-# Currently only the subscription part is implemented...  I'd have to mock
-# out a socket, which, while not impossible, is not trivial.
+# Currently only the subscription part and some sending is implemented...
+# I'd have to mock out a socket, which, while not impossible, is not trivial.
 #
 
 class TestSubscriptionManager(unittest.TestCase):
@@ -108,5 +110,93 @@ class TestSubscriptionManager(unittest.TestCase):
         msgq = MsgQ("/does/not/exist")
         self.assertRaises(socket.error, msgq.setup)
 
+class TestPassed(Exception): pass
+
+class SendNonblock(unittest.TestCase):
+    """
+    Tests that the whole thing will not get blocked if someone does not read.
+    """
+
+    def terminate_check(self, task, timeout = 1):
+        """
+        Runs task in separate process (task is a function) and checks
+        it terminates sooner than timeout.
+        """
+        task_pid = os.fork()
+        if task_pid == 0:
+            # Kill the forked process after timeout by SIGALRM
+            signal.alarm(timeout)
+            # Run the task
+            # If an exception happens or we run out of time, we terminate
+            # with non-zero
+            task()
+            # If we got here, then everything worked well and in time
+            # In that case, we terminate successfully
+            sys.exit()
+        else:
+            (pid, status) = os.waitpid(task_pid, 0)
+            self.assertEqual(0, status,
+                "The task did not complete successfully in time")
+
+    def infinite_sender(self, sender):
+        """
+        Sends data until an exception happens. socket.error is caught,
+        as it means the socket got closed. Sender is called to actually
+        send the data.
+        """
+        msgq = MsgQ()
+        # We do only partial setup, so we don't create the listening socket
+        msgq.setup_poller()
+        (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        msgq.register_socket(write)
+        # Keep sending while it is not closed by the msgq
+        try:
+            while True:
+                sender(msgq, write)
+        except (socket.error, TestPassed):
+            pass
+
+    def test_infinite_sendmsg(self):
+        """
+        Tries sending messages (and not reading them) until it either times
+        out (in blocking call, wrong) or closes it (correct).
+        """
+        self.terminate_check(lambda: self.infinite_sender(
+            lambda msgq, socket: msgq.sendmsg(socket, {}, {"message":"x"})))
+
+    def test_infinite_sendprepared(self):
+        """
+        Tries sending data (and not reading them) until it either times
+        out (in blocking call, wrong) or closes it (correct).
+        """
+        self.terminate_check(lambda: self.infinite_sender(
+            lambda msgq, socket: msgq.send_prepared_msg(socket, b"data")))
+
+    def test_send_works(self):
+        """
+        Tries that sending a command many times and getting an answer works.
+        """
+        msgq = MsgQ()
+        msgq.setup_poller()
+        # msgq.run needs to compare with the listen_socket, so we provide
+        # a replacement
+        class DummySocket:
+            def fileno():
+                return -1
+        msgq.listen_socket = DummySocket
+        (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        def run():
+            queue_pid = os.fork()
+            if queue_pid == 0:
+                msgq.register_socket(queue)
+                msgq.run()
+            else:
+                msg = msgq.preparemsg({"type":"getlname"},{})
+                for i in range(1, 1000):
+                    out.send(msg)
+                    out.recv(1024)
+                os.kill(queue_pid, signal.SIGTERM)
+        self.terminate_check(run)
+
 if __name__ == '__main__':
     unittest.main()