Parcourir la source

[2932] Handle the notifications

Detect incoming notifications and handle them according to the
subscriptions.
Michal 'vorner' Vaner il y a 11 ans
Parent
commit
d5d77acc2f

+ 47 - 0
src/lib/config/ccsession.cc

@@ -611,6 +611,11 @@ ModuleCCSession::checkCommand() {
             return (0);
         }
 
+        // In case it is notification, eat it.
+        if (checkNotification(routing, data)) {
+            return (0);
+        }
+
         /* ignore result messages (in case we're out of sync, to prevent
          * pingpongs */
         if (data->getType() != Element::map ||
@@ -939,5 +944,47 @@ ModuleCCSession::unsubscribeNotification(const NotificationID& notification) {
     }
 }
 
+bool
+ModuleCCSession::checkNotification(const data::ConstElementPtr& envelope,
+                                   const data::ConstElementPtr& msg)
+{
+    if (msg->getType() != data::Element::map) {
+        // If it's not a map, then it's not a notification
+        return (false);
+    }
+    if (msg->contains(isc::cc::CC_PAYLOAD_NOTIFICATION)) {
+        // There's a notification inside. Extract its parameters.
+        const std::string& group =
+            envelope->get(isc::cc::CC_HEADER_GROUP)->stringValue();
+        const std::string& notification_group =
+            group.substr(std::string(isc::cc::CC_GROUP_NOTIFICATION_PREFIX).
+                         size());
+        const data::ConstElementPtr& notification =
+            msg->get(isc::cc::CC_PAYLOAD_NOTIFICATION);
+        // The first one is the event that happened
+        const std::string& event = notification->get(0)->stringValue();
+        // Any other params are second. But they may be missing
+        const data::ConstElementPtr params =
+            notification->size() == 1 ? data::ConstElementPtr() :
+            notification->get(1);
+        // Find the chain of notification callbacks
+        const SubscribedNotifications::iterator& chain_iter =
+            notifications_.find(notification_group);
+        if (chain_iter == notifications_.end()) {
+            // This means we no longer have any notifications for this group.
+            // This can happen legally as a race condition - if msgq sends
+            // us a notification, but we unsubscribe before we get to it
+            // in the input stream.
+            return (false);
+        }
+        BOOST_FOREACH(const NotificationCallback& callback,
+                      chain_iter->second) {
+            callback(event, params);
+        }
+        return (true);
+    }
+    return (false); // Not a notification
+}
+
 }
 }

+ 2 - 0
src/lib/config/ccsession.h

@@ -647,6 +647,8 @@ private:
     ///     otherwise.
     bool checkAsyncRecv(const data::ConstElementPtr& envelope,
                         const data::ConstElementPtr& msg);
+    bool checkNotification(const data::ConstElementPtr& envelope,
+                           const data::ConstElementPtr& msg);
     /// \brief Checks if a message with this envelope matches the request
     bool requestMatch(const AsyncRecvRequest& request,
                       const data::ConstElementPtr& envelope) const;

+ 7 - 12
src/lib/config/tests/ccsession_unittests.cc

@@ -115,13 +115,14 @@ TEST_F(CCSessionTest, receiveNotification) {
     EXPECT_TRUE(session.haveSubscription("notifications/group", "*"));
     EXPECT_TRUE(called.empty());
     // Send the notification
-    session.getMessages()->add(el("{"
+    const isc::data::ConstElementPtr msg = el("{"
         "       \"notification\": ["
         "           \"event\", {"
         "               \"param\": true"
         "           }"
         "       ]"
-        "   }"));
+        "   }");
+    session.addMessage(msg, "notifications/group", "*");
     mccs.checkCommand();
     ASSERT_EQ(2, called.size());
     EXPECT_EQ("first", called[0]);
@@ -132,23 +133,17 @@ TEST_F(CCSessionTest, receiveNotification) {
     // We are still subscribed to the group and handle the requests
     EXPECT_TRUE(session.haveSubscription("notifications/group", "*"));
     // Send the notification
-    session.getMessages()->add(el("{"
-        "       \"notification\": ["
-        "           \"event\", {"
-        "               \"param\": true"
-        "           }"
-        "       ]"
-        "   }"));
+    session.addMessage(msg, "notifications/group", "*");
     mccs.checkCommand();
     ASSERT_EQ(1, called.size());
     EXPECT_EQ("second", called[0]);
-    // Try to unsubscribe again. That should fail.
-    EXPECT_THROW(mccs.unsubscribeNotification(first), isc::InvalidParameter);
-    EXPECT_TRUE(session.haveSubscription("notifications/group", "*"));
     // Unsubscribe the other one too. That should cancel the upstream
     // subscription
     mccs.unsubscribeNotification(second);
     EXPECT_FALSE(session.haveSubscription("notifications/group", "*"));
+    // Nothing crashes if out of sync notification comes unexpected
+    session.addMessage(msg, "notifications/group", "*");
+    EXPECT_NO_THROW(mccs.checkCommand());
 }
 
 // Test we can send an RPC (command) and get an answer. The answer is success