Browse Source

[3329] dhcp_ddns::NameChangeSender extended to support running ready IO

Rememebering the io_service in use and how to run ready IO handlers was
pushed down from D2ClientMgr into NameChangeSender. NameChangeSender stop
logic was altered so it will now cleanly complete the last send as well
as interrupting the daisy-chain of instigating the next send upon
completion of the current send.
Thomas Markwalder 11 years ago
parent
commit
74341d7cf0

+ 6 - 0
src/lib/dhcp_ddns/dhcp_ddns_messages.mes

@@ -19,6 +19,12 @@ This is an error message that indicates that an invalid request to update
 a DNS entry was received by the application.  Either the format or the content
 of the request is incorrect. The request will be ignored.
 
+% DHCP_DDNS_NCR_FLUSH_IO_ERROR DHCP-DDNS Last send before stopping did not complete successfully: %1
+This is an error message that indicates the DHCP-DDNS client was unable to
+complete the last send prior to exiting send mode.  This is a programmatic
+error, highly unlikely to occur, and should not impair the application's to
+process requests.
+
 % DHCP_DDNS_NCR_LISTEN_CLOSE_ERROR application encountered an error while closing the listener used to receive NameChangeRequests : %1
 This is an error message that indicates the application was unable to close the
 listener connection used to receive NameChangeRequests.  Closure may occur

+ 40 - 5
src/lib/dhcp_ddns/ncr_io.cc

@@ -15,6 +15,7 @@
 #include <dhcp_ddns/dhcp_ddns_log.h>
 #include <dhcp_ddns/ncr_io.h>
 
+#include <asio.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 
 namespace isc {
@@ -159,7 +160,7 @@ NameChangeListener::invokeRecvHandler(const Result result,
 NameChangeSender::NameChangeSender(RequestSendHandler& send_handler,
                                    size_t send_queue_max)
     : sending_(false), send_handler_(send_handler),
-      send_queue_max_(send_queue_max) {
+      send_queue_max_(send_queue_max), io_service_(NULL) {
 
     // Queue size must be big enough to hold at least 1 entry.
     setQueueMaxSize(send_queue_max);
@@ -177,6 +178,8 @@ NameChangeSender::startSending(isc::asiolink::IOService& io_service) {
 
     // Call implementation dependent open.
     try {
+        // Remember io service we're given.
+        io_service_ = &io_service;
         open(io_service);
     } catch (const isc::Exception& ex) {
         stopSending();
@@ -189,6 +192,23 @@ NameChangeSender::startSending(isc::asiolink::IOService& io_service) {
 
 void
 NameChangeSender::stopSending() {
+    // Set it send indicator to false, no matter what. This allows us to at 
+    // least try to re-open via startSending(). Also, setting it false now, 
+    // allows us to break sendNext() chain in invokeSendHandler.
+    setSending(false);
+
+    // If there is an outstanding IO to complete, attempt to process it.
+    if (ioReady() && io_service_ != NULL) {
+        try {
+            runReadyIO();
+        } catch (const std::exception& ex) {
+            // Swallow exceptions. If we have some sort of error we'll log
+            // it but we won't propagate the throw.
+            LOG_ERROR(dhcp_ddns_logger,
+                  DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
+        }
+    }
+
     try {
         // Call implementation dependent close.
         close();
@@ -199,9 +219,7 @@ NameChangeSender::stopSending() {
                   DHCP_DDNS_NCR_SEND_CLOSE_ERROR).arg(ex.what());
     }
 
-    // Set it false, no matter what.  This allows us to at least try to
-    // re-open via startSending().
-    setSending(false);
+    io_service_ = NULL;
 }
 
 void
@@ -274,7 +292,9 @@ NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) {
 
     // Set up the next send
     try {
-        sendNext();
+        if (amSending()) {
+            sendNext();
+        }
     } catch (const isc::Exception& ex) {
         // It is possible though unlikely, for sendNext to fail without
         // scheduling the send. While, unlikely, it does mean the callback
@@ -367,5 +387,20 @@ NameChangeSender::getSelectFd() {
     isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
 }
 
+void
+NameChangeSender::runReadyIO() {
+    if (!io_service_) {
+        isc_throw(NcrSenderError, "NameChangeSender::runReadyIO"
+                  " sender io service is null");
+    }
+
+    // We shouldn't be here if IO isn't ready to execute.
+    // By running poll we're gauranteed not to hang.
+    /// @todo Trac# 3325 requests that asiolink::IOService provide a
+    /// wrapper for poll().
+    io_service_->get_io_service().poll_one();
+}
+
+
 } // namespace isc::dhcp_ddns
 } // namespace isc

+ 16 - 0
src/lib/dhcp_ddns/ncr_io.h

@@ -576,6 +576,8 @@ public:
     /// @throw NcrSenderError if the sender is not in send mode,
     virtual int getSelectFd() = 0;
 
+    virtual bool ioReady() = 0;
+
 protected:
     /// @brief Dequeues and sends the next request on the send queue.
     ///
@@ -715,6 +717,14 @@ public:
     /// end of the queue.
     const NameChangeRequestPtr& peekAt(const size_t index) const;
 
+    /// @brief Processes sender IO events
+    ///
+    /// Executes at most one ready handler on the sender's IO service. If
+    /// no handlers are ready it returns immediately.
+    /// @warning - Running all ready handlers, in theory, could process all
+    /// messages currently queued.
+    virtual void runReadyIO();
+
 protected:
     /// @brief Returns a reference to the send queue.
     SendQueue& getSendQueue() {
@@ -746,6 +756,12 @@ private:
 
     /// @brief Pointer to the request which is in the process of being sent.
     NameChangeRequestPtr ncr_to_send_;
+
+    /// @brief Pointer to the IOService currently being used by the sender.
+    /// @note We need to remember the io_service but we receive it by
+    /// reference.  Use a raw pointer to store it.  This value should never be
+    /// exposed and is only valid while in send mode.
+    asiolink::IOService* io_service_;
 };
 
 /// @brief Defines a smart pointer to an instance of a sender.

+ 10 - 0
src/lib/dhcp_ddns/ncr_udp.cc

@@ -359,6 +359,16 @@ NameChangeUDPSender::getSelectFd() {
     return(watch_socket_->getSelectFd());
 }
 
+bool
+NameChangeUDPSender::ioReady() {
+    if (watch_socket_) {
+        return (watch_socket_->isReady());
+    }
+
+    return (false);
+}
+
+
 
 }; // end of isc::dhcp_ddns namespace
 }; // end of isc namespace

+ 2 - 0
src/lib/dhcp_ddns/ncr_udp.h

@@ -542,6 +542,8 @@ public:
     /// @throw NcrSenderError if the sender is not in send mode,
     virtual int getSelectFd();
 
+    virtual bool ioReady();
+
 private:
     /// @brief IP address from which to send.
     isc::asiolink::IOAddress ip_address_;

+ 30 - 18
src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc

@@ -361,7 +361,11 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) {
 
     // Verify select_fd is valid and currently shows no ready to read.
     ASSERT_NE(dhcp_ddns::WatchSocket::INVALID_SOCKET, select_fd);
+
+    // Make sure select_fd does evaluates to not ready via select and
+    // that ioReady() method agrees.
     ASSERT_EQ(0, selectCheck(select_fd));
+    ASSERT_FALSE(sender.ioReady());
 
     // Iterate over a series of messages, sending each one. Since we
     // do not invoke IOService::run, then the messages should accumulate
@@ -392,18 +396,22 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) {
     // IOService::run_one. This should complete the send of exactly one
     // message and the queue count should decrement accordingly.
     for (int i = num_msgs; i > 0; i--) {
-        // Verify that sender shows IO ready.
+        // Make sure select_fd does evaluates to ready via select and
+        // that ioReady() method agrees.
         ASSERT_TRUE(selectCheck(select_fd) > 0);
+        ASSERT_TRUE(sender.ioReady());
 
         // Execute at one ready handler.
-        io_service.run_one();
+        ASSERT_NO_THROW(sender.runReadyIO());
 
         // Verify that the queue count decrements in step with each run.
         EXPECT_EQ(i-1, sender.getQueueSize());
     }
 
-    // Verify that sender shows no IO ready.
-    EXPECT_EQ(0, selectCheck(select_fd));
+    // Make sure select_fd does evaluates to not ready via select and
+    // that ioReady() method agrees.
+    ASSERT_EQ(0, selectCheck(select_fd));
+    ASSERT_FALSE(sender.ioReady());
 
     // Verify that the queue is empty.
     EXPECT_EQ(0, sender.getQueueSize());
@@ -419,16 +427,21 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) {
     // Verify that flushing the queue is not allowed in sending state.
     EXPECT_THROW(sender.clearSendQueue(), NcrSenderError);
 
-    // Put a message on the queue.
-    EXPECT_NO_THROW(sender.sendRequest(ncr));
-    EXPECT_EQ(1, sender.getQueueSize());
+    // Put num_msgs messages on the queue.
+    for (int i = 0; i < num_msgs; i++) {
+        ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
+        EXPECT_NO_THROW(sender.sendRequest(ncr));
+    }
+
+    // Make sure we have number of messages expected.
+    EXPECT_EQ(num_msgs, sender.getQueueSize());
 
     // Verify that we can gracefully stop sending.
     EXPECT_NO_THROW(sender.stopSending());
     EXPECT_FALSE(sender.amSending());
 
     // Verify that the queue is preserved after leaving sending state.
-    EXPECT_EQ(1, sender.getQueueSize());
+    EXPECT_EQ(num_msgs - 1, sender.getQueueSize());
 
     // Verify that flushing the queue works when not sending.
     EXPECT_NO_THROW(sender.clearSendQueue());
@@ -454,23 +467,19 @@ TEST(NameChangeUDPSenderBasicTest, anyAddressSend) {
     ASSERT_NO_THROW(sender.startSending(io_service));
     EXPECT_TRUE(sender.amSending());
 
-    // Fetch the sender's select-fd.
-    int select_fd = sender.getSelectFd();
-
     // Create and queue up a message.
     NameChangeRequestPtr ncr;
     ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[0]));
     EXPECT_NO_THROW(sender.sendRequest(ncr));
     EXPECT_EQ(1, sender.getQueueSize());
 
-    // message and the queue count should decrement accordingly.
-    // Execute at one ready handler.
-    ASSERT_TRUE(selectCheck(select_fd) > 0);
-    ASSERT_NO_THROW(io_service.run_one());
+    // Verify we have a ready IO, then execute at one ready handler.
+    ASSERT_TRUE(sender.ioReady());
+    ASSERT_NO_THROW(sender.runReadyIO());
 
     // Verify that sender shows no IO ready.
     // and that the queue is empty.
-    EXPECT_EQ(0, selectCheck(select_fd));
+    ASSERT_FALSE(sender.ioReady());
     EXPECT_EQ(0, sender.getQueueSize());
 }
 
@@ -514,6 +523,9 @@ TEST(NameChangeSender, assumeQueue) {
     // Take sender1 out of send mode.
     ASSERT_NO_THROW(sender1.stopSending());
     ASSERT_FALSE(sender1.amSending());
+    // Stopping should have completed the first message.
+    --num_msgs;
+    EXPECT_EQ(num_msgs, sender1.getQueueSize());
 
     // Transfer should succeed. Verify sender1 has none,
     // and sender2 has num_msgs queued.
@@ -719,7 +731,7 @@ TEST(NameChangeUDPSenderBasicTest, watchClosedAfterSendRequest) {
     // Run one handler. This should execute the send completion handler
     // after sending the first message.  Duing completion handling, we will
     // attempt to queue the second message which should fail.
-    ASSERT_NO_THROW(io_service.run_one());
+    ASSERT_NO_THROW(sender.runReadyIO());
 
     // Verify handler got called twice. First request should have be sent
     // without error, second call should have failed to send due to watch
@@ -767,7 +779,7 @@ TEST(NameChangeUDPSenderBasicTest, watchSocketBadRead) {
     // after sending the message.  Duing completion handling clearing the
     // watch socket should fail, which will close the socket, but not
     // result in a throw.
-    ASSERT_NO_THROW(io_service.run_one());
+    ASSERT_NO_THROW(sender.runReadyIO());
 
     // Verify handler got called twice. First request should have be sent
     // without error, second call should have failed to send due to watch