Browse Source

Merge #2932

Receiving of notifications in C++.
Michal 'vorner' Vaner 11 years ago
parent
commit
da04b48073
3 changed files with 198 additions and 0 deletions
  1. 84 0
      src/lib/config/ccsession.cc
  2. 55 0
      src/lib/config/ccsession.h
  3. 59 0
      src/lib/config/tests/ccsession_unittests.cc

+ 84 - 0
src/lib/config/ccsession.cc

@@ -611,6 +611,11 @@ ModuleCCSession::checkCommand() {
             return (0);
         }
 
+        // In case it is notification, eat it.
+        if (checkNotification(routing, data)) {
+            return (0);
+        }
+
         /* ignore result messages (in case we're out of sync, to prevent
          * pingpongs */
         if (data->getType() != Element::map ||
@@ -902,5 +907,84 @@ ModuleCCSession::notify(const std::string& group, const std::string& name,
                  isc::cc::CC_TO_WILDCARD, false);
 }
 
+ModuleCCSession::NotificationID
+ModuleCCSession::subscribeNotification(const std::string& notification_group,
+                                       const NotificationCallback& callback)
+{
+    // Either insert a new empty list of callbacks or get an existing one.
+    // Either way, get the iterator for its position.
+    const std::pair<SubscribedNotifications::iterator, bool>& inserted =
+        notifications_.insert(
+            std::pair<std::string, NotificationCallbacks>(notification_group,
+                NotificationCallbacks()));
+    if (inserted.second) {
+        // It was newly inserted. In that case, we need to subscribe to the
+        // group.
+        session_.subscribe(isc::cc::CC_GROUP_NOTIFICATION_PREFIX +
+                           notification_group);
+    }
+    // Insert the callback to the chain
+    NotificationCallbacks& callbacks = inserted.first->second;
+    const NotificationCallbacks::iterator& callback_id =
+        callbacks.insert(callbacks.end(), callback);
+    // Just pack the iterators to form the ID
+    return (NotificationID(inserted.first, callback_id));
+}
+
+void
+ModuleCCSession::unsubscribeNotification(const NotificationID& notification) {
+    NotificationCallbacks& callbacks = notification.first->second;
+    // Remove the callback
+    callbacks.erase(notification.second);
+    // If it became empty, remove it from the map and unsubscribe
+    if (callbacks.empty()) {
+        session_.unsubscribe(isc::cc::CC_GROUP_NOTIFICATION_PREFIX +
+                             notification.first->first);
+        notifications_.erase(notification.first);
+    }
+}
+
+bool
+ModuleCCSession::checkNotification(const data::ConstElementPtr& envelope,
+                                   const data::ConstElementPtr& msg)
+{
+    if (msg->getType() != data::Element::map) {
+        // If it's not a map, then it's not a notification
+        return (false);
+    }
+    if (msg->contains(isc::cc::CC_PAYLOAD_NOTIFICATION)) {
+        // There's a notification inside. Extract its parameters.
+        const std::string& group =
+            envelope->get(isc::cc::CC_HEADER_GROUP)->stringValue();
+        const std::string& notification_group =
+            group.substr(std::string(isc::cc::CC_GROUP_NOTIFICATION_PREFIX).
+                         size());
+        const data::ConstElementPtr& notification =
+            msg->get(isc::cc::CC_PAYLOAD_NOTIFICATION);
+        // The first one is the event that happened
+        const std::string& event = notification->get(0)->stringValue();
+        // Any other params are second. But they may be missing
+        const data::ConstElementPtr params =
+            notification->size() == 1 ? data::ConstElementPtr() :
+            notification->get(1);
+        // Find the chain of notification callbacks
+        const SubscribedNotifications::iterator& chain_iter =
+            notifications_.find(notification_group);
+        if (chain_iter == notifications_.end()) {
+            // This means we no longer have any notifications for this group.
+            // This can happen legally as a race condition - if msgq sends
+            // us a notification, but we unsubscribe before we get to it
+            // in the input stream.
+            return (false);
+        }
+        BOOST_FOREACH(const NotificationCallback& callback,
+                      chain_iter->second) {
+            callback(event, params);
+        }
+        return (true);
+    }
+    return (false); // Not a notification
+}
+
 }
 }

+ 55 - 0
src/lib/config/ccsession.h

@@ -575,6 +575,57 @@ public:
     /// \param id The id of request as returned by groupRecvMsgAsync.
     void cancelAsyncRecv(const AsyncRecvRequestID& id);
 
+    /// \brief Called when a notification comes
+    ///
+    /// The callback should be exception-free. If it raises an exception,
+    /// it'll leak through the event loop up and probably terminate the
+    /// application.
+    ///
+    /// \param event_name The identification of event type.
+    /// \param params The parameters of the event. This may be NULL
+    ///     pointer in case no parameters were sent with the event.
+    typedef boost::function<void (const std::string& event_name,
+                                  const data::ConstElementPtr& params)>
+        NotificationCallback;
+
+    /// \brief Multiple notification callbacks for the same notification
+    typedef std::list<NotificationCallback> NotificationCallbacks;
+
+    /// \brief Mapping from groups to callbacks
+    typedef std::map<std::string, NotificationCallbacks>
+        SubscribedNotifications;
+
+    /// \brief Identification of single callback
+    typedef std::pair<SubscribedNotifications::iterator,
+                      NotificationCallbacks::iterator>
+        NotificationID;
+
+    /// \brief Subscribe to a notification group
+    ///
+    /// From now on, every notification that is sent to the given group
+    /// triggers the passed callback.
+    ///
+    /// There may be multiple (independent) callbacks for the same channel.
+    /// This one adds a new one, to the end of the chain (the callbacks
+    /// are called in the same order as they were registered).
+    ///
+    /// \param notification_group The channel of notifications.
+    /// \param callback The callback to be added.
+    /// \return ID of the notification callback. It is an opaque ID and can
+    ///     be used to remove this callback.
+    NotificationID subscribeNotification(const std::string& notification_group,
+                                         const NotificationCallback& callback);
+
+    /// \brief Unsubscribe the callback from its notification group.
+    ///
+    /// Express that the desire for this callback to be executed is no longer
+    /// relevant. All the other callbacks (even for the same notification
+    /// group) are left intact.
+    ///
+    /// \param notification The ID of notification callback returned by
+    ///     subscribeNotification.
+    void unsubscribeNotification(const NotificationID& notification);
+
     /// \brief Subscribe to a group
     ///
     /// Wrapper around the CCSession::subscribe.
@@ -634,6 +685,8 @@ private:
     ///     otherwise.
     bool checkAsyncRecv(const data::ConstElementPtr& envelope,
                         const data::ConstElementPtr& msg);
+    bool checkNotification(const data::ConstElementPtr& envelope,
+                           const data::ConstElementPtr& msg);
     /// \brief Checks if a message with this envelope matches the request
     bool requestMatch(const AsyncRecvRequest& request,
                       const data::ConstElementPtr& envelope) const;
@@ -643,6 +696,8 @@ private:
     isc::cc::AbstractSession& session_;
     ModuleSpec module_specification_;
     AsyncRecvRequests async_recv_requests_;
+    SubscribedNotifications notifications_;
+
     isc::data::ConstElementPtr handleConfigUpdate(
         isc::data::ConstElementPtr new_config);
 

+ 59 - 0
src/lib/config/tests/ccsession_unittests.cc

@@ -87,6 +87,65 @@ protected:
     const std::string root_name;
 };
 
+void
+notificationCallback(std::vector<std::string>* called,
+                     const std::string& id, const std::string& notification,
+                     const ConstElementPtr& params)
+{
+    called->push_back(id);
+    EXPECT_EQ("event", notification);
+    EXPECT_TRUE(el("{\"param\": true}")->equals(*params));
+}
+
+TEST_F(CCSessionTest, receiveNotification) {
+    // Not subscribed to the group yet
+    ModuleCCSession mccs(ccspecfile("spec1.spec"), session, NULL, NULL,
+                         false, false);
+    EXPECT_FALSE(session.haveSubscription("notifications/group", "*"));
+    std::vector<std::string> called;
+    // Subscribe to the notification. Twice.
+    const ModuleCCSession::NotificationID& first =
+        mccs.subscribeNotification("group", boost::bind(&notificationCallback,
+                                                        &called, "first",
+                                                        _1, _2));
+    const ModuleCCSession::NotificationID& second =
+        mccs.subscribeNotification("group", boost::bind(&notificationCallback,
+                                                        &called, "second",
+                                                        _1, _2));
+    EXPECT_TRUE(session.haveSubscription("notifications/group", "*"));
+    EXPECT_TRUE(called.empty());
+    // Send the notification
+    const isc::data::ConstElementPtr msg = el("{"
+        "       \"notification\": ["
+        "           \"event\", {"
+        "               \"param\": true"
+        "           }"
+        "       ]"
+        "   }");
+    session.addMessage(msg, "notifications/group", "*");
+    mccs.checkCommand();
+    ASSERT_EQ(2, called.size());
+    EXPECT_EQ("first", called[0]);
+    EXPECT_EQ("second", called[1]);
+    called.clear();
+    // Unsubscribe one of them
+    mccs.unsubscribeNotification(first);
+    // We are still subscribed to the group and handle the requests
+    EXPECT_TRUE(session.haveSubscription("notifications/group", "*"));
+    // Send the notification
+    session.addMessage(msg, "notifications/group", "*");
+    mccs.checkCommand();
+    ASSERT_EQ(1, called.size());
+    EXPECT_EQ("second", called[0]);
+    // Unsubscribe the other one too. That should cancel the upstream
+    // subscription
+    mccs.unsubscribeNotification(second);
+    EXPECT_FALSE(session.haveSubscription("notifications/group", "*"));
+    // Nothing crashes if out of sync notification comes unexpected
+    session.addMessage(msg, "notifications/group", "*");
+    EXPECT_NO_THROW(mccs.checkCommand());
+}
+
 // Test we can send an RPC (command) and get an answer. The answer is success
 // in this case.
 TEST_F(CCSessionTest, rpcCallSuccess) {