Browse Source

[trac598_new] Rewrite the stop logic of forward query.

zhanglikun 14 years ago
parent
commit
32872f1e74
1 changed files with 49 additions and 30 deletions
  1. 49 30
      src/lib/resolve/recursive_query.cc

+ 49 - 30
src/lib/resolve/recursive_query.cc

@@ -778,10 +778,14 @@ private:
     asio::deadline_timer client_timer;
     asio::deadline_timer client_timer;
     asio::deadline_timer lookup_timer;
     asio::deadline_timer lookup_timer;
 
 
-    // If we have a client timeout, we send back an answer, but don't
-    // stop. We use this variable to make sure we don't send another
-    // answer if we do find one later (or if we have a lookup_timeout)
-    bool answer_sent_;
+    // Make FowardQuery deletes itself safely. for more information see
+    // the comments of outstanding_events in RunningQuery.
+    size_t outstanding_events_;
+
+    // If we have a client timeout, we call back with a failure message,
+    // but we do not stop yet. We use this variable to make sure we
+    // don't call back a second time later
+    bool callback_called_;
 
 
     // send the query to the server.
     // send the query to the server.
     void send(IOFetch::Protocol protocol = IOFetch::UDP) {
     void send(IOFetch::Protocol protocol = IOFetch::UDP) {
@@ -791,6 +795,7 @@ private:
         ConstQuestionPtr question = *(query_message_->beginQuestion());
         ConstQuestionPtr question = *(query_message_->beginQuestion());
         dlog("Sending upstream query (" + question->toText() +
         dlog("Sending upstream query (" + question->toText() +
              ") to " + upstream_->at(serverIndex).first);
              ") to " + upstream_->at(serverIndex).first);
+        ++outstanding_events_;
         // Forward the query, create the IOFetch with
         // Forward the query, create the IOFetch with
         // query message, so that query flags can be forwarded
         // query message, so that query flags can be forwarded
         // together.
         // together.
@@ -819,37 +824,57 @@ public:
         query_timeout_(query_timeout),
         query_timeout_(query_timeout),
         client_timer(io.get_io_service()),
         client_timer(io.get_io_service()),
         lookup_timer(io.get_io_service()),
         lookup_timer(io.get_io_service()),
-        answer_sent_(false)
+        outstanding_events_(0)
     {
     {
         // Setup the timer to stop trying (lookup_timeout)
         // Setup the timer to stop trying (lookup_timeout)
         if (lookup_timeout >= 0) {
         if (lookup_timeout >= 0) {
             lookup_timer.expires_from_now(
             lookup_timer.expires_from_now(
                 boost::posix_time::milliseconds(lookup_timeout));
                 boost::posix_time::milliseconds(lookup_timeout));
-            lookup_timer.async_wait(boost::bind(&ForwardQuery::stop, this, false));
+            ++outstanding_events_;
+            lookup_timer.async_wait(boost::bind(&ForwardQuery::lookupTimeout, this));
         }
         }
 
 
         // Setup the timer to send an answer (client_timeout)
         // Setup the timer to send an answer (client_timeout)
         if (client_timeout >= 0) {
         if (client_timeout >= 0) {
             client_timer.expires_from_now(
             client_timer.expires_from_now(
                 boost::posix_time::milliseconds(client_timeout));
                 boost::posix_time::milliseconds(client_timeout));
+            ++outstanding_events_;
             client_timer.async_wait(boost::bind(&ForwardQuery::clientTimeout, this));
             client_timer.async_wait(boost::bind(&ForwardQuery::clientTimeout, this));
         }
         }
 
 
         send();
         send();
     }
     }
 
 
+    virtual void lookupTimeout() {
+        callCallback(false);
+        assert(outstanding_events_ > 0);
+        --outstanding_events_;
+        stop();
+    }
+
     virtual void clientTimeout() {
     virtual void clientTimeout() {
-        // Return a SERVFAIL, but do not stop until
-        // we have an answer or timeout ourselves
-        isc::resolve::makeErrorMessage(answer_message_,
-                                       Rcode::SERVFAIL());
-        if (!answer_sent_) {
-            answer_sent_ = true;
-            resolvercallback_->success(answer_message_);
+        callCallback(false);
+        assert(outstanding_events_ > 0);
+        --outstanding_events_;
+        stop();
+    }
+
+    // If the callback has not been called yet, call it now
+    // If success is true, we call 'success' with our answer_message
+    // If it is false, we call failure()
+    void callCallback(bool success) {
+        if (!callback_called_) {
+            callback_called_ = true;
+            isc::resolve::makeErrorMessage(answer_message_, Rcode::SERVFAIL());
+            if (success) {
+                resolvercallback_->success(answer_message_);
+            } else {
+                resolvercallback_->failure();
+            }
         }
         }
     }
     }
 
 
-    virtual void stop(bool resume) {
+    virtual void stop() {
         // if we cancel our timers, we will still get an event for
         // if we cancel our timers, we will still get an event for
         // that, so we cannot delete ourselves just yet (those events
         // that, so we cannot delete ourselves just yet (those events
         // would be bound to a deleted object)
         // would be bound to a deleted object)
@@ -857,36 +882,30 @@ public:
         // here again.
         // here again.
         // same goes if we have an outstanding query (can't delete
         // same goes if we have an outstanding query (can't delete
         // until that one comes back to us)
         // until that one comes back to us)
-        if (resume && !answer_sent_) {
-            answer_sent_ = true;
-            resolvercallback_->success(answer_message_);
-        } else {
-            resolvercallback_->failure();
-        }
-        if (lookup_timer.cancel() != 0) {
-            return;
-        }
-        if (client_timer.cancel() != 0) {
+        lookup_timer.cancel();
+        client_timer.cancel();
+        if (outstanding_events_ > 0) {
             return;
             return;
+        } else {
+            delete this;
         }
         }
-
-        delete this;
     }
     }
 
 
     // This function is used as callback from DNSQuery.
     // This function is used as callback from DNSQuery.
     virtual void operator()(IOFetch::Result result) {
     virtual void operator()(IOFetch::Result result) {
         // XXX is this the place for TCP retry?
         // XXX is this the place for TCP retry?
+        assert(outstanding_events_ > 0);
+        --outstanding_events_;
         if (result != IOFetch::TIME_OUT) {
         if (result != IOFetch::TIME_OUT) {
             // we got an answer
             // we got an answer
             Message incoming(Message::PARSE);
             Message incoming(Message::PARSE);
             InputBuffer ibuf(buffer_->getData(), buffer_->getLength());
             InputBuffer ibuf(buffer_->getData(), buffer_->getLength());
             incoming.fromWire(ibuf);
             incoming.fromWire(ibuf);
             isc::resolve::copyResponseMessage(incoming, answer_message_);
             isc::resolve::copyResponseMessage(incoming, answer_message_);
-            stop(true);
-        } else {
-            // timeout, give up for now
-            stop(false);
+            callCallback(true);
         }
         }
+
+        stop();
     }
     }
 };
 };