Browse Source

[2922] Tests for sending notifications

Checking there are no problems with threads (and it seems there are).
Michal 'vorner' Vaner 12 years ago
parent
commit
4bed4a4983
1 changed files with 52 additions and 0 deletions
  1. 52 0
      src/bin/msgq/tests/msgq_run_test.py

+ 52 - 0
src/bin/msgq/tests/msgq_run_test.py

@@ -272,6 +272,58 @@ class MsgqRunTest(unittest.TestCase):
             conn.close()
             conn = new
 
+    def test_notifications(self):
+        """
+        Check that the MsgQ is actually sending notifications about events.
+        We create a socket, subscribe the socket itself and see it receives
+        it's own notification.
+
+        Testing all the places where notifications happen is task for the
+        common unit tests in msgq_test.py.
+
+        The test is here, because there might be some trouble with multiple
+        threads in msgq which would be hard to test using pure unit tests.
+        """
+        conn = self.__get_connection()
+        # Activate the session, pretend to be the config manager.
+        conn.group_subscribe("ConfigManager")
+        # Answer request for logging config
+        (msg, env) = conn.group_recvmsg(nonblock=False)
+        self.assertEqual({'command': ['get_config',
+                                      {'module_name': 'Logging'}]},
+                         msg)
+        conn.group_reply(env, {'result': [0, {}]})
+        # It sends its spec.
+        (msg, env) = conn.group_recvmsg(nonblock=False)
+        self.assertEqual('module_spec', msg['command'][0])
+        conn.group_reply(env, {'result': [0]})
+        # It asks for its own config
+        (msg, env) = conn.group_recvmsg(nonblock=False)
+        self.assertEqual({'command': ['get_config',
+                                      {'module_name': 'Msgq'}]},
+                         msg)
+        conn.group_reply(env, {'result': [0, {}]})
+        # Synchronization - make sure the session is running before
+        # we continue, so we get the notification. Similar synchronisation
+        # as in b10-init, but we don't have full ccsession here, so we
+        # do so manually.
+        synchronised = False
+        attempts = 100
+        while not synchronised:
+            time.sleep(0.1)
+            seq = conn.group_sendmsg({'command': ['Are you running?']},
+                                     'Msgq', want_answer=True)
+            msg = conn.group_recvmsg(nonblock=False, seq=seq)
+            synchronised = msg[0] != -1
+        self.assertTrue(synchronised)
+        # The actual test
+        conn.group_subscribe("notifications/cc_members")
+        (msg, env) = conn.group_recvmsg(nonblock=False)
+        self.assertEqual({'notification': ['subscribed', {
+            'client': conn.lname,
+            'group': 'notifications/cc_members'
+        }]}, msg)
+
 if __name__ == '__main__':
     isc.log.init("msgq-tests")
     isc.log.resetUnitTestRootLogger()