Browse Source

[1927] Test msgq normal operations

The success cases of sending, receiving, subscribing, etc.
Michal 'vorner' Vaner 12 years ago
parent
commit
c8b7d82c25
2 changed files with 157 additions and 4 deletions
  1. 1 0
      src/bin/msgq/msgq.py.in
  2. 156 4
      src/bin/msgq/tests/msgq_run_test.py

+ 1 - 0
src/bin/msgq/msgq.py.in

@@ -42,6 +42,7 @@ import isc.cc
 isc.util.process.rename()
 
 isc.log.init("b10-msgq", buffer=True)
+isc.log.resetUnitTestRootLogger()
 # Logger that is used in the actual msgq handling - startup, shutdown and the
 # poller thread.
 logger = isc.log.Logger("msgq")

+ 156 - 4
src/bin/msgq/tests/msgq_run_test.py

@@ -40,6 +40,8 @@ import subprocess
 import time
 
 import isc.log
+import isc.cc.session
+from isc.cc.proto_defs import *
 
 SOCKET_PATH = os.path.abspath(os.environ['BIND10_TEST_SOCKET_FILE'])
 MSGQ_PATH = os.environ['B10_FROM_BUILD'] + '/src/bin/msgq/run_msgq.sh'
@@ -53,6 +55,7 @@ class MsgqRunTest(unittest.TestCase):
         and wait for it to start.
         """
         self.__msgq = None
+        self.__opened_connections = []
         # A precondition check
         self.assertFalse(os.path.exists(SOCKET_PATH))
         signal.alarm(TIMEOUT)
@@ -71,6 +74,15 @@ class MsgqRunTest(unittest.TestCase):
         while not os.path.exists(SOCKET_PATH):
             # Just a short wait, so we don't hog CPU, but don't wait too long
             time.sleep(0.01)
+        # Some testing data
+        self.__no_recpt = {"result": [-1, "No such recipient"]}
+
+    def __message(self, data):
+        """
+        Provide some testing message. The data will be included in it, so
+        several different messages can be created.
+        """
+        return {"Message": "Text", "Data": data}
 
     def tearDown(self):
         """
@@ -93,6 +105,9 @@ class MsgqRunTest(unittest.TestCase):
         Kill msgq (if running) and restore original signal handlers.
         """
         # Remove the socket (as we kill, msgq might not clean up)
+        for conn in self.__opened_connections:
+            conn.close()
+        self.__opened_connections = []
         if self.__msgq:
             self.__msgq.kill()
             self.__msgq = None
@@ -103,13 +118,150 @@ class MsgqRunTest(unittest.TestCase):
         # Cancel timeout (so someone else is not hit by it)
         signal.alarm(0)
 
-    def test_empty_run(self):
+    def __get_connection(self):
+        """
+        Create a connection to the daemon and make sure it is properly closed
+        at the end of the test.
+        """
+        connection = isc.cc.session.Session(SOCKET_PATH)
+        self.__opened_connections.append(connection)
+        return connection
+
+    def test_send_direct(self):
+        """
+        Connect twice to msgq, send a message from one to another using direct
+        l-name and see it comes.
+        """
+        # Create the connections
+        conn1 = self.__get_connection()
+        conn2 = self.__get_connection()
+        # Send the message
+        lname1 = conn1.lname
+        conn2.group_sendmsg(self.__message(1), "*", to=lname1)
+        # Receive the message and see it contains correct data
+        (msg, env) = conn1.group_recvmsg(nonblock=False)
+        self.assertEqual(self.__message(1), msg)
+        # We don't check there are no extra headers, just that none are missing
+        # or wrong.
+        self.assertEqual(lname1, env[CC_HEADER_TO])
+        self.assertEqual(conn2.lname, env[CC_HEADER_FROM])
+        self.assertEqual("*", env[CC_HEADER_GROUP])
+        self.assertEqual(CC_INSTANCE_WILDCARD, env[CC_HEADER_INSTANCE])
+        self.assertEqual(CC_COMMAND_SEND, env[CC_HEADER_TYPE])
+        self.assertFalse(env[CC_HEADER_WANT_ANSWER])
+
+    def __barrier(self, connections):
+        """
+        Make sure all previous commands on all supplied connections are
+        processed, by sending a ping and waiting for an answer.
+        """
+        for c in connections:
+            c.sendmsg({"type": "ping"})
+        for c in connections:
+            pong = c.recvmsg(nonblock=False)
+            self.assertEqual(({"type": "pong"}, None), pong)
+
+    def test_send_group(self):
+        """
+        Create several connections. First, try to send a message to a (empty)
+        group and see an error is bounced back. Then subscribe the others
+        to the group and send it again. Send to a different group and see it
+        bounced back. Unsubscribe and see it is bounced again.
+
+        Then the other connections answer (after unsubscribing, strange, but
+        legal). See both answers come.
+
+        Then, look there are no more waiting messages.
+        """
+        conn_a = self.__get_connection()
+        conn_b = []
+        for i in range(0, 10):
+            conn_b.append(self.__get_connection())
+        # Send a message to empty group and get an error answer
+        seq = conn_a.group_sendmsg(self.__message(1), "group",
+                                   want_answer=True)
+        (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq)
+        self.assertEqual(self.__no_recpt, msg)
+        self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
+        # Subscribe the two connections
+        for c in conn_b:
+            c.group_subscribe("group")
+        # The subscribe doesn't wait for answer, so make sure it is
+        # all processed before continuing.
+        self.__barrier(conn_b)
+        # Send a message to the group (this time not empty)
+        seq = conn_a.group_sendmsg(self.__message(2), "group",
+                                   want_answer=True)
+        envs = []
+        for c in conn_b:
+            (msg, env) = c.group_recvmsg(nonblock=False)
+            self.assertEqual(self.__message(2), msg)
+            self.assertEqual(conn_a.lname, env[CC_HEADER_FROM])
+            # The daemon does not mangle the headers. Is it OK?
+            self.assertEqual(CC_TO_WILDCARD, env[CC_HEADER_TO])
+            self.assertEqual("group", env[CC_HEADER_GROUP])
+            self.assertEqual(CC_INSTANCE_WILDCARD, env[CC_HEADER_INSTANCE])
+            self.assertEqual(CC_COMMAND_SEND, env[CC_HEADER_TYPE])
+            self.assertTrue(env[CC_HEADER_WANT_ANSWER])
+            envs.append(env)
+        # Send to non-existing group
+        seq_ne = conn_a.group_sendmsg(self.__message(3), "no-group",
+                                      want_answer=True)
+        (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq_ne)
+        self.assertEqual(self.__no_recpt, msg)
+        self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
+        # Unsubscribe the connections
+        for c in conn_b:
+            c.group_unsubscribe("group")
+        # Synchronize the unsubscriptions
+        self.__barrier(conn_b)
+        seq_ne = conn_a.group_sendmsg(self.__message(4), "group",
+                                      want_answer=True)
+        (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq_ne)
+        self.assertEqual(self.__no_recpt, msg)
+        self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
+        # Send answers for the original message that was delivered
+        lnames = set()
+        for (c, env) in zip(conn_b, envs):
+            c.group_reply(env, self.__message("Reply"))
+            lnames.add(c.lname)
+        # Check the both answers come
+        while lnames:
+            # While there are still connections we didn't get the answer from
+            # (the order is not guaranteed, therefore the juggling with set)
+            (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq)
+            self.assertEqual(self.__message("Reply"), msg)
+            lname = env[CC_HEADER_FROM]
+            self.assertTrue(lname in lnames)
+            lnames.remove(lname)
+
+        # Wait a short time, so messages sent where they don't belong get some
+        # time to arrive. They could still be late, so the test could still
+        # pass on a slow machine, but an unreliable test is better than none
+        # at all. Any idea how to do it in a better way?
+        time.sleep(0.1)
+        for c in conn_b:
+            self.assertEqual((None, None), c.group_recvmsg())
+        self.assertEqual((None, None), conn_a.group_recvmsg())
+
+    def test_conn_disconn(self):
         """
-        Temporary empty test, to see if we can start and stop msgq.
-        Testing the test harness.
+        Keep connecting and disconnecting, checking we can still send
+        and receive messages.
         """
-        pass
+        conn = self.__get_connection()
+        conn.group_subscribe("group")
+        for i in range(0, 50):
+            new = self.__get_connection()
+            new.group_subscribe("group")
+            self.__barrier([conn, new])
+            new.group_sendmsg(self.__message(i), "group")
+            (msg, env) = conn.group_recvmsg(nonblock=False)
+            self.assertEqual(self.__message(i), msg)
+            conn.close()
+            conn = new
 
 if __name__ == '__main__':
+    isc.log.init("msgq-tests")
     isc.log.resetUnitTestRootLogger()
     unittest.main()