Browse Source

[trac495] Keep track of 'outstanding events' instead of separate things

this should fix the timeout problem
Jelte Jansen 14 years ago
parent
commit
895e8df77e
2 changed files with 102 additions and 65 deletions
  1. 57 63
      src/lib/resolve/recursive_query.cc
  2. 45 2
      src/lib/resolve/tests/recursive_query_unittest.cc

+ 57 - 63
src/lib/resolve/recursive_query.cc

@@ -94,7 +94,8 @@ public:
         // Drop query or send servfail?
         rq_->nsasCallbackCalled();
         rq_->makeSERVFAIL();
-        rq_->stop(false);
+        rq_->callCallback(true);
+        rq_->stop();
     }
 
 private:
@@ -148,6 +149,7 @@ private:
     asio::deadline_timer client_timer;
     bool client_timer_canceled_;
     asio::deadline_timer lookup_timer;
+    bool lookup_timer_canceled_;
 
     size_t queries_out_;
     
@@ -157,7 +159,7 @@ private:
     // If we have a client timeout, we send back an answer, but don't
     // stop. We use this variable to make sure we don't send another
     // answer if we do find one later (or if we have a lookup_timeout)
-    bool answer_sent_;
+    bool callback_called_;
 
     // Reference to our NSAS
     isc::nsas::NameserverAddressStore& nsas_;
@@ -173,6 +175,8 @@ private:
     bool nsas_callback_out_;
     isc::nsas::NameserverAddress current_ns_address;
     struct timeval current_ns_qsent_time;
+    
+    size_t outstanding_events_;
 
     // perform a single lookup; first we check the cache to see
     // if we have a response for our query stored already. if
@@ -185,7 +189,8 @@ private:
                           question_.getClass(), cached_message)) {
             dlog("Message found in cache, returning that");
             if (handleRecursiveAnswer(cached_message)) {
-                stop(true);
+                callCallback(true);
+                stop();
             }
         } else {
             cur_zone_ = ".";
@@ -200,6 +205,7 @@ private:
         current_ns_address = address;
         gettimeofday(&current_ns_qsent_time, NULL);
         ++queries_out_;
+        ++outstanding_events_;
         IOFetch query(IPPROTO_UDP, io_, question_,
             current_ns_address.getAddress(),
             53, buffer_, this,
@@ -216,6 +222,7 @@ private:
             dlog("Sending upstream query (" + question_.toText() +
                 ") to " + upstream_->at(serverIndex).first);
             ++queries_out_;
+            ++outstanding_events_;
             IOFetch query(IPPROTO_UDP, io_, question_,
                 upstream_->at(serverIndex).first,
                 upstream_->at(serverIndex).second, buffer_, this,
@@ -227,7 +234,6 @@ private:
             dlog("Look up nameserver for " + cur_zone_ + " in NSAS");
             // Can we have multiple calls to nsas_out? Let's assume not
             // for now
-            std::cout << "[XX] NSASLOOKUP " << this << " for " << cur_zone_ << std::endl;
             assert(!nsas_callback_out_);
             nsas_callback_out_ = true;
             nsas_.lookup(cur_zone_, question_.getClass(), nsas_callback_);
@@ -235,7 +241,6 @@ private:
     }
     
     void nsasCallbackCalled() {
-        std::cout << "[XX] NSASLOOKUP DONE " << this << " for " << cur_zone_ << std::endl;
         nsas_callback_out_ = false;
     }
 
@@ -395,64 +400,63 @@ public:
         client_timer(io.get_io_service()),
         client_timer_canceled_(false),
         lookup_timer(io.get_io_service()),
+        lookup_timer_canceled_(false),
         queries_out_(0),
         done_(false),
-        answer_sent_(false),
+        callback_called_(false),
         nsas_(nsas),
         cache_(cache),
         nsas_callback_(boost::shared_ptr<ResolverNSASCallback>(
                                      new ResolverNSASCallback(this))),
-        nsas_callback_out_(false)
+        nsas_callback_out_(false),
+        outstanding_events_(0)
     {
         // Setup the timer to stop trying (lookup_timeout)
         if (lookup_timeout >= 0) {
             lookup_timer.expires_from_now(
                 boost::posix_time::milliseconds(lookup_timeout));
-            lookup_timer.async_wait(boost::bind(&RunningQuery::stop, this, false));
+            ++outstanding_events_;
+            lookup_timer.async_wait(boost::bind(&RunningQuery::lookupTimeout, this));
         }
         
         // Setup the timer to send an answer (client_timeout)
         if (client_timeout >= 0) {
             client_timer.expires_from_now(
                 boost::posix_time::milliseconds(client_timeout));
+            ++outstanding_events_;
             client_timer.async_wait(boost::bind(&RunningQuery::clientTimeout, this));
         }
         
         doLookup();
     }
 
-    virtual void clientTimeout() {
-        dlog("[XX] client timer fired");
-        // Return a SERVFAIL, but do not stop until
-        // we have an answer or timeout ourselves
-        if (!answer_sent_) {
-            dlog("[XX] answer not sent yet");
-            answer_sent_ = true;
+    // called if we have a lookup timeout; if our callback has
+    // not been called, call it now. Then stop.
+    void lookupTimeout() {
+        if (!callback_called_) {
             makeSERVFAIL();
-            resolvercallback_->success(answer_message_);
+            callCallback(true);
         }
-        // if we got here because we canceled it in stop(), we
-        // need to go back to stop()
-        if (client_timer_canceled_) {
-            dlog("[XX] fired due to cancellation");
-            stop(false);
+        --outstanding_events_;
+        stop();
+    }
+    
+    // called if we have a client timeout; if our callback has
+    // not been called, call it now. But do not stop.
+    void clientTimeout() {
+        if (!callback_called_) {
+            makeSERVFAIL();
+            callCallback(true);
         }
+        --outstanding_events_;
     }
 
-    virtual void stop(bool resume) {
-        dlog("[XX] stop() called");
-        // if we cancel our timers, we will still get an event for
-        // that, so we cannot delete ourselves just yet (those events
-        // would be bound to a deleted object)
-        // cancel them one by one, both cancels should get us back
-        // here again.
-        // same goes if we have an outstanding query (can't delete
-        // until that one comes back to us)
-        done_ = true;
-        dlog("[XX] stop() called1");
-        if (!answer_sent_) {
-            dlog("[XX] no answer sent yet");
-            answer_sent_ = true;
+    // If the callback has not been called yet, call it now
+    // If success is true, we call 'success' with our answer_message
+    // If it is false, we call failure()
+    void callCallback(bool success) {
+        if (!callback_called_) {
+            callback_called_ = true;
 
             // There are two types of messages we could store in the
             // cache;
@@ -471,46 +475,34 @@ public:
             // stores Messages on their question section only, this
             // does mean that we overwrite the messages we stored in
             // the previous iteration if we are following a delegation.
-            if (resume) {
-                cache_.update(*answer_message_);
-    
+            if (success) {
                 resolvercallback_->success(answer_message_);
             } else {
                 resolvercallback_->failure();
             }
-            return;
-        }
-        dlog("[XX] stop() called2");
-        if (lookup_timer.cancel() != 0) {
-            dlog("[XX] lookup timer canceled");
-            return;
-        }
-        dlog("[XX] stop() called3");
-        if (client_timer.cancel() != 0) {
-            dlog("[XX] client timer canceled");
-            client_timer_canceled_ = true;
-            return;
-        } else {
-            dlog("[XX] no client timer anymore");
         }
-        dlog("[XX] continuing");
-        if (queries_out_ > 0) {
-            dlog("[XX] still one or more queries out");
-            return;
-        }
-        dlog("[XX] stop() called4");
+    }
+
+    void stop() {
+        done_ = true;
         if (nsas_callback_out_) {
             nsas_.cancel(cur_zone_, question_.getClass(), nsas_callback_);
             nsas_callback_out_ = false;
         }
-        dlog("Recursive query stopped, deleting");
-        delete this;
+        client_timer.cancel();
+        lookup_timer.cancel();
+        if (outstanding_events_ > 0) {
+            return;
+        } else {
+            delete this;
+        }
     }
 
     // This function is used as callback from DNSQuery.
     virtual void operator()(IOFetch::Result result) {
         // XXX is this the place for TCP retry?
         --queries_out_;
+        --outstanding_events_;
         
         if (!done_ && result != IOFetch::TIME_OUT) {
             // we got an answer
@@ -544,7 +536,8 @@ public:
             }
             
             if (done_) {
-                stop(true);
+                callCallback(true);
+                stop();
             }
         } else if (!done_ && retries_--) {
             // We timed out, but we have some retries, so send again
@@ -559,10 +552,11 @@ public:
             if (recursive_mode()) {
                 current_ns_address.updateRTT(isc::nsas::AddressEntry::UNREACHABLE);
             }
-            if (!answer_sent_) {
+            if (!callback_called_) {
                 makeSERVFAIL();
+                callCallback(true);
             }
-            stop(!answer_sent_);
+            stop();
         }
     }
     

+ 45 - 2
src/lib/resolve/tests/recursive_query_unittest.cc

@@ -753,12 +753,55 @@ TEST_F(RecursiveQueryTest, forwardLookupTimeout) {
     int num = 0;
     bool read_success = tryRead(sock_, recv_options, 5, &num);
 
-    // The query should fail
-    EXPECT_FALSE(done);
+    // The query should fail and respond with an error
+    EXPECT_TRUE(done);
     EXPECT_EQ(3, num);
     EXPECT_FALSE(read_success);
 }
 
+// Set everything very low and see if this doesn't cause weird
+// behaviour
+TEST_F(RecursiveQueryTest, lowtimeouts) {
+    // Prepare the service (we do not use the common setup, we do not answer
+    setDNSService();
+
+    // Prepare the socket
+    sock_ = createTestSocket();
+
+    // Prepare the server
+    bool done(true);
+    MockServerStop server(*io_service_, &done);
+
+    MessagePtr answer(new Message(Message::RENDER));
+
+    // Do the answer
+    const uint16_t port = boost::lexical_cast<uint16_t>(TEST_CLIENT_PORT);
+    // Set up the test so that it will retry 5 times, but the lookup
+    // timeout will fire after only 3 normal timeouts
+    RecursiveQuery query(*dns_service_,
+                         *nsas_, cache_,
+                         singleAddress(TEST_IPV4_ADDR, port),
+                         singleAddress(TEST_IPV4_ADDR, port),
+                         1, 1, 1, 1);
+    Question question(Name("example.net"), RRClass::IN(), RRType::A());
+    OutputBufferPtr buffer(new OutputBuffer(0));
+    query.resolve(question, answer, buffer, &server);
+
+    // Run the test
+    io_service_->run();
+
+    int recv_options = setSocketTimeout(sock_, 1, 0);
+
+    // Try to read 5 times, should stop after 3 reads
+    int num = 0;
+    bool read_success = tryRead(sock_, recv_options, 5, &num);
+
+    // The query should fail and respond with an error
+    EXPECT_TRUE(done);
+    EXPECT_EQ(1, num);
+    EXPECT_FALSE(read_success);
+}
+
 // as mentioned above, we need a more better framework for this,
 // in addition to that, this sends out queries into the world
 // (which we should catch somehow and fake replies for)