Browse Source

Merge #2931

Receiving notifications in python
Michal 'vorner' Vaner 12 years ago
parent
commit
73656261a3

+ 82 - 2
src/lib/python/isc/config/ccsession.py

@@ -220,6 +220,9 @@ class ModuleCCSession(ConfigData):
         self._remote_module_configs = {}
         self._remote_module_configs = {}
         self._remote_module_callbacks = {}
         self._remote_module_callbacks = {}
 
 
+        self._notification_callbacks = {}
+        self._last_notif_id = 0
+
         if handle_logging_config:
         if handle_logging_config:
             self.add_remote_config(path_search('logging.spec', bind10_config.PLUGIN_PATHS),
             self.add_remote_config(path_search('logging.spec', bind10_config.PLUGIN_PATHS),
                                    default_logconfig_handler)
                                    default_logconfig_handler)
@@ -294,8 +297,27 @@ class ModuleCCSession(ConfigData):
            configuration update. Calls the corresponding handler
            configuration update. Calls the corresponding handler
            functions if present. Responds on the channel if the
            functions if present. Responds on the channel if the
            handler returns a message."""
            handler returns a message."""
-        # should we default to an answer? success-by-default? unhandled error?
-        if msg is not None and not CC_PAYLOAD_RESULT in msg:
+        if msg is None:
+            return
+        if CC_PAYLOAD_NOTIFICATION in msg:
+            group_s = env[CC_HEADER_GROUP].split('/', 1)
+            # What to do with these bogus inputs? We just ignore them for now.
+            if len(group_s) != 2:
+                return
+            [prefix, group] = group_s
+            if prefix + '/' != CC_GROUP_NOTIFICATION_PREFIX:
+                return
+            # Now, get the callbacks and call one by one
+            callbacks = self._notification_callbacks.get(group, {})
+            event = msg[CC_PAYLOAD_NOTIFICATION][0]
+            params = None
+            if len(msg[CC_PAYLOAD_NOTIFICATION]) > 1:
+                params = msg[CC_PAYLOAD_NOTIFICATION][1]
+            for key in sorted(callbacks.keys()):
+                callbacks[key](event, params)
+        elif not CC_PAYLOAD_RESULT in msg:
+            # should we default to an answer? success-by-default? unhandled
+            # error?
             answer = None
             answer = None
             try:
             try:
                 module_name = env[CC_HEADER_GROUP]
                 module_name = env[CC_HEADER_GROUP]
@@ -575,6 +597,64 @@ class ModuleCCSession(ConfigData):
                                     to=CC_TO_WILDCARD,
                                     to=CC_TO_WILDCARD,
                                     want_answer=False)
                                     want_answer=False)
 
 
+    def subscribe_notification(self, notification_group, callback):
+        """
+        Subscribe to receive notifications in given notification group. When a
+        notification comes to the group, the callback is called with two
+        parameters, the name of the event (the value of `event_name` parameter
+        passed to `notify`) and the parameters of the event (the value
+        of `params` passed to `notify`).
+
+        This is a fast operation (there may be communication with the message
+        queue daemon, but it does not wait for any remote process).
+
+        The callback may get called multiple times (once for each notification).
+        It is possible to subscribe multiple callbacks for the same notification,
+        by multiple calls of this method, and they will be called in the order
+        of registration when the notification comes.
+
+        Throws:
+        - CCSessionError: for low-level communication errors.
+        Params:
+        - notification_group (string): Notification group to subscribe to.
+          Notification with the same value of the same parameter of `notify`
+          will be received.
+        - callback (callable): The callback to be called whenever the
+          notification comes.
+
+          The callback should not raise exceptions, such exceptions are
+          likely to propagate through the loop and terminate program.
+        Returns: Opaque id of the subscription. It can be used to cancel
+          the subscription by unsubscribe_notification.
+        """
+        self._last_notif_id += 1
+        my_id = self._last_notif_id
+        if notification_group in self._notification_callbacks:
+            self._notification_callbacks[notification_group][my_id] = callback
+        else:
+            self._session.group_subscribe(CC_GROUP_NOTIFICATION_PREFIX +
+                                          notification_group)
+            self._notification_callbacks[notification_group] = \
+                { my_id: callback }
+        return (notification_group, my_id)
+
+    def unsubscribe_notification(self, nid):
+        """
+        Remove previous subscription for notifications. Pass the id returned
+        from subscribe_notification.
+
+        Throws:
+        - CCSessionError: for low-level communication errors.
+        - KeyError: The id does not correspond to valid subscription.
+        """
+        (group, cid) = nid
+        del self._notification_callbacks[group][cid]
+        if not self._notification_callbacks[group]:
+            # Removed the last one
+            self._session.group_unsubscribe(CC_GROUP_NOTIFICATION_PREFIX +
+                                            group)
+            del self._notification_callbacks[group]
+
 class UIModuleCCSession(MultiConfigData):
 class UIModuleCCSession(MultiConfigData):
     """This class is used in a configuration user interface. It contains
     """This class is used in a configuration user interface. It contains
        specific functions for getting, displaying, and sending
        specific functions for getting, displaying, and sending

+ 71 - 0
src/lib/python/isc/config/tests/ccsession_test.py

@@ -376,6 +376,77 @@ class TestModuleCCSession(unittest.TestCase):
             ],
             ],
             fake_session.message_queue)
             fake_session.message_queue)
 
 
+    def test_notify_receive(self):
+        """
+        Test we can subscribe to notifications, receive them, unsubscribe, etc.
+        """
+        fake_session = FakeModuleCCSession()
+        mccs = self.create_session("spec1.spec", None, None, fake_session)
+        fake_session.group_sendmsg({"notification": ["event", {
+            "param": True
+        }]}, "notifications/group")
+        # Not subscribed to notifications -> not subscribed to
+        # 'notifications/group' -> message not eaten yet
+        mccs.check_command()
+        self.assertEqual(fake_session.message_queue, [['notifications/group',
+                         None, {'notification': ['event', {'param': True}]},
+                         False]])
+        # Place to log called notifications
+        notifications = []
+        def notified(tag, event, params):
+            notifications.append((tag, event, params))
+        # Subscribe to the notifications. Twice.
+        id1 = mccs.subscribe_notification('group',
+                                          lambda event, params:
+                                              notified("first", event, params))
+        id2 = mccs.subscribe_notification('group',
+                                          lambda event, params:
+                                              notified("second", event,
+                                              params))
+        # Now the message gets eaten because we are subscribed, and both
+        # callbacks are called.
+        mccs.check_command()
+        self.assertEqual(fake_session.message_queue, [])
+        self.assertEqual(notifications, [
+            ("first", "event", {'param': True}),
+            ("second", "event", {'param': True})
+        ])
+        del notifications[:]
+        # If a notification for different group comes, it is left untouched.
+        fake_session.group_sendmsg({"notification": ["event", {
+            "param": True
+        }]}, "notifications/other")
+        mccs.check_command()
+        self.assertEqual(notifications, [])
+        self.assertEqual(fake_session.message_queue, [['notifications/other',
+                         None, {'notification': ['event', {'param': True}]},
+                         False]])
+        del fake_session.message_queue[:]
+        # Unsubscribe one of the notifications and see that only the other
+        # is triggered.
+        mccs.unsubscribe_notification(id2)
+        fake_session.group_sendmsg({"notification": ["event", {
+            "param": True
+        }]}, "notifications/group")
+        mccs.check_command()
+        self.assertEqual(fake_session.message_queue, [])
+        self.assertEqual(notifications, [
+            ("first", "event", {'param': True})
+        ])
+        del notifications[:]
+        # If we try to unsubscribe again, it complains.
+        self.assertRaises(KeyError, mccs.unsubscribe_notification, id2)
+        # Unsubscribe the other one too. From now on, it doesn't eat the
+        # messages again.
+        mccs.unsubscribe_notification(id1)
+        fake_session.group_sendmsg({"notification": ["event", {
+            "param": True
+        }]}, "notifications/group")
+        mccs.check_command()
+        self.assertEqual(fake_session.message_queue, [['notifications/group',
+                         None, {'notification': ['event', {'param': True}]},
+                         False]])
+
     def my_config_handler_ok(self, new_config):
     def my_config_handler_ok(self, new_config):
         return isc.config.ccsession.create_answer(0)
         return isc.config.ccsession.create_answer(0)