Parcourir la source

[1914] Check for async requests on a new message

We now check if the message looks like something a callback wants in the
checkCommand() call. For some reason, it does not seem to work with the
tests, it needs to be checked why.
Michal 'vorner' Vaner il y a 13 ans
Parent
commit
132d530f80
2 fichiers modifiés avec 76 ajouts et 1 suppressions
  1. 61 0
      src/lib/config/ccsession.cc
  2. 15 1
      src/lib/config/ccsession.h

+ 61 - 0
src/lib/config/ccsession.cc

@@ -601,6 +601,11 @@ ModuleCCSession::checkCommand() {
     ConstElementPtr cmd, routing, data;
     if (session_.group_recvmsg(routing, data, true)) {
 
+        // In case the message is wanted asynchronously, it gets used.
+        if (checkAsyncRecv(routing, data)) {
+            return (0);
+        }
+
         /* ignore result messages (in case we're out of sync, to prevent
          * pingpongs */
         if (data->getType() != Element::map || data->contains("result")) {
@@ -786,5 +791,61 @@ ModuleCCSession::groupRecvMsgAsync(const AsyncRecvCallback& callback,
     return (async_recv_requests_.insert(async_recv_requests_.end(), request));
 }
 
+bool
+ModuleCCSession::checkAsyncRecv(const ConstElementPtr& envelope,
+                                const ConstElementPtr& msg)
+{
+    for (AsyncRecvRequestID request(async_recv_requests_.begin());
+         request != async_recv_requests_.end(); ++ request) {
+        // Just go through all the requests and look for a matching one
+        if (requestMatch(**request, envelope)) {
+            // We want the request to be still alive at the time we
+            // call the callback. But we need to remove it on an exception
+            // too, so we use the class. If just C++ had the finally keyword.
+            class RequestDeleter {
+            public:
+                RequestDeleter(AsyncRecvRequests& requests,
+                               AsyncRecvRequestID& request) :
+                    requests_(requests),
+                    request_(request)
+                { }
+                ~ RequestDeleter() {
+                    requests_.erase(request_);
+                }
+            private:
+                AsyncRecvRequests requests_;
+                AsyncRecvRequestID request_;
+            } deleter(async_recv_requests_, request);
+            // Call the callback
+            (*request)->callback(envelope, msg, request);
+        }
+    }
+    return (false);
+}
+
+bool
+ModuleCCSession::requestMatch(const AsyncRecvRequest& request,
+                              const ConstElementPtr& envelope) const
+{
+    if (request.is_reply != envelope->contains("reply")) {
+        // Wrong type of message
+        return (false);
+    }
+    if (request.is_reply &&
+        (request.seq == -1 ||
+         request.seq == envelope->get("reply")->intValue())) {
+        // This is the correct reply
+        return (true);
+    }
+    if (!request.is_reply &&
+        (request.recipient == "" ||
+         request.recipient == envelope->get("group")->stringValue())) {
+        // This is the correct command
+        return (true);
+    }
+    // If nothing from the above, we don't want it
+    return (false);
+}
+
 }
 }

+ 15 - 1
src/lib/config/ccsession.h

@@ -372,7 +372,7 @@ public:
     typedef std::list<boost::shared_ptr<AsyncRecvRequest> > AsyncRecvRequests;
 
     /// \brief Identifier of single request for asynchronous read.
-    typedef AsyncRecvRequests::const_iterator AsyncRecvRequestID;
+    typedef AsyncRecvRequests::iterator AsyncRecvRequestID;
 
     /// \brief Callback which is called when an asynchronous receive finishes.
     ///
@@ -475,6 +475,20 @@ private:
     ModuleSpec readModuleSpecification(const std::string& filename);
     void startCheck();
     void sendStopping();
+    /// \brief Check if the message is wanted by asynchronous read
+    ///
+    /// It checks if any of the previously queued requests match
+    /// the message. If so, the callback is dispatched and removed.
+    ///
+    /// \param envelope The envelope of the message.
+    /// \param msg The actual message data.
+    /// \return True if the message was used for a callback, false
+    ///     otherwise.
+    bool checkAsyncRecv(const data::ConstElementPtr& envelope,
+                        const data::ConstElementPtr& msg);
+    /// \brief Checks if a message with this envelope matches the request
+    bool requestMatch(const AsyncRecvRequest& request,
+                      const data::ConstElementPtr& envelope) const;
 
     bool started_;
     std::string module_name_;