Browse Source

[master] Merge branch 'trac3075'

Added main process event loop to src/bin/d2/D2Process,
which is the primary application object in b10-dchp-ddns.
Thomas Markwalder 11 years ago
parent
commit
e2f9d2e4c1

+ 9 - 4
src/bin/d2/d2_config.cc

@@ -159,7 +159,7 @@ DdnsDomainListMgr::matchDomain(const std::string& fqdn, DdnsDomainPtr& domain) {
 // *********************** TSIGKeyInfoParser  *************************
 
 TSIGKeyInfoParser::TSIGKeyInfoParser(const std::string& entry_name,
-    TSIGKeyInfoMapPtr keys)
+                                     TSIGKeyInfoMapPtr keys)
     : entry_name_(entry_name), keys_(keys), local_scalars_() {
     if (!keys_) {
         isc_throw(D2CfgError, "TSIGKeyInfoParser ctor:"
@@ -226,7 +226,7 @@ TSIGKeyInfoParser::commit() {
         isc_throw(D2CfgError, "TSIG Key Info must specify name");
     }
 
-    // Algorithme cannot be blank.
+    // Algorithm cannot be blank.
     if (algorithm.empty()) {
         isc_throw(D2CfgError, "TSIG Key Info must specify algorithm");
     }
@@ -253,7 +253,8 @@ TSIGKeyInfoParser::commit() {
 
 TSIGKeyInfoListParser::TSIGKeyInfoListParser(const std::string& list_name,
                                        TSIGKeyInfoMapPtr keys)
-    :list_name_(list_name), keys_(keys), parsers_() {
+    :list_name_(list_name), keys_(keys), local_keys_(new TSIGKeyInfoMap()), 
+     parsers_() {
     if (!keys_) {
         isc_throw(D2CfgError, "TSIGKeyInfoListParser ctor:"
                   " key storage cannot be null");
@@ -277,7 +278,7 @@ build(isc::data::ConstElementPtr key_list){
         // Create a name for the parser based on its position in the list.
         std::string entry_name = boost::lexical_cast<std::string>(i++);
         isc::dhcp::ParserPtr parser(new TSIGKeyInfoParser(entry_name,
-                                                            keys_));
+                                                            local_keys_));
         parser->build(key_config);
         parsers_.push_back(parser);
     }
@@ -290,6 +291,10 @@ TSIGKeyInfoListParser::commit() {
     BOOST_FOREACH(isc::dhcp::ParserPtr parser, parsers_) {
         parser->commit();
     }
+  
+    // Now that we know we have a valid list, commit that list to the
+    // area given to us during construction (i.e. to the d2 context).   
+    *keys_ = *local_keys_;
 }
 
 // *********************** DnsServerInfoParser  *************************

+ 9 - 2
src/bin/d2/d2_config.h

@@ -606,8 +606,12 @@ public:
 
     /// @brief Iterates over the internal list of TSIGKeyInfoParsers,
     /// invoking commit on each.  This causes each parser to instantiate a
-    /// TSIGKeyInfo from its internal data values and add that that key
-    /// instance to the storage area, keys_.
+    /// TSIGKeyInfo from its internal data values and add that key
+    /// instance to the local key storage area, local_keys_.   If all of the
+    /// key parsers commit cleanly, then update the context key map (keys_)
+    /// with the contents of local_keys_.  This is done to allow for duplicate
+    /// key detection while parsing the keys, but not get stumped by it
+    /// updating the context with a valid list.
     virtual void commit();
 
 private:
@@ -618,6 +622,9 @@ private:
     /// the list of newly created TSIGKeyInfo instances. This is given to us
     /// as a constructor argument by an upper level.
     TSIGKeyInfoMapPtr keys_;
+    
+    /// @brief Local storage area to which individual key parsers commit.
+    TSIGKeyInfoMapPtr local_keys_;
 
     /// @brief Local storage of TSIGKeyInfoParser instances
     isc::dhcp::ParserCollection parsers_;

+ 70 - 3
src/bin/d2/d2_messages.mes

@@ -22,7 +22,7 @@ to disconnect from its session with the BIND10 control channel.
 This debug message is issued just before the controller attempts
 to establish a session with the BIND10 control channel.
 
-% DCTL_COMMAND_RECEIVED %1 received command %2, arguments: %3
+% DCTL_COMMAND_RECEIVED %1 received command: %2, arguments: %3
 A debug message listing the command (and possible arguments) received
 from the BIND10 control system by the controller.
 
@@ -116,6 +116,10 @@ following a shut down (normal or otherwise) of the service.
 This is a debug message that indicates that the application has DHCP_DDNS
 requests in the queue but is working as many concurrent requests as allowed.
 
+% DHCP_DDNS_CLEARED_FOR_SHUTDOWN application has met shutdown criteria for shutdown type: %1
+This is an informational message issued when the application has been instructed
+to shutdown and has met the required criteria to exit.
+
 % DHCP_DDNS_COMMAND command directive received, command: %1 - args: %2
 This is a debug message issued when the Dhcp-Ddns application command method
 has been invoked.
@@ -168,12 +172,75 @@ needs to be increased, the DHCP-DDNS clients are simply generating too many
 requests too quickly, or perhaps upstream DNS servers are experiencing
 load issues.
 
+% DHCP_DDNS_QUEUE_MGR_RECONFIGURING application is reconfiguring the queue manager
+This is an informational message indicating that DHCP_DDNS is reconfiguring the
+queue manager as part of normal startup or in response to a new configuration.
+
+% DHCP_DDNS_QUEUE_MGR_RECOVERING application is attempting to recover from a
+queue manager IO error
+This is an informational message indicating that DHCP_DDNS is attempting to
+restart the queue manager after it suffered an IO error while receiving
+requests.
+
 % DHCP_DDNS_QUEUE_MGR_RECV_ERROR application's queue manager was notified of a request receive error by its listener.
 This is an error message indicating that the NameChangeRequest listener used by
 DHCP-DDNS to receive requests encountered a IO error.  There should be
 corresponding log messages from the listener layer with more details. This may
 indicate a network connectivity or system resource issue.
 
+% DHCP_DDNS_QUEUE_MGR_RESUME_ERROR application could not restart the queue manager, reason: %1
+This is an error message indicating that DHCP_DDNS's Queue Manager could not
+be restarted after stopping due to an a full receive queue.  This means that
+the application cannot receive requests. This is most likely due to DHCP_DDNS
+configuration parameters referring to resources such as an IP address or port,
+that is no longer unavailable.  DHCP_DDNS will attempt to restart the queue
+manager if given a new configuration.
+
+% DHCP_DDNS_QUEUE_MGR_RESUMING application is resuming listening for requests now that the request queue size has reached %1 of a maximum %2 allowed
+This is an informational message indicating that DHCP_DDNS, which had stopped
+accepting new requests, has processed enough entries from the receive queue to
+resume accepting requests.
+
+% DHCP_DDNS_QUEUE_MGR_STARTED application's queue manager has begun listening for requests.
+This is a debug message indicating that DHCP_DDNS's Queue Manager has
+successfully started and is now listening for NameChangeRequests.
+
+% DHCP_DDNS_QUEUE_MGR_START_ERROR application could not start the queue manager, reason: %1
+This is an error message indicating that DHCP_DDNS's Queue Manager could not
+be started.  This means that the application cannot receive requests. This is
+most likely due to DHCP_DDNS configuration parameters referring to resources
+such as an IP address or port, that are unavailable.  DHCP_DDNS will attempt to
+restart the queue manager if given a new configuration.
+
+% DHCP_DDNS_QUEUE_MGR_STOPPED application's queue manager has stopped listening for requests.
+This is an informational message indicating that DHCP_DDNS's Queue Manager has
+stopped listening for NameChangeRequests.  This may be because of normal event
+such as reconfiguration or as a result of an error.  There should be log
+messages preceding this one to indicate why it has stopped.
+
+% DHCP_DDNS_QUEUE_MGR_STOPPING application is stopping the queue manager for %1
+This is an informational message indicating that DHCP_DDNS is stopping the
+queue manager either to reconfigure it or as part of application shutdown.
+
+% DHCP_DDNS_QUEUE_MGR_STOP_ERROR application encountered an error stopping the queue manager: %1
+This is an error message indicating that DHCP_DDNS encountered an error while
+trying to stop the queue manager.  This error is unlikely to occur or to
+impair the application's ability to function but it should be reported for
+analysis.
+
+% DHCP_DDNS_QUEUE_MGR_UNEXPECTED_HANDLER_ERROR application's queue manager request receive handler experienced an unexpected exception %1:
+This is an error message indicating that an unexpected error occurred within the
+DHCP_DDNS's Queue Manager request receive completion handler. This is most
+likely a programmatic issue that should be reported.  The application may
+recover on its own.
+
+% DHCP_DDNS_QUEUE_MGR_UNEXPECTED_STOP application's queue manager receive was
+aborted unexpectedly while queue manager state is: %1
+This is an error message indicating that DHCP_DDNS's Queue Manager request
+receive was unexpected interrupted.  Normally, the read is receive is only
+interrupted as a normal part of stopping the queue manager.  This is most
+likely a programmatic issue that should be reported.
+
 % DHCP_DDNS_RUN_ENTER application has entered the event loop
 This is a debug message issued when the Dhcp-Ddns application enters
 its run method.
@@ -182,6 +249,6 @@ its run method.
 This is a debug message issued when the Dhcp-Ddns exits the
 in event loop.
 
-% DHCP_DDNS_SHUTDOWN application is performing a normal shut down
-This is a debug message issued when the application has been instructed
+% DHCP_DDNS_SHUTDOWN application received shutdown command with args: %1
+This is informational message issued when the application has been instructed
 to shut down by the controller.

+ 324 - 20
src/bin/d2/d2_process.cc

@@ -17,13 +17,31 @@
 #include <d2/d2_cfg_mgr.h>
 #include <d2/d2_process.h>
 
-using namespace asio;
+#include <asio.hpp>
 
 namespace isc {
 namespace d2 {
 
+// Setting to 80% for now. This is an arbitrary choice and should probably
+// be configurable.
+const unsigned int D2Process::QUEUE_RESTART_PERCENT =  80;
+
 D2Process::D2Process(const char* name, IOServicePtr io_service)
-    : DProcessBase(name, io_service, DCfgMgrBasePtr(new D2CfgMgr())) {
+    : DProcessBase(name, io_service, DCfgMgrBasePtr(new D2CfgMgr())),
+     reconf_queue_flag_(false), shutdown_type_(SD_NORMAL) {
+
+    // Instantiate queue manager.  Note that queue manager does not start
+    // listening at this point.  That can only occur after configuration has
+    // been received.  This means that until we receive the configuration,
+    // D2 will neither receive nor process NameChangeRequests.
+    // Pass in IOService for NCR IO event processing.
+    queue_mgr_.reset(new D2QueueMgr(*getIoService()));
+
+    // Instantiate update manager.
+    // Pass in both queue manager and configuration manager.
+    // Pass in IOService for DNS update transaction IO event processing.
+    D2CfgMgrPtr tmp = getD2CfgMgr();
+    update_mgr_.reset(new D2UpdateMgr(queue_mgr_,  tmp,  *getIoService()));
 };
 
 void
@@ -32,17 +50,31 @@ D2Process::init() {
 
 void
 D2Process::run() {
-    // Until shut down or an fatal error occurs, wait for and
-    // execute a single callback. This is a preliminary implementation
-    // that is likely to evolve as development progresses.
-    // To use run(), the "managing" layer must issue an io_service::stop
-    // or the call to run will continue to block, and shutdown will not
-    // occur.
     LOG_DEBUG(dctl_logger, DBGLVL_START_SHUT, DHCP_DDNS_RUN_ENTER);
-    IOServicePtr& io_service = getIoService();
-    while (!shouldShutdown()) {
+    // Loop forever until we are allowed to shutdown.
+    while (!canShutdown()) {
         try {
-            io_service->run_one();
+            // Check on the state of the request queue. Take any
+            // actions necessary regarding it.
+            checkQueueStatus();
+
+            // Give update manager a time slice to queue new jobs and
+            // process finished ones.
+            update_mgr_->sweep();
+
+            // Wait on IO event(s)  - block until one or more of the following
+            // has occurred:
+            //   a. NCR message has been received
+            //   b. Transaction IO has completed
+            //   c. Interval timer expired
+            //   d. Something stopped IO service (runIO returns 0)
+            if (runIO() == 0) {
+                // Pretty sure this amounts to an unexpected stop and we
+                // should bail out now.  Normal shutdowns do not utilize
+                // stopping the IOService.
+                isc_throw(DProcessBaseError,
+                          "Primary IO service stopped unexpectedly");
+            }
         } catch (const std::exception& ex) {
             LOG_FATAL(dctl_logger, DHCP_DDNS_FAILED).arg(ex.what());
             isc_throw (DProcessBaseError,
@@ -50,29 +82,274 @@ D2Process::run() {
         }
     }
 
+    // @todo - if queue isn't empty, we may need to persist its contents
+    // this might be the place to do it, once there is a persistence mgr.
+    // This may also be better in checkQueueStatus.
+
     LOG_DEBUG(dctl_logger, DBGLVL_START_SHUT, DHCP_DDNS_RUN_EXIT);
+
 };
 
-void
-D2Process::shutdown() {
-    LOG_DEBUG(dctl_logger, DBGLVL_START_SHUT, DHCP_DDNS_SHUTDOWN);
+size_t
+D2Process::runIO() {
+    // We want to block until at least one handler is called.  We'll use
+    // asio::io_service directly for two reasons. First off
+    // asiolink::IOService::run_one is a void and asio::io_service::stopped
+    // is not present in older versions of boost.  We need to know if any
+    // handlers ran or if the io_service was stopped.  That latter represents
+    // some form of error and the application cannot proceed with a stopped
+    // service.  Secondly, asiolink::IOService does not provide the poll
+    // method.  This is a handy method which runs all ready handlers without
+    // blocking.
+    IOServicePtr& io = getIoService();
+    asio::io_service& asio_io_service  = io->get_io_service();
+
+    // Poll runs all that are ready. If none are ready it returns immediately
+    // with a count of zero.
+    size_t cnt = asio_io_service.poll();
+    if (!cnt) {
+        // Poll ran no handlers either none are ready or the service has been
+        // stopped.  Either way, call run_one to wait for a IO event. If the
+        // service is stopped it will return immediately with a cnt of zero.
+        cnt = asio_io_service.run_one();
+    }
+
+    return (cnt);
+}
+
+bool
+D2Process::canShutdown() const {
+    bool all_clear = false;
+
+    // If we have been told to shutdown, find out if we are ready to do so.
+    if (shouldShutdown()) {
+        switch (shutdown_type_) {
+        case SD_NORMAL:
+            // For a normal shutdown we need to stop the queue manager but
+            // wait until we have finished all the transactions in progress.
+            all_clear = (((queue_mgr_->getMgrState() != D2QueueMgr::RUNNING) &&
+                          (queue_mgr_->getMgrState() != D2QueueMgr::STOPPING))
+                         && (update_mgr_->getTransactionCount() == 0));
+            break;
+
+        case SD_DRAIN_FIRST:
+            // For a drain first shutdown we need to stop the queue manager but
+            // process all of the requests in the receive queue first.
+            all_clear = (((queue_mgr_->getMgrState() != D2QueueMgr::RUNNING) &&
+                          (queue_mgr_->getMgrState() != D2QueueMgr::STOPPING))
+                          && (queue_mgr_->getQueueSize() == 0)
+                          && (update_mgr_->getTransactionCount() == 0));
+            break;
+
+        case SD_NOW:
+            // Get out right now, no niceties.
+            all_clear = true;
+            break;
+
+        default:
+            // shutdown_type_ is an enum and should only be one of the above.
+            // if its getting through to this, something is whacked.
+            break;
+        }
+
+        if (all_clear) {
+            LOG_INFO(dctl_logger,DHCP_DDNS_CLEARED_FOR_SHUTDOWN)
+                     .arg(getShutdownTypeStr(shutdown_type_));
+        }
+    }
+
+    return (all_clear);
+}
+
+isc::data::ConstElementPtr
+D2Process::shutdown(isc::data::ConstElementPtr args) {
+    LOG_INFO(dctl_logger, DHCP_DDNS_SHUTDOWN).arg(args ? args->str()
+                                                  : "(no args)");
+
+    // Default shutdown type is normal.
+    std::string type_str(getShutdownTypeStr(SD_NORMAL));
+    shutdown_type_ = SD_NORMAL;
+
+    if (args) {
+        if ((args->getType() == isc::data::Element::map) &&
+            args->contains("type")) {
+            type_str = args->get("type")->stringValue();
+
+            if (type_str == getShutdownTypeStr(SD_NORMAL)) {
+                shutdown_type_ = SD_NORMAL;
+            } else if (type_str == getShutdownTypeStr(SD_DRAIN_FIRST)) {
+                shutdown_type_ = SD_DRAIN_FIRST;
+            } else if (type_str == getShutdownTypeStr(SD_NOW)) {
+                shutdown_type_ = SD_NOW;
+            } else {
+                setShutdownFlag(false);
+                return (isc::config::createAnswer(1, "Invalid Shutdown type: "
+                                                  + type_str));
+            }
+        }
+    }
+
+    // Set the base class's shutdown flag.
     setShutdownFlag(true);
+    return (isc::config::createAnswer(0, "Shutdown initiated, type is: "
+                                      + type_str));
 }
 
 isc::data::ConstElementPtr
 D2Process::configure(isc::data::ConstElementPtr config_set) {
-    // @todo This is the initial implementation passes the configuration onto
-    // the D2CfgMgr.  There may be additional steps taken added to handle
-    // configuration changes but for now, assume that D2CfgMgr is handling it
-    // all. 
     LOG_DEBUG(dctl_logger, DBGLVL_TRACE_BASIC,
               DHCP_DDNS_CONFIGURE).arg(config_set->str());
 
-    return (getCfgMgr()->parseConfig(config_set));
+    int rcode = 0;
+    isc::data::ConstElementPtr comment;
+    isc::data::ConstElementPtr answer = getCfgMgr()->parseConfig(config_set);;
+    comment = isc::config::parseAnswer(rcode, answer);
+
+    if (rcode) {
+        // Non-zero means we got an invalid configuration, take no further
+        // action.  In integrated mode, this will send a failed response back 
+        // to BIND10.
+        reconf_queue_flag_ = false;
+        return (answer);
+    }
+
+    // Set the reconf_queue_flag to indicate that we need to reconfigure
+    // the queue manager.  Reconfiguring the queue manager may be asynchronous
+    // and require one or more events to occur, therefore we set a flag
+    // indicating it needs to be done but we cannot do it here.  It must
+    // be done over time, while events are being processed.  Remember that
+    // the method we are in now is invoked as part of the configuration event
+    // callback.  This means you can't wait for events here, you are already
+    // in one.
+    // (@todo NOTE This could be turned into a bitmask of flags if we find other
+    // things that need reconfiguration.  It might also be useful if we
+    // did some analysis to decide what if anything we need to do.)
+    reconf_queue_flag_ = true;
+
+    // If we are here, configuration was valid, at least it parsed correctly
+    // and therefore contained no invalid values.
+    // Return the success answer from above.
+    return (answer);
+}
+
+void
+D2Process::checkQueueStatus() {
+    switch (queue_mgr_->getMgrState()){
+    case D2QueueMgr::RUNNING:
+        if (reconf_queue_flag_ || shouldShutdown()) {
+            // If we need to reconfigure the queue manager or we have been
+            // told to shutdown, then stop listening first.  Stopping entails
+            // canceling active listening which may generate an IO event, so
+            // instigate the stop and get out.
+            try {
+                LOG_INFO(dctl_logger, DHCP_DDNS_QUEUE_MGR_STOPPING)
+                         .arg(reconf_queue_flag_ ? "reconfiguration"
+                                                   : "shutdown");
+                queue_mgr_->stopListening();
+            } catch (const isc::Exception& ex) {
+                // It is very unlikey that we would experience an error
+                // here, but theoretically possible. 
+                LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_STOP_ERROR)
+                          .arg(ex.what());
+            }
+        }
+        break;
+
+    case D2QueueMgr::STOPPED_QUEUE_FULL: {
+            // Resume receiving once the queue has decreased by twenty
+            // percent.  This is an arbitrary choice. @todo this value should
+            // probably be configurable.
+            size_t threshold = (((queue_mgr_->getMaxQueueSize()
+                                * QUEUE_RESTART_PERCENT)) / 100);
+            if (queue_mgr_->getQueueSize() <= threshold) {
+                LOG_INFO (dctl_logger, DHCP_DDNS_QUEUE_MGR_RESUMING)
+                          .arg(threshold).arg(queue_mgr_->getMaxQueueSize());
+                try {
+                    queue_mgr_->startListening();
+                } catch (const isc::Exception& ex) {
+                    LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_RESUME_ERROR)
+                              .arg(ex.what());
+                }
+            }
+
+        break;
+        }
+
+    case D2QueueMgr::STOPPED_RECV_ERROR:
+        // If the receive error is not due to some fallout from shutting
+        // down then we will attempt to recover by reconfiguring the listener.
+        // This will close and destruct the current listener and make a new
+        // one with new resources.
+        // @todo This may need a safety valve such as retry count or a timer
+        // to keep from endlessly retrying over and over, with little time
+        // in between.
+        if (!shouldShutdown()) {
+            LOG_INFO (dctl_logger, DHCP_DDNS_QUEUE_MGR_RECOVERING);
+            reconfigureQueueMgr();
+        }
+        break;
+
+    case D2QueueMgr::STOPPING:
+        // We are waiting for IO to cancel, so this is a NOP.
+        // @todo Possible timer for self-defense?  We could conceivably
+        // get into a condition where we never get the event, which would
+        // leave us stuck in stopping.  This is hugely unlikely but possible?
+        break;
+
+    default:
+        // If the reconfigure flag is set, then we are in a state now where
+        // we can do the reconfigure. In other words, we aren't RUNNING or
+        // STOPPING.
+        if (reconf_queue_flag_) {
+            LOG_INFO (dctl_logger, DHCP_DDNS_QUEUE_MGR_RECONFIGURING);
+            reconfigureQueueMgr();
+        }
+        break;
+    }
+}
+
+void
+D2Process::reconfigureQueueMgr() {
+    // Set reconfigure flag to false.  We are only here because we have
+    // a valid configuration to work with so if we fail below, it will be
+    // an operational issue, such as a busy IP address. That will leave
+    // queue manager in INITTED state, which is fine.
+    // What we dont' want is to continually attempt to reconfigure so set
+    // the flag false now.
+    // @todo This method assumes only 1 type of listener.  This will change
+    // to support at least a TCP version, possibly some form of RDBMS listener
+    // as well.
+    reconf_queue_flag_ = false;
+    try {
+        // Wipe out the current listener.
+        queue_mgr_->removeListener();
+
+        // Get the configuration parameters that affect Queue Manager.
+        // @todo Need to add parameters for listener TYPE, FORMAT, address reuse
+        std::string ip_address;
+        uint32_t port;
+        getCfgMgr()->getContext()->getParam("ip_address", ip_address);
+        getCfgMgr()->getContext()->getParam("port", port);
+        isc::asiolink::IOAddress addr(ip_address);
+
+        // Instantiate the listener.
+        queue_mgr_->initUDPListener(addr, port, dhcp_ddns::FMT_JSON, true);
+
+        // Now start it. This assumes that starting is a synchronous,
+        // blocking call that executes quickly.  @todo Should that change then
+        // we will have to expand the state model to accommodate this.
+        queue_mgr_->startListening();
+    } catch (const isc::Exception& ex) {
+        // Queue manager failed to initialize and therefore not listening. 
+        // This is most likely due to an unavailable IP address or port, 
+        // which is a configuration issue.
+        LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_START_ERROR).arg(ex.what());
+    }
 }
 
 isc::data::ConstElementPtr
-D2Process::command(const std::string& command, isc::data::ConstElementPtr args){
+D2Process::command(const std::string& command, 
+                   isc::data::ConstElementPtr args) {
     // @todo This is the initial implementation.  If and when D2 is extended
     // to support its own commands, this implementation must change. Otherwise
     // it should reject all commands as it does now.
@@ -86,5 +363,32 @@ D2Process::command(const std::string& command, isc::data::ConstElementPtr args){
 D2Process::~D2Process() {
 };
 
+D2CfgMgrPtr
+D2Process::getD2CfgMgr() {
+    // The base class gives a base class pointer to our configuration manager.
+    // Since we are D2, and we need D2 specific extensions, we need a pointer
+    // to D2CfgMgr for some things.
+    return (boost::dynamic_pointer_cast<D2CfgMgr>(getCfgMgr()));
+}
+
+const char* D2Process::getShutdownTypeStr(const ShutdownType& type) {
+    const char* str = "invalid";
+    switch (type) {
+    case SD_NORMAL:
+        str = "normal";
+        break;
+    case SD_DRAIN_FIRST:
+        str = "drain_first";
+        break;
+    case SD_NOW:
+        str = "now";
+        break;
+    default:
+        break;
+    }
+
+    return (str);
+}
+
 }; // namespace isc::d2
 }; // namespace isc

+ 249 - 17
src/bin/d2/d2_process.h

@@ -16,6 +16,8 @@
 #define D2_PROCESS_H
 
 #include <d2/d_process.h>
+#include <d2/d2_queue_mgr.h>
+#include <d2/d2_update_mgr.h>
 
 namespace isc {
 namespace d2 {
@@ -27,11 +29,37 @@ namespace d2 {
 /// to receive DNS mapping change requests and carry them out.
 /// It implements the DProcessBase interface, which structures it such that it
 /// is a managed "application", controlled by a management layer.
-
 class D2Process : public DProcessBase {
 public:
+
+    /// @brief Defines the shutdown types supported by D2Process
+    ///
+    /// * SD_NORMAL - Stops the queue manager and finishes all current
+    /// transactions before exiting. This is the default.
+    ///
+    /// * SD_DRAIN_FIRST - Stops the queue manager but continues processing
+    /// requests from the queue until it is empty.
+    ///
+    /// * SD_NOW - Exits immediately.
+    enum ShutdownType {
+      SD_NORMAL,
+      SD_DRAIN_FIRST,
+      SD_NOW
+    };
+
+    /// @brief Defines the point at which to resume receiving requests.
+    /// If the receive queue has become full, D2Process will "pause" the
+    /// reception of requests by putting the queue manager in the stopped
+    /// state.  Once the number of entries has decreased to this percentage
+    /// of  the maximum allowed, D2Process will "resume" receiving requests
+    /// by restarting the queue manager.
+    static const unsigned int QUEUE_RESTART_PERCENT;
+
     /// @brief Constructor
     ///
+    /// Construction creates the configuration manager, the queue
+    /// manager, and the update manager.
+    ///
     /// @param name name is a text label for the process. Generally used
     /// in log statements, but otherwise arbitrary.
     /// @param io_service is the io_service used by the caller for
@@ -40,27 +68,79 @@ public:
     /// @throw DProcessBaseError is io_service is NULL.
     D2Process(const char* name, IOServicePtr io_service);
 
-    /// @brief Will be used after instantiation to perform initialization
-    /// unique to D2. @todo This will likely include interactions with
-    /// QueueMgr and UpdateMgr, to prepare for request receipt and processing.
-    /// Current implementation successfully does nothing.
-    /// @throw throws a DProcessBaseError if the initialization fails.
+    /// @brief Called after instantiation to perform initialization unique to
+    /// D2.
+    ///
+    /// This is invoked by the controller after command line arguments but
+    /// PRIOR to configuration reception.  The base class provides this method
+    /// as a place to perform any derivation-specific initialization steps
+    /// that are inapppropriate for the constructor but necessary prior to
+    /// launch.  So far, no such steps have been identified for D2, so its
+    /// implementantion is empty but required.
+    ///
+    /// @throw DProcessBaseError if the initialization fails.
     virtual void init();
 
     /// @brief Implements the process's event loop.
-    /// The initial implementation is quite basic, surrounding calls to
-    /// io_service->runOne() with a test of the shutdown flag.
-    /// Once invoked, the method will continue until the process itself is
-    /// exiting due to a request to shutdown or some anomaly forces an exit.
-    /// @throw throws a DProcessBaseError if an error is encountered.
+    ///
+    /// Once entered, the main control thread remains inside this method
+    /// until shutdown.  The event loop logic is as follows:
+    /// @code
+    ///    while should not down {
+    ///       process queue manager state change
+    ///       process completed jobs
+    ///       dequeue new jobs
+    ///       wait for IO event(s)
+    ///
+    ///       ON an exception, exit with fatal error
+    ///    }
+    /// @endcode
+    ///
+    /// To summarize, each pass through the event loop first checks the state
+    /// of the received queue and takes any steps required to ensure it is
+    /// operating in the manner necessary.  Next the update manager is given
+    /// a chance to clean up any completed transactions and start new
+    /// transactions by dequeuing jobs from the request queue.  Lastly, it
+    /// allows IOService to process until one or more event handlers are
+    /// called.  Note that this last step will block until at least one
+    /// ready handler is invoked.  In other words, if no IO events have occurred
+    /// since it was last called, the event loop will block at this step until
+    /// an IO event occurs.  At that time we return to the top of the loop.
+    ///
+    /// @throw DProcessBaseError if an error is encountered.  Note that
+    /// exceptions thrown at this point are assumed to be FATAL exceptions.
+    /// This includes exceptions generated but not caught by IO callbacks.
+    /// Services which rely on callbacks are expected to be well behaved and
+    /// any errors they encounter handled internally.
     virtual void run();
 
-    /// @brief Implements the process's shutdown processing. When invoked, it
-    /// should ensure that the process gracefully exits the run method.
-    /// Current implementation simply sets the shutdown flag monitored by the
-    /// run method. @todo this may need to expand as the implementation evolves.
-    /// @throw throws a DProcessBaseError if an error is encountered.
-    virtual void shutdown();
+    /// @brief Initiates the D2Process shutdown process.
+    ///
+    /// This is last step in the shutdown event callback chain. It is invoked
+    /// to notify D2Process that it needs to begin its shutdown procedure.
+    /// Note that shutting down may be neither instantaneous nor synchronous,
+    /// This method records the request for and the type of shutdown desired.
+    /// Generally it will require one or more subsequent events to complete,
+    /// dependent on the type of shutdown requested.  The type of shutdown is
+    /// specified as an optional argument of the shutdown command. The types
+    /// of shutdown supported are:
+    ///
+    /// * "normal" - Stops the queue manager and finishes all current
+    /// transactions before exiting. This is the default.
+    ///
+    /// * "drain_first" - Stops the queue manager but continues processing
+    /// requests from the queue until it is empty.
+    ///
+    /// * "now" - Exits immediately.
+    ///
+    /// @param args Specifies the shutdown "type" as "normal", "drain_first",
+    /// or "now"
+    ///
+    /// @return an Element that contains the results of argument processing,
+    /// consisting of an integer status value (0 means successful,
+    /// non-zero means failure), and a string explanation of the outcome.
+    virtual isc::data::ConstElementPtr
+        shutdown(isc::data::ConstElementPtr args);
 
     /// @brief Processes the given configuration.
     ///
@@ -70,6 +150,16 @@ public:
     /// processing errors and return a success or failure answer as described
     /// below.
     ///
+    /// This method passes the newly received configuration to the configuration
+    /// manager instance for parsing.  The configuration manager parses the
+    /// configuration and updates the necessary values within the context,
+    /// assuming it parses correctly.  If that's the case this method sets the
+    /// flag to reconfigure the queue manager and returns a successful response
+    /// as described below.
+    ///
+    /// If the new configuration fails to parse, then the current configuration
+    /// is retained and a failure response is returned as described below.
+    ///
     /// @param config_set a new configuration (JSON) for the process
     /// @return an Element that contains the results of configuration composed
     /// of an integer status value (0 means successful, non-zero means failure),
@@ -93,8 +183,150 @@ public:
                                                isc::data::ConstElementPtr args);
     /// @brief Destructor
     virtual ~D2Process();
+
+protected:
+    /// @brief Monitors current queue manager state, takes action accordingly
+    ///
+    /// This method ensures that the queue manager transitions to the state
+    /// most appropriate to the operational state of the D2Process and any
+    /// events that may have occurred since it was last called.  It is called
+    /// once for each iteration of the event loop.  It is essentially a
+    /// switch statement based on the D2QueueMgr's current state.  The logic
+    /// is as follows:
+    ///
+    /// If the state is D2QueueMgr::RUNNING, and the queue manager needs to be
+    /// reconfigured or we have been told to shutdown, then instruct the queue
+    /// manager to stop listening. Exit the method.
+    ///
+    /// If the state is D2QueueMgr::STOPPED_QUEUE_FULL, then check if the
+    /// number of entries in the queue has fallen below the "resume threshold".
+    /// If it has, then instruct the queue manager to start listening. Exit
+    /// the method.
+    ///
+    /// If the state is D2QueueMgr::STOPPED_RECV_ERROR, then attempt to recover
+    /// by calling reconfigureQueueMgr(). Exit the method.
+    ///
+    /// If the state is D2QueueMgr::STOPPING, simply exit the method. This is
+    /// a NOP condition as we are waiting for the IO cancel event
+    ///
+    /// For any other state, (NOT_INITTED,INITTED,STOPPED), if the reconfigure
+    /// queue flag is set, call reconfigureQueueMgr(). Exit the method.
+    ///
+    /// This method is exception safe.
+    virtual void checkQueueStatus();
+
+    /// @brief Initializes then starts the queue manager.
+    ///
+    /// This method is initializes the queue manager with the current
+    /// configuration parameters and instructs it to start listening.
+    /// Note the existing listener instance (if it exists) is destroyed,
+    /// and that a new listener is created during initialization.
+    ///
+    /// This method is exception safe.
+    virtual void reconfigureQueueMgr();
+
+    /// @brief Allows IO processing to run until at least callback is invoked.
+    ///
+    /// This method is called from within the D2Process main event loop and is
+    /// the point at which the D2Process blocks, waiting for IO events to
+    /// cause IO event callbacks to be invoked.
+    ///
+    /// If callbacks are ready to be executed upon entry, the method will
+    /// return as soon as these callbacks have completed.  If no callbacks
+    /// are ready, then it will wait (indefinitely) until at least one callback
+    /// is executed.
+    ///
+    /// @note: Should become desirable to periodically force an
+    /// event, an interval timer could be used to do so.
+    ///
+    /// @return The number of callback handlers executed, or 0 if the IO
+    /// service has been stopped.
+    ///
+    /// @throw This method does not throw directly, but the execution of
+    /// callbacks invoked in response to IO events might.  If so, these
+    /// will propagate upward out of this method.
+    virtual size_t runIO();
+
+    /// @brief Indicates whether or not the process can perform a shutdown.
+    ///
+    /// Determines if the process has been instructed to shutdown and if
+    /// the criteria for performing the type of shutdown requested has been
+    /// met.
+    ///
+    /// @return Returns true if the criteria has been met, false otherwise.
+    virtual bool canShutdown() const;
+
+    /// @brief Sets queue reconfigure indicator to the given value.
+    ///
+    /// @param value is the new value to assign to the indicator
+    ///
+    /// @note this method is really only intended for testing purposes.
+    void setReconfQueueFlag(const bool value) {
+        reconf_queue_flag_ = value;
+    }
+
+    /// @brief Sets the shutdown type to the given value.
+    ///
+    /// @param value is the new value to assign to shutdown type.
+    ///
+    /// @note this method is really only intended for testing purposes.
+    void setShutdownType(const ShutdownType& value) {
+        shutdown_type_ = value;
+    }
+
+public:
+    /// @brief Returns a pointer to the configuration manager.
+    /// Note, this method cannot return a reference as it uses dynamic
+    /// pointer casting of the base class configuration manager.
+    D2CfgMgrPtr getD2CfgMgr();
+
+    /// @brief Returns a reference to the queue manager.
+    const D2QueueMgrPtr& getD2QueueMgr() const {
+        return (queue_mgr_);
+    }
+
+    /// @brief Returns a reference to the update manager.
+    const D2UpdateMgrPtr& getD2UpdateMgr() const {
+        return (update_mgr_);
+    }
+
+    /// @brief Returns true if the queue manager should be reconfigured.
+    bool getReconfQueueFlag() const {
+        return (reconf_queue_flag_);
+    }
+
+    /// @brief Returns the type of shutdown requested.
+    ///
+    /// Note, this value is meaningless unless shouldShutdown() returns true.
+    ShutdownType getShutdownType() const {
+        return (shutdown_type_);
+    }
+
+    /// @brief Returns a text label for the given shutdown type.
+    ///
+    /// @param type the numerical shutdown type for which the label is desired.
+    ///
+    /// @return A text label corresponding the value or "invalid" if the
+    /// value is not a valid value.
+    static const char* getShutdownTypeStr(const ShutdownType& type);
+
+private:
+    /// @brief Pointer to our queue manager instance.
+    D2QueueMgrPtr queue_mgr_;
+
+    /// @brief Pointer to our update manager instance.
+    D2UpdateMgrPtr update_mgr_;
+
+    /// @brief Indicates if the queue manager should be reconfigured.
+    bool reconf_queue_flag_;
+
+    /// @brief Indicates the type of shutdown requested.
+    ShutdownType shutdown_type_;
 };
 
+/// @brief Defines a shared pointer to D2Process.
+typedef boost::shared_ptr<D2Process> D2ProcessPtr;
+
 }; // namespace isc::d2
 }; // namespace isc
 

+ 85 - 42
src/bin/d2/d2_queue_mgr.cc

@@ -25,48 +25,71 @@ const size_t D2QueueMgr::MAX_QUEUE_DEFAULT;
 D2QueueMgr::D2QueueMgr(isc::asiolink::IOService& io_service,
                        const size_t max_queue_size)
     : io_service_(io_service), max_queue_size_(max_queue_size),
-      mgr_state_(NOT_INITTED) {
+      mgr_state_(NOT_INITTED), target_stop_state_(NOT_INITTED) {
     // Use setter to do validation.
     setMaxQueueSize(max_queue_size);
 }
 
 D2QueueMgr::~D2QueueMgr() {
-    // clean up
-    try {
-        stopListening();
-    } catch (...) {
-        // This catch is strictly for safety's sake, in case a future
-        // implementation isn't tidy or careful. 
-    }
 }
 
 void
 D2QueueMgr::operator()(const dhcp_ddns::NameChangeListener::Result result,
                        dhcp_ddns::NameChangeRequestPtr& ncr) {
-    // Note that error conditions must be handled here without throwing
-    // exceptions. Remember this is the application level "link" in the
-    // callback chain.  Throwing an exception here will "break" the
-    // io_service "run" we are operating under.  With that in mind,
-    // if we hit a problem, we will stop the listener transition to
-    // the appropriate stopped state.  Upper layer(s) must monitor our
-    // state as well as our queue size.
-
-    // If the receive was successful, attempt to queue the request.
-    if (result == dhcp_ddns::NameChangeListener::SUCCESS) {
-        if (getQueueSize() < getMaxQueueSize()) {
-            // There's room on the queue, add to the end
-            enqueue(ncr);
-            return;
+    try {
+        // Note that error conditions must be handled here without throwing
+        // exceptions. Remember this is the application level "link" in the
+        // callback chain.  Throwing an exception here will "break" the
+        // io_service "run" we are operating under.  With that in mind,
+        // if we hit a problem, we will stop the listener transition to
+        // the appropriate stopped state.  Upper layer(s) must monitor our
+        // state as well as our queue size.
+        switch (result) {
+        case dhcp_ddns::NameChangeListener::SUCCESS:
+            // Receive was successful, attempt to queue the request.
+            if (getQueueSize() < getMaxQueueSize()) {
+                // There's room on the queue, add to the end
+                enqueue(ncr);
+                return;
+            }
+
+            // Queue is full, stop the listener.
+            // Note that we can move straight to a STOPPED state as there
+            // is no receive in progress.
+            LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_QUEUE_FULL)
+                      .arg(max_queue_size_);
+            stopListening(STOPPED_QUEUE_FULL);
+            break;
+
+        case dhcp_ddns::NameChangeListener::STOPPED:
+            if (mgr_state_ == STOPPING) {
+                // This is confirmation that the listener has stopped and its
+                // callback will not be called again, unless its restarted.
+                updateStopState();
+            } else {
+                // We should not get an receive complete status of stopped
+                // unless we canceled the read as part of stopping. Therefore
+                // this is unexpected so we will treat it as a receive error.
+                // This is most likely an unforeseen programmatic issue.
+                LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_UNEXPECTED_STOP)
+                          .arg(mgr_state_);
+                stopListening(STOPPED_RECV_ERROR);
+            }
+
+            break;
+
+        default:
+            // Receive failed, stop the listener.
+            // Note that we can move straight to a STOPPED state as there
+            // is no receive in progress.
+            LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_RECV_ERROR);
+            stopListening(STOPPED_RECV_ERROR);
+            break;
         }
-
-        // Queue is full, stop the listener.
-        stopListening(STOPPED_QUEUE_FULL);
-        LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_QUEUE_FULL)
-                  .arg(max_queue_size_);
-    } else {
-        // Receive failed, stop the listener.
-        stopListening(STOPPED_RECV_ERROR);
-        LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_RECV_ERROR);
+    } catch (const std::exception& ex) {
+        // On the outside chance a throw occurs, let's log it and swallow it.
+        LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_UNEXPECTED_HANDLER_ERROR)
+                  .arg(ex.what());
     }
 }
 
@@ -112,26 +135,46 @@ D2QueueMgr::startListening() {
         isc_throw(D2QueueMgrError, "D2QueueMgr listener start failed: "
                   << ex.what());
     }
+
+    LOG_INFO (dctl_logger, DHCP_DDNS_QUEUE_MGR_STARTED);
 }
 
 void
-D2QueueMgr::stopListening(const State stop_state) {
-    // Note, stopListening is guaranteed not to throw.
+D2QueueMgr::stopListening(const State target_stop_state) {
     if (listener_) {
-        listener_->stopListening();
-    }
-
-    // Enforce only valid "stop" states.
-    if (stop_state != STOPPED && stop_state != STOPPED_QUEUE_FULL &&
-        stop_state != STOPPED_RECV_ERROR) {
+        // Enforce only valid "stop" states.
         // This is purely a programmatic error and should never happen.
-        isc_throw(D2QueueMgrError, "D2QueueMgr invalid value for stop state: "
-                  << stop_state);
+        if (target_stop_state != STOPPED &&
+            target_stop_state != STOPPED_QUEUE_FULL &&
+            target_stop_state != STOPPED_RECV_ERROR) {
+            isc_throw(D2QueueMgrError,
+                      "D2QueueMgr invalid value for stop state: "
+                      << target_stop_state);
+        }
+
+        // Remember the state we want to acheive.
+        target_stop_state_ = target_stop_state;
+
+        // Instruct the listener to stop.  If the listener reports that  it
+        // has IO pending, then we transition to STOPPING to wait for the
+        // cancellation event.  Otherwise, we can move directly to the targeted
+        // state.
+        listener_->stopListening();
+        if (listener_->isIoPending()) {
+            mgr_state_ = STOPPING;
+        } else {
+            updateStopState();
+        }
     }
+}
 
-    mgr_state_ = stop_state;
+void
+D2QueueMgr::updateStopState() {
+    mgr_state_ = target_stop_state_;
+    LOG_INFO (dctl_logger, DHCP_DDNS_QUEUE_MGR_STOPPED);
 }
 
+
 void
 D2QueueMgr::removeListener() {
     // Force our managing layer(s) to stop us properly first.

+ 26 - 6
src/bin/d2/d2_queue_mgr.h

@@ -108,6 +108,10 @@ public:
 ///     without any further steps.   This method may be called from the INITTED
 ///     or one of the STOPPED states.
 ///
+///     * STOPPING - The listener is in the process of stopping active
+///     listening. This is transitory state between RUNNING and STOPPED, which
+///     is completed by IO cancellation event.
+///
 ///     * STOPPED - The listener has been listening but has been stopped
 ///     without error. To return to listening, startListener() must be invoked.
 ///
@@ -144,6 +148,7 @@ public:
       NOT_INITTED,
       INITTED,
       RUNNING,
+      STOPPING,
       STOPPED_QUEUE_FULL,
       STOPPED_RECV_ERROR,
       STOPPED,
@@ -203,6 +208,10 @@ public:
     /// If the queue is at maximum capacity, stopListening() is invoked and
     /// the state is set to STOPPED_QUEUE_FULL.
     ///
+    /// If the result indicates IO stopped, then the state is set to STOPPED.
+    /// Note this is not an error, it results from a deliberate cancellation
+    /// of listener IO as part of a normal stopListener call.
+    ///
     /// If the result indicates a failed receive, stopListening() is invoked
     /// and the state is set to STOPPED_RECV_ERROR.
     ///
@@ -220,14 +229,18 @@ public:
 
     /// @brief Stops listening for requests.
     ///
-    /// Invokes the listener's stopListening method which should cause it to
-    /// cancel any pending IO and close its IO source.  It the sets the state
-    /// to the given value.
+    /// Invokes the listener's stopListening method which will cause it to
+    /// cancel any pending IO and close its IO source.  It the sets target
+    /// stop state to the given value.
     ///
-    /// @param stop_state is one of the three stopped state values.
+    /// If there is no IO pending, the manager state is immediately set to the
+    /// target stop state, otherwise the manager state is set to STOPPING.
+    ///
+    /// @param target_stop_state is one of the three stopped state values.
     ///
     /// @throw D2QueueMgrError if stop_state is a valid stop state.
-    void stopListening(const State stop_state = STOPPED);
+    void stopListening(const State target_stop_state = STOPPED);
+
 
     /// @brief Deletes the current listener
     ///
@@ -308,6 +321,12 @@ public:
     void clearQueue();
 
   private:
+    /// @brief Sets the manager state to the target stop state.
+    ///
+    /// Convenience method which sets the manager state to the target stop
+    /// state and logs that the manager is stopped.
+    void updateStopState();
+
     /// @brief IOService that our listener should use for IO management.
     isc::asiolink::IOService& io_service_;
 
@@ -323,7 +342,8 @@ public:
     /// @brief Current state of the manager.
     State mgr_state_;
 
-
+    /// @brief Tracks the state the manager should be in once stopped.
+    State target_stop_state_;
 };
 
 /// @brief Defines a pointer for manager instances.

+ 8 - 9
src/bin/d2/d_controller.cc

@@ -364,7 +364,7 @@ DControllerBase::executeCommand(const std::string& command,
     // as it may be supported there.
     isc::data::ConstElementPtr answer;
     if (command.compare(SHUT_DOWN_COMMAND) == 0) {
-        answer = shutdown();
+        answer = shutdown(args);
     } else {
         // It wasn't shutdown, so may be a custom controller command.
         int rcode = 0;
@@ -390,16 +390,15 @@ DControllerBase::customControllerCommand(const std::string& command,
 }
 
 isc::data::ConstElementPtr
-DControllerBase::shutdown() {
+DControllerBase::shutdown(isc::data::ConstElementPtr args) {
     if (process_) {
-        process_->shutdown();
-    } else {
-        // Not really a failure, but this condition is worth noting. In reality
-        // it should be pretty hard to cause this.
-        LOG_WARN(dctl_logger, DCTL_NOT_RUNNING).arg(app_name_);
-    }
+        return (process_->shutdown(args));
+    } 
 
-    return (isc::config::createAnswer(0, "Shutting down."));
+    // Not really a failure, but this condition is worth noting. In reality
+    // it should be pretty hard to cause this.
+    LOG_WARN(dctl_logger, DCTL_NOT_RUNNING).arg(app_name_);
+    return (isc::config::createAnswer(0, "Process has not been initialzed."));
 }
 
 void

+ 11 - 4
src/bin/d2/d_controller.h

@@ -487,13 +487,20 @@ private:
 
     /// @brief Initiates shutdown procedure.  This method is invoked
     /// by executeCommand in response to the shutdown command. It will invoke
-    /// the application process's shutdown method, which causes the process to
-    /// exit it's event loop.
+    /// the application process's shutdown method which causes the process to
+    /// to begin its shutdown process.
+    ///
+    /// Note, it is assumed that the process of shutting down is neither
+    /// instanteneous nor synchronous.  This method does not "block" waiting
+    /// until the process has halted.  Rather it is used to convey the
+    /// need to shutdown.  A successful return indicates that the shutdown
+    /// has successfully commenced, but does not indicate that the process
+    /// has actually exited. 
     ///
     /// @return returns an Element that contains the results of shutdown
-    /// attempt composed of an integer status value (0 means successful,
+    /// command composed of an integer status value (0 means successful,
     /// non-zero means failure), and a string explanation of the outcome.
-    isc::data::ConstElementPtr shutdown();
+    isc::data::ConstElementPtr shutdown(isc::data::ConstElementPtr args);
 
     /// @brief Prints the program usage text to std error.
     ///

+ 36 - 17
src/bin/d2/d_process.h

@@ -35,10 +35,17 @@ public:
         isc::Exception(file, line, what) { };
 };
 
+/// @brief String value for the shutdown command.
+static const std::string SHUT_DOWN_COMMAND("shutdown");
+
+/// @brief Returned by the process to indicate a command was successful.
 static const int COMMAND_SUCCESS = 0;
+
+/// @brief Returned by the process to indicates a command failed.
 static const int COMMAND_ERROR = 1;
+
+/// @brief Returned by the process to indicates a command is not valid.
 static const int COMMAND_INVALID = 2;
-static const std::string SHUT_DOWN_COMMAND("shutdown");
 
 /// @brief Application Process Interface
 ///
@@ -81,24 +88,31 @@ public:
     /// to application. It must be invoked prior to invoking run. This would
     /// likely include the creation of additional IO sources and their
     /// integration into the io_service.
-    /// @throw throws DProcessBaseError if the initialization fails.
+    /// @throw DProcessBaseError if the initialization fails.
     virtual void init() = 0;
 
     /// @brief Implements the process's event loop. In its simplest form it
     /// would an invocation io_service_->run().  This method should not exit
     /// until the process itself is exiting due to a request to shutdown or
     /// some anomaly is forcing an exit.
-    /// @throw throws DProcessBaseError if an operational error is encountered.
+    /// @throw DProcessBaseError if an operational error is encountered.
     virtual void run() = 0;
 
-    /// @brief Implements the process's shutdown processing. When invoked, it
-    /// should ensure that the process gracefully exits the run method.
-    /// The default implementation sets the shutdown flag and stops IOService.
-    /// @throw throws DProcessBaseError if an operational error is encountered.
-    virtual void shutdown() {
-        setShutdownFlag(true);
-        stopIOService();
-    };
+    /// @brief Initiates the process's shutdown process. 
+    /// 
+    /// This is last step in the shutdown event callback chain, that is 
+    /// intended to notify the process it is to begin its shutdown process.
+    ///
+    /// @param args an Element set of shutdown arguments (if any) that are
+    /// supported by the process derivation. 
+    /// 
+    /// @return an Element that contains the results of argument processing,
+    /// consisting of an integer status value (0 means successful, 
+    /// non-zero means failure), and a string explanation of the outcome. 
+    ///  
+    /// @throw DProcessBaseError if an operational error is encountered.
+    virtual isc::data::ConstElementPtr 
+        shutdown(isc::data::ConstElementPtr args) = 0;
 
     /// @brief Processes the given configuration.
     ///
@@ -125,7 +139,12 @@ public:
     /// @param args is a set of arguments (if any) required for the given
     /// command.
     /// @return an Element that contains the results of command composed
-    /// of an integer status value (0 means successful, non-zero means failure),
+    /// of an integer status value: 
+    ///
+    /// - COMMAND_SUCCESS indicates a command was successful.
+    /// - COMMAND_ERROR indicates a valid command failed execute.
+    /// - COMMAND_INVALID indicates a command is not valid.
+    ///
     /// and a string explanation of the outcome.
     virtual isc::data::ConstElementPtr command(
             const std::string& command, isc::data::ConstElementPtr args) = 0;
@@ -135,8 +154,8 @@ public:
 
     /// @brief Checks if the process has been instructed to shut down.
     ///
-    /// @return returns true if process shutdown flag is true.
-    bool shouldShutdown() {
+    /// @return true if process shutdown flag is true.
+    bool shouldShutdown() const {
         return (shut_down_flag_);
     }
 
@@ -149,14 +168,14 @@ public:
 
     /// @brief Fetches the application name.
     ///
-    /// @return returns a the application name string.
+    /// @return application name string.
     const std::string getAppName() const {
         return (app_name_);
     }
 
     /// @brief Fetches the controller's IOService.
     ///
-    /// @return returns a reference to the controller's IOService.
+    /// @return a reference to the controller's IOService.
     IOServicePtr& getIoService() {
         return (io_service_);
     }
@@ -171,7 +190,7 @@ public:
 
     /// @brief Fetches the process's configuration manager.
     ///
-    /// @return returns a reference to the configuration manager.
+    /// @return a reference to the configuration manager.
     DCfgMgrBasePtr& getCfgMgr() {
         return (cfg_mgr_);
     }

+ 8 - 1
src/bin/d2/dhcp-ddns.spec

@@ -195,8 +195,15 @@
     "commands": [
         {
             "command_name": "shutdown",
-            "command_description": "Shuts down DHCPv6 server.",
+            "command_description": "Shuts down b10-dhcp-ddns module server.",
             "command_args": [
+            {
+                "item_name": "type",
+                "item_type": "string",
+                "item_optional": true,
+                "item_default": "normal",
+                "item_description": "values: normal (default), now, or drain_first"
+            }
             ]
         }
     ]

+ 6 - 1
src/bin/d2/tests/d2_cfg_mgr_unittests.cc

@@ -416,7 +416,7 @@ TEST_F(TSIGKeyInfoTest, validTSIGKeyList) {
 
     // Verify the correct number of keys are present
     int count =  keys_->size();
-    ASSERT_EQ(count, 3);
+    ASSERT_EQ(3, count);
 
     // Find the 1st key and retrieve it.
     TSIGKeyInfoMap::iterator gotit = keys_->find("key1");
@@ -1007,6 +1007,11 @@ TEST_F(D2CfgMgrTest, fullConfig) {
         EXPECT_TRUE(servers);
         EXPECT_EQ(3, count);
     }
+
+    // Verify that parsing the exact same configuration a second time
+    // does not cause a duplicate value errors. 
+    answer_ = cfg_mgr_->parseConfig(config_set_);
+    ASSERT_TRUE(checkAnswer(0));
 }
 
 /// @brief Tests the basics of the D2CfgMgr FQDN-domain matching

+ 481 - 43
src/bin/d2/tests/d2_process_unittests.cc

@@ -15,8 +15,10 @@
 
 #include <config/ccsession.h>
 #include <d2/d2_process.h>
+#include <dhcp_ddns/ncr_io.h>
 #include <d_test_stubs.h>
 
+#include <boost/bind.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <gtest/gtest.h>
 
@@ -31,47 +33,114 @@ using namespace boost::posix_time;
 
 namespace {
 
+/// @brief Valid configuration containing an unavailable IP address.
+const char* bad_ip_d2_config = "{ "
+                        "\"interface\" : \"eth1\" , "
+                        "\"ip_address\" : \"1.1.1.1\" , "
+                        "\"port\" : 5031, "
+                        "\"tsig_keys\": ["
+                        "{ \"name\": \"d2_key.tmark.org\" , "
+                        "   \"algorithm\": \"md5\" ,"
+                        "   \"secret\": \"0123456989\" "
+                        "} ],"
+                        "\"forward_ddns\" : {"
+                        "\"ddns_domains\": [ "
+                        "{ \"name\": \"tmark.org\" , "
+                        "  \"key_name\": \"d2_key.tmark.org\" , "
+                        "  \"dns_servers\" : [ "
+                        "  { \"hostname\": \"one.tmark\" } "
+                        "] } ] }, "
+                        "\"reverse_ddns\" : {"
+                        "\"ddns_domains\": [ "
+                        "{ \"name\": \" 0.168.192.in.addr.arpa.\" , "
+                        "  \"key_name\": \"d2_key.tmark.org\" , "
+                        "  \"dns_servers\" : [ "
+                        "  { \"ip_address\": \"127.0.0.101\" , "
+                        "    \"port\": 100 } ] } "
+                        "] } }";
+
 /// @brief D2Process test fixture class
-class D2ProcessTest : public ::testing::Test {
+//class D2ProcessTest : public D2Process, public ::testing::Test {
+class D2ProcessTest : public D2Process, public ConfigParseTest {
 public:
 
-    /// @brief Static instance accessible via test callbacks.
-    static DProcessBasePtr process_;
-
     /// @brief Constructor
-    D2ProcessTest() {
-        io_service_.reset(new isc::asiolink::IOService());
-        process_.reset(new D2Process("TestProcess", io_service_));
+    D2ProcessTest() : D2Process("d2test",
+                                IOServicePtr(new isc::asiolink::IOService())) {
     }
 
     /// @brief Destructor
-    ~D2ProcessTest() {
-        io_service_.reset();
-        process_.reset();
+    virtual ~D2ProcessTest() {
     }
 
     /// @brief Callback that will invoke shutdown method.
-    static void genShutdownCallback() {
-        process_->shutdown();
+    void genShutdownCallback() {
+        shutdown(isc::data::ConstElementPtr());
     }
 
     /// @brief Callback that throws an exception.
-    static void genFatalErrorCallback() {
+    void genFatalErrorCallback() {
         isc_throw (DProcessBaseError, "simulated fatal error");
     }
 
-    /// @brief IOService for event processing. Fills in for IOService
-    /// supplied by management layer.
-    IOServicePtr io_service_;
-};
+    /// @brief Reconfigures and starts the queue manager given a configuration.
+    ///
+    /// This method emulates the reception of a new configuration and should
+    /// conclude with the Queue manager placed in the RUNNING state.
+    ///
+    /// @param config is the configuration to use
+    ///
+    /// @return Returns AssertionSuccess if the queue manager was successfully
+    /// reconfigured, AssertionFailure otherwise.
+    ::testing::AssertionResult runWithConfig(const char* config) {
+        int rcode = -1;
+        // Convert the string configuration into an Element set.
+        ::testing::AssertionResult res = fromJSON(config);
+        if (res != ::testing::AssertionSuccess()) {
+            return res;
+        }
+
+        isc::data::ConstElementPtr answer = configure(config_set_);
+        isc::data::ConstElementPtr comment;
+        comment = isc::config::parseAnswer(rcode, answer);
+
+        if (rcode) {
+            return (::testing::AssertionFailure(::testing::Message() <<
+                                                "configure() failed:"
+                                                << comment));
+        }
+
+        // Must call checkQueueStatus, to cause queue manager to reconfigure
+        // and start.
+        checkQueueStatus();
+        const D2QueueMgrPtr& queue_mgr = getD2QueueMgr();
 
-// Define the static process instance
-DProcessBasePtr D2ProcessTest::process_;
+        // If queue manager isn't in the RUNNING state, return failure.
+        if (D2QueueMgr::RUNNING !=  queue_mgr->getMgrState()) {
+            return (::testing::AssertionFailure(::testing::Message() <<
+                                               "queue manager did not start"));
+        }
 
+        //  Good to go.
+        return (::testing::AssertionSuccess());
+    }
+
+    /// @brief Checks if shutdown criteria would be met given a shutdown type.
+    ///
+    /// This method sets the D2Process shutdown type to the given value, and
+    /// calls the canShutdown() method, returning its return value.
+    ///
+    /// @return Returns the boolean result canShutdown.
+    bool checkCanShutdown(ShutdownType shutdown_type) {
+        setShutdownType(shutdown_type);
+        return (canShutdown());
+    }
+};
 
-/// @brief Verifies D2Process constructor behavior.
+/// @brief Verifies D2Process construction behavior.
 /// 1. Verifies that constructor fails with an invalid IOService
 /// 2. Verifies that constructor succeeds with a valid IOService
+/// 3. Verifies that all managers are accessible
 TEST(D2Process, construction) {
     // Verify that the constructor will fail if given an empty
     // io service.
@@ -81,26 +150,253 @@ TEST(D2Process, construction) {
     // Verify that the constructor succeeds with a valid io_service
     lcl_io_service.reset(new isc::asiolink::IOService());
     ASSERT_NO_THROW (D2Process("TestProcess", lcl_io_service));
+
+    // Verify that the configuration, queue, and update managers
+    // are all accessible after construction.
+    D2Process d2process("TestProcess", lcl_io_service);
+
+    D2CfgMgrPtr cfg_mgr = d2process.getD2CfgMgr();
+    ASSERT_TRUE(cfg_mgr);
+
+    D2QueueMgrPtr queue_mgr = d2process.getD2QueueMgr();
+    ASSERT_TRUE(queue_mgr);
+
+    const D2UpdateMgrPtr& update_mgr = d2process.getD2UpdateMgr();
+    ASSERT_TRUE(update_mgr);
 }
 
 /// @brief Verifies basic configure method behavior.
-///  This test is simplistic and will need to be augmented as configuration
-/// ability is implemented.
+/// This test primarily verifies that upon receipt of a new configuration,
+/// D2Process will reconfigure the queue manager if the configuration is valid,
+/// or leave queue manager unaffected if not.  Currently, the queue manager is
+/// only D2 component that must adapt to new configurations. Other components,
+/// such as Transactions will be unaffected as they are transient and use
+/// whatever configuration was in play at the time they were created.
+/// If other components need to provide "dynamic" configuration responses,
+/// those tests would need to be added.
 TEST_F(D2ProcessTest, configure) {
-    int rcode = -1;
+    // Verify the queue manager is not yet initialized.
+    D2QueueMgrPtr queue_mgr = getD2QueueMgr();
+    ASSERT_TRUE(queue_mgr);
+    ASSERT_EQ(D2QueueMgr::NOT_INITTED, queue_mgr->getMgrState());
+
+    // Verify that reconfigure queue manager flag is false.
+    ASSERT_FALSE(getReconfQueueFlag());
+
+    // Create a valid configuration set from text config.
+    ASSERT_TRUE(fromJSON(valid_d2_config));
+
+    // Invoke configure() with a valid D2 configuration.
+    isc::data::ConstElementPtr answer = configure(config_set_);
+
+    // Verify that configure result is success and reconfigure queue manager
+    // flag is true.
+    ASSERT_TRUE(checkAnswer(answer, 0));
+    ASSERT_TRUE(getReconfQueueFlag());
+
+    // Call checkQueueStatus, to cause queue manager to reconfigure and start.
+    checkQueueStatus();
+
+    // Verify that queue manager is now in the RUNNING state, and flag is false.
+    ASSERT_EQ(D2QueueMgr::RUNNING, queue_mgr->getMgrState());
+    ASSERT_FALSE(getReconfQueueFlag());
+
+    //  Create an invalid configuration set from text config.
+    ASSERT_TRUE(fromJSON("{ \"bogus\": 1000 } "));
+
+    // Invoke configure() with the invalid configuration.
+    answer = configure(config_set_);
 
-    // Use a small, valid D2 configuration to verify successful parsing.
-    isc::data::ElementPtr json = isc::data::Element::fromJSON(valid_d2_config);
-    isc::data::ConstElementPtr answer = process_->configure(json);
-    isc::config::parseAnswer(rcode, answer);
-    EXPECT_EQ(0, rcode);
-
-    // Use an invalid configuration to verify parsing error return.
-    string config = "{ \"bogus\": 1000 } ";
-    json = isc::data::Element::fromJSON(config);
-    answer = process_->configure(json);
-    isc::config::parseAnswer(rcode, answer);
-    EXPECT_EQ(1, rcode);
+    // Verify that configure result is failure, the reconfigure flag is
+    // false, and that the queue manager is still running.
+    ASSERT_TRUE(checkAnswer(answer, 1));
+    EXPECT_FALSE(getReconfQueueFlag());
+    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr->getMgrState());
+}
+
+/// @brief Tests checkQueueStatus() logic for stopping the queue on shutdown
+/// This test manually sets shutdown flag and verifies that queue manager
+/// stop is initiated.
+TEST_F(D2ProcessTest, queueStopOnShutdown) {
+    ASSERT_TRUE(runWithConfig(valid_d2_config));
+    const D2QueueMgrPtr& queue_mgr = getD2QueueMgr();
+
+    setShutdownFlag(true);
+
+    // Calling checkQueueStatus restart queue manager
+    checkQueueStatus();
+
+    // Verify that the queue manager is stopping.
+    EXPECT_EQ(D2QueueMgr::STOPPING, queue_mgr->getMgrState());
+
+    // Verify that a subsequent call with no events occurring in between,
+    // results in no change to queue manager
+    checkQueueStatus();
+
+    // Verify that the queue manager is still stopping.
+    EXPECT_EQ(D2QueueMgr::STOPPING, queue_mgr->getMgrState());
+
+    // Call runIO so the IO cancel event occurs and verify that queue manager
+    // has stopped.
+    runIO();
+    ASSERT_EQ(D2QueueMgr::STOPPED, queue_mgr->getMgrState());
+}
+
+/// @brief Tests checkQueueStatus() logic for stopping the queue on reconfigure.
+/// This test manually sets queue reconfiguration flag and verifies that queue
+/// manager stop is initiated.
+TEST_F(D2ProcessTest, queueStopOnReconf) {
+    ASSERT_TRUE(runWithConfig(valid_d2_config));
+    const D2QueueMgrPtr& queue_mgr = getD2QueueMgr();
+
+    // Manually set the reconfigure indicator.
+    setReconfQueueFlag(true);
+
+    // Calling checkQueueStatus should initiate stopping the queue manager.
+    checkQueueStatus();
+
+    // Verify that the queue manager is stopping.
+    EXPECT_EQ(D2QueueMgr::STOPPING, queue_mgr->getMgrState());
+
+    // Call runIO so the IO cancel event occurs and verify that queue manager
+    // has stopped.
+    runIO();
+    ASSERT_EQ(D2QueueMgr::STOPPED, queue_mgr->getMgrState());
+}
+
+
+/// @brief Tests checkQueueStatus() logic for recovering from queue full
+/// This test manually creates a receive queue full condition and then
+/// "drains" the queue until the queue manager resumes listening.  This
+/// verifies D2Process's ability to recover from a queue full condition.
+TEST_F(D2ProcessTest, queueFullRecovery) {
+    // Valid test message, contents are unimportant.
+    const char* test_msg =
+        "{"
+        " \"change_type\" : 0 , "
+        " \"forward_change\" : true , "
+        " \"reverse_change\" : false , "
+        " \"fqdn\" : \"walah.walah.com\" , "
+        " \"ip_address\" : \"192.168.2.1\" , "
+        " \"dhcid\" : \"010203040A7F8E3D\" , "
+        " \"lease_expires_on\" : \"20130121132405\" , "
+        " \"lease_length\" : 1300 "
+        "}";
+
+    // Start queue manager with known good config.
+    ASSERT_TRUE(runWithConfig(valid_d2_config));
+    const D2QueueMgrPtr& queue_mgr = getD2QueueMgr();
+
+    // Set the maximum queue size to manageable number.
+    size_t max_queue_size = 5;
+    queue_mgr->setMaxQueueSize(max_queue_size);
+
+    // Manually enqueue max requests.
+    dhcp_ddns::NameChangeRequestPtr ncr;
+    ASSERT_NO_THROW(ncr = dhcp_ddns::NameChangeRequest::fromJSON(test_msg));
+    for (int i = 0; i < max_queue_size; i++) {
+        // Verify that the request can be added to the queue and queue
+        // size increments accordingly.
+        ASSERT_NO_THROW(queue_mgr->enqueue(ncr));
+        ASSERT_EQ(i+1, queue_mgr->getQueueSize());
+    }
+
+    // Since we are not really receiving, we will simulate QUEUE FULL
+    // detection.
+    queue_mgr->stopListening(D2QueueMgr::STOPPED_QUEUE_FULL);
+    ASSERT_EQ(D2QueueMgr::STOPPING, queue_mgr->getMgrState());
+
+    // Call runIO so the IO cancel event occurs and verify that queue manager
+    // has stopped.
+    runIO();
+    ASSERT_EQ(D2QueueMgr::STOPPED_QUEUE_FULL, queue_mgr->getMgrState());
+
+    // Dequeue requests one at a time, calling checkQueueStatus after each
+    // dequeue, until we reach the resume threshold.  This simulates update
+    // manager consuming jobs.  Queue manager should remain stopped during
+    // this loop.
+    int resume_threshold = (max_queue_size * QUEUE_RESTART_PERCENT);
+    while (queue_mgr->getQueueSize() > resume_threshold) {
+        checkQueueStatus();
+        ASSERT_EQ(D2QueueMgr::STOPPED_QUEUE_FULL, queue_mgr->getMgrState());
+        ASSERT_NO_THROW(queue_mgr->dequeue());
+    }
+
+    // Dequeue one more, which brings us under the threshold and call
+    // checkQueueStatus.
+    // Verify that the queue manager is again running.
+    ASSERT_NO_THROW(queue_mgr->dequeue());
+    checkQueueStatus();
+    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr->getMgrState());
+}
+
+/// @brief Tests checkQueueStatus() logic for queue receive error recovery
+/// This test manually creates a queue receive error condition and tests
+/// verifies that checkQueueStatus reacts properly to recover.
+TEST_F(D2ProcessTest, queueErrorRecovery) {
+    ASSERT_TRUE(runWithConfig(valid_d2_config));
+    const D2QueueMgrPtr& queue_mgr = getD2QueueMgr();
+
+    // Since we are not really receiving, we have to stage an error.
+    queue_mgr->stopListening(D2QueueMgr::STOPPED_RECV_ERROR);
+    ASSERT_EQ(D2QueueMgr::STOPPING, queue_mgr->getMgrState());
+
+    // Call runIO so the IO cancel event occurs and verify that queue manager
+    // has stopped.
+    runIO();
+    ASSERT_EQ(D2QueueMgr::STOPPED_RECV_ERROR, queue_mgr->getMgrState());
+
+    // Calling checkQueueStatus should restart queue manager
+    checkQueueStatus();
+
+    // Verify that queue manager is again running.
+    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr->getMgrState());
+}
+
+/// @brief Verifies queue manager recovery from unusable configuration
+/// This test checks D2Process's gracefully handle a configuration which
+/// while valid is not operationally usable (i.e. IP address is unavailable),
+/// and to subsequently recover given a usable configuration.
+TEST_F(D2ProcessTest, badConfigureRecovery) {
+    D2QueueMgrPtr queue_mgr = getD2QueueMgr();
+    ASSERT_TRUE(queue_mgr);
+
+    // Verify the queue manager is not initialized.
+    EXPECT_EQ(D2QueueMgr::NOT_INITTED, queue_mgr->getMgrState());
+
+    // Invoke configure() with a valid config that contains an unusable IP
+    ASSERT_TRUE(fromJSON(bad_ip_d2_config));
+    isc::data::ConstElementPtr answer = configure(config_set_);
+
+    // Verify that configure result is success and reconfigure queue manager
+    // flag is true.
+    ASSERT_TRUE(checkAnswer(answer, 0));
+    ASSERT_TRUE(getReconfQueueFlag());
+
+    // Call checkQueueStatus to cause queue manager to attempt to reconfigure.
+    checkQueueStatus();
+
+    // Verify that queue manager  failed to start, (i.e. is in INITTED state),
+    // and the the reconfigure flag is false.
+    ASSERT_EQ(D2QueueMgr::INITTED, queue_mgr->getMgrState());
+    ASSERT_FALSE(getReconfQueueFlag());
+
+    // Verify we can recover given a valid config with an usable IP address.
+    ASSERT_TRUE(fromJSON(valid_d2_config));
+    answer = configure(config_set_);
+
+    // Verify that configure result is success and reconfigure queue manager
+    // flag is true.
+    ASSERT_TRUE(checkAnswer(answer, 0));
+    ASSERT_TRUE(getReconfQueueFlag());
+
+    // Call checkQueueStatus to cause queue manager to reconfigure and start.
+    checkQueueStatus();
+
+    // Verify that queue manager is now in the RUNNING state, and reconfigure
+    // flag is false.
+    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr->getMgrState());
+    EXPECT_FALSE(getReconfQueueFlag());
 }
 
 /// @brief Verifies basic command method behavior.
@@ -112,23 +408,163 @@ TEST_F(D2ProcessTest, command) {
     int rcode = -1;
     string args = "{ \"arg1\": 77 } ";
     isc::data::ElementPtr json = isc::data::Element::fromJSON(args);
-    isc::data::ConstElementPtr answer =
-                                    process_->command("bogus_command", json);
+    isc::data::ConstElementPtr answer = command("bogus_command", json);
     parseAnswer(rcode, answer);
     EXPECT_EQ(COMMAND_INVALID, rcode);
 }
 
+/// @brief Tests shutdown command argument parsing
+/// The shutdown command supports an optional "type" argument. This test
+/// checks that for valid values, the shutdown() method: sets the shutdown
+/// type to correct value, set the shutdown flag to true, and returns a
+/// success response; and for invalid values: sets the shutdown flag to false
+/// and returns a failure response.
+TEST_F(D2ProcessTest, shutdownArgs) {
+    isc::data::ElementPtr args;
+    isc::data::ConstElementPtr answer;
+    const char* default_args = "{}";
+    const char* normal_args =  "{ \"type\" : \"normal\" }";
+    const char* drain_args = "{ \"type\" : \"drain_first\" }";
+    const char* now_args = "{ \"type\" : \"now\" }";
+    const char* bogus_args = "{ \"type\" : \"bogus\" }";
+
+    // Verify defaulting to SD_NORMAL if no argument is given.
+    ASSERT_NO_THROW(args = isc::data::Element::fromJSON(default_args));
+    EXPECT_NO_THROW(answer = shutdown(args));
+    ASSERT_TRUE(checkAnswer(answer, 0));
+    EXPECT_EQ(SD_NORMAL, getShutdownType());
+    EXPECT_TRUE(shouldShutdown());
+
+    // Verify argument value "normal".
+    ASSERT_NO_THROW(args = isc::data::Element::fromJSON(normal_args));
+    EXPECT_NO_THROW(answer = shutdown(args));
+    ASSERT_TRUE(checkAnswer(answer, 0));
+    EXPECT_EQ(SD_NORMAL, getShutdownType());
+    EXPECT_TRUE(shouldShutdown());
+
+    // Verify argument value "drain_first".
+    ASSERT_NO_THROW(args = isc::data::Element::fromJSON(drain_args));
+    EXPECT_NO_THROW(answer = shutdown(args));
+    ASSERT_TRUE(checkAnswer(answer, 0));
+    EXPECT_EQ(SD_DRAIN_FIRST, getShutdownType());
+    EXPECT_TRUE(shouldShutdown());
+
+    // Verify argument value "now".
+    ASSERT_NO_THROW(args = isc::data::Element::fromJSON(now_args));
+    EXPECT_NO_THROW(answer = shutdown(args));
+    ASSERT_TRUE(checkAnswer(answer, 0));
+    EXPECT_EQ(SD_NOW, getShutdownType());
+    EXPECT_TRUE(shouldShutdown());
+
+    // Verify correct handling of an invalid value.
+    ASSERT_NO_THROW(args = isc::data::Element::fromJSON(bogus_args));
+    EXPECT_NO_THROW(answer = shutdown(args));
+    ASSERT_TRUE(checkAnswer(answer, 1));
+    EXPECT_FALSE(shouldShutdown());
+}
+
+/// @brief Tests shutdown criteria logic
+/// D2Process using the method canShutdown() to determine if a shutdown
+/// can be performed given the value of the shutdown flag and the type of
+/// shutdown requested.  For each shutdown type certain criteria must be met
+/// before the shutdown is permitted.  This method is invoked once each pass
+/// through the main event loop.  This test checks the operation of the
+/// canShutdown method.  It uses a convenience method, checkCanShutdown(),
+/// which sets the shutdown type to the given value and invokes canShutdown(),
+/// returning its result.
+TEST_F(D2ProcessTest, canShutdown) {
+    ASSERT_TRUE(runWithConfig(valid_d2_config));
+    const D2QueueMgrPtr& queue_mgr = getD2QueueMgr();
+
+    // Shutdown flag is false.  Method should return false for all types.
+    EXPECT_FALSE(checkCanShutdown(SD_NORMAL));
+    EXPECT_FALSE(checkCanShutdown(SD_DRAIN_FIRST));
+    EXPECT_FALSE(checkCanShutdown(SD_NOW));
+
+    // Set shutdown flag to true.
+    setShutdownFlag(true);
+
+    // Queue Manager is running, queue is empty, no transactions.
+    // Only SD_NOW should return true.
+    EXPECT_FALSE(checkCanShutdown(SD_NORMAL));
+    EXPECT_FALSE(checkCanShutdown(SD_DRAIN_FIRST));
+    EXPECT_TRUE(checkCanShutdown(SD_NOW));
+
+    // Tell queue manager to stop.
+    queue_mgr->stopListening();
+    // Verify that the queue manager is stopping.
+    EXPECT_EQ(D2QueueMgr::STOPPING, queue_mgr->getMgrState());
+
+    // Queue Manager is stopping, queue is empty, no transactions.
+    // Only SD_NOW should return true.
+    EXPECT_FALSE(checkCanShutdown(SD_NORMAL));
+    EXPECT_FALSE(checkCanShutdown(SD_DRAIN_FIRST));
+    EXPECT_TRUE(checkCanShutdown(SD_NOW));
+
+    // Allow cancel event to process.
+    ASSERT_NO_THROW(runIO());
+    // Verify that queue manager is stopped.
+    EXPECT_EQ(D2QueueMgr::STOPPED, queue_mgr->getMgrState());
+
+    // Queue Manager is stopped, queue is empty, no transactions.
+    // All types should return true.
+    EXPECT_TRUE(checkCanShutdown(SD_NORMAL));
+    EXPECT_TRUE(checkCanShutdown(SD_DRAIN_FIRST));
+    EXPECT_TRUE(checkCanShutdown(SD_NOW));
+
+    const char* test_msg =
+        "{"
+        " \"change_type\" : 0 , "
+        " \"forward_change\" : true , "
+        " \"reverse_change\" : false , "
+        " \"fqdn\" : \"fish.tmark.org\" , "
+        " \"ip_address\" : \"192.168.2.1\" , "
+        " \"dhcid\" : \"010203040A7F8E3D\" , "
+        " \"lease_expires_on\" : \"20130121132405\" , "
+        " \"lease_length\" : 1300 "
+        "}";
+
+    // Manually enqueue a request.  This lets us test logic with queue
+    // not empty.
+    dhcp_ddns::NameChangeRequestPtr ncr;
+    ASSERT_NO_THROW(ncr = dhcp_ddns::NameChangeRequest::fromJSON(test_msg));
+    ASSERT_NO_THROW(queue_mgr->enqueue(ncr));
+    ASSERT_EQ(1, queue_mgr->getQueueSize());
+
+    // Queue Manager is stopped. Queue is not empty, no transactions.
+    // SD_DRAIN_FIRST should be false, SD_NORMAL and SD_NOW should be true.
+    EXPECT_TRUE(checkCanShutdown(SD_NORMAL));
+    EXPECT_FALSE(checkCanShutdown(SD_DRAIN_FIRST));
+    EXPECT_TRUE(checkCanShutdown(SD_NOW));
+
+    // Now use update manager to dequeue the request and make a transaction.
+    // This lets us verify transaction list not empty logic.
+    const D2UpdateMgrPtr& update_mgr = getD2UpdateMgr();
+    ASSERT_TRUE(update_mgr);
+    ASSERT_NO_THROW(update_mgr->sweep());
+    ASSERT_EQ(0, queue_mgr->getQueueSize());
+    ASSERT_EQ(1, update_mgr->getTransactionCount());
+
+    // Queue Manager is stopped. Queue is empty, one transaction.
+    // Only SD_NOW should be true.
+    EXPECT_FALSE(checkCanShutdown(SD_NORMAL));
+    EXPECT_FALSE(checkCanShutdown(SD_DRAIN_FIRST));
+    EXPECT_TRUE(checkCanShutdown(SD_NOW));
+}
+
+
 /// @brief Verifies that an "external" call to shutdown causes the run method
 /// to exit gracefully.
 TEST_F(D2ProcessTest, normalShutdown) {
     // Use an asiolink IntervalTimer and callback to generate the
     // shutdown invocation. (Note IntervalTimer setup is in milliseconds).
-    isc::asiolink::IntervalTimer timer(*io_service_);
-    timer.setup(genShutdownCallback, 2 * 1000);
+    isc::asiolink::IntervalTimer timer(*getIoService());
+    timer.setup(boost::bind(&D2ProcessTest::genShutdownCallback, this),
+                2 * 1000);
 
     // Record start time, and invoke run().
     ptime start = microsec_clock::universal_time();
-    EXPECT_NO_THROW(process_->run());
+    EXPECT_NO_THROW(run());
 
     // Record stop time.
     ptime stop = microsec_clock::universal_time();
@@ -141,17 +577,19 @@ TEST_F(D2ProcessTest, normalShutdown) {
                 elapsed.total_milliseconds() <= 2100);
 }
 
+
 /// @brief Verifies that an "uncaught" exception thrown during event loop
 /// execution is treated as a fatal error.
 TEST_F(D2ProcessTest, fatalErrorShutdown) {
     // Use an asiolink IntervalTimer and callback to generate the
     // the exception.  (Note IntervalTimer setup is in milliseconds).
-    isc::asiolink::IntervalTimer timer(*io_service_);
-    timer.setup(genFatalErrorCallback, 2 * 1000);
+    isc::asiolink::IntervalTimer timer(*getIoService());
+    timer.setup(boost::bind(&D2ProcessTest::genFatalErrorCallback, this),
+                2 * 1000);
 
     // Record start time, and invoke run().
     ptime start = microsec_clock::universal_time();
-    EXPECT_THROW(process_->run(), DProcessBaseError);
+    EXPECT_THROW(run(), DProcessBaseError);
 
     // Record stop time.
     ptime stop = microsec_clock::universal_time();

+ 11 - 1
src/bin/d2/tests/d2_queue_mgr_unittests.cc

@@ -289,9 +289,14 @@ TEST_F (QueueMgrUDPTest, stateModel) {
     EXPECT_NO_THROW(queue_mgr_->startListening());
     EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());
 
-    // Verify that we can move from RUNNING to STOPPED by stopping the
+    // Verify that we can move from RUNNING to STOPPING by stopping the
     // listener.
     EXPECT_NO_THROW(queue_mgr_->stopListening());
+    EXPECT_EQ(D2QueueMgr::STOPPING, queue_mgr_->getMgrState());
+
+    // Stopping requires IO cancel, which result in a callback.
+    // So process one event and verify we are STOPPED.
+    io_service_.run_one();
     EXPECT_EQ(D2QueueMgr::STOPPED, queue_mgr_->getMgrState());
 
     // Verify that we can re-enter the RUNNING from STOPPED by starting the
@@ -304,6 +309,11 @@ TEST_F (QueueMgrUDPTest, stateModel) {
 
     // Stop the listener.
     EXPECT_NO_THROW(queue_mgr_->stopListening());
+    EXPECT_EQ(D2QueueMgr::STOPPING, queue_mgr_->getMgrState());
+
+    // Stopping requires IO cancel, which result in a callback.
+    // So process one event and verify we are STOPPED.
+    io_service_.run_one();
     EXPECT_EQ(D2QueueMgr::STOPPED, queue_mgr_->getMgrState());
 
     // Verify that we can remove the listener in the STOPPED state and

+ 7 - 5
src/bin/d2/tests/d_test_stubs.cc

@@ -23,8 +23,8 @@ namespace d2 {
 
 const char* valid_d2_config = "{ "
                         "\"interface\" : \"eth1\" , "
-                        "\"ip_address\" : \"192.168.1.33\" , "
-                        "\"port\" : 88 , "
+                        "\"ip_address\" : \"127.0.0.1\" , "
+                        "\"port\" : 5031, "
                         "\"tsig_keys\": ["
                         "{ \"name\": \"d2_key.tmark.org\" , "
                         "   \"algorithm\": \"md5\" ,"
@@ -83,14 +83,16 @@ DStubProcess::run() {
     }
 };
 
-void
-DStubProcess::shutdown() {
+isc::data::ConstElementPtr
+DStubProcess::shutdown(isc::data::ConstElementPtr /* args */) {
     if (SimFailure::shouldFailOn(SimFailure::ftProcessShutdown)) {
         // Simulates a failure during shutdown process.
         isc_throw(DProcessBaseError, "DStubProcess simulated shutdown failure");
     }
 
-    DProcessBase::shutdown();
+    setShutdownFlag(true);
+    stopIOService();
+    return (isc::config::createAnswer(0, "Shutdown inititiated."));
 }
 
 isc::data::ConstElementPtr

+ 20 - 7
src/bin/d2/tests/d_test_stubs.h

@@ -135,10 +135,11 @@ public:
     /// indicate an abnormal termination.
     virtual void run();
 
-    /// @brief Implements the process shutdown procedure. Currently this is
-    /// limited to setting the instance shutdown flag, which is monitored in
-    /// run().
-    virtual void shutdown();
+    /// @brief Implements the process shutdown procedure. 
+    ///
+    /// This sets the instance shutdown flag monitored by run()  and stops 
+    /// the IO service.
+    virtual isc::data::ConstElementPtr shutdown(isc::data::ConstElementPtr);
 
     /// @brief Processes the given configuration.
     ///
@@ -604,7 +605,7 @@ public:
     /// convert.
     /// @return returns AssertionSuccess if there were no parsing errors,
     /// AssertionFailure otherwise.
-    ::testing::AssertionResult fromJSON(std::string& json_text) {
+    ::testing::AssertionResult fromJSON(const std::string& json_text) {
         try  {
             config_set_ = isc::data::Element::fromJSON(json_text);
         } catch (const isc::Exception &ex) {
@@ -616,7 +617,6 @@ public:
         return (::testing::AssertionSuccess());
     }
 
-
     /// @brief Compares the status in the  parse result stored in member
     /// variable answer_ to a given value.
     ///
@@ -625,9 +625,22 @@ public:
     /// @return returns AssertionSuccess if there were no parsing errors,
     /// AssertionFailure otherwise.
     ::testing::AssertionResult checkAnswer(int should_be) {
+        return (checkAnswer(answer_, should_be));
+    }
+
+    /// @brief Compares the status in the given parse result to a given value.
+    ///
+    /// @param answer Element set containing an integer response and string
+    /// comment. 
+    /// @param should_be is an integer against which to compare the status.
+    ///
+    /// @return returns AssertionSuccess if there were no parsing errors,
+    /// AssertionFailure otherwise.
+    ::testing::AssertionResult checkAnswer(isc::data::ConstElementPtr answer,
+                                           int should_be) {
         int rcode = 0;
         isc::data::ConstElementPtr comment;
-        comment = isc::config::parseAnswer(rcode, answer_);
+        comment = isc::config::parseAnswer(rcode, answer);
         if (rcode == should_be) {
             return (testing::AssertionSuccess());
         }

+ 13 - 0
src/lib/dhcp_ddns/dhcp_ddns_messages.mes

@@ -45,11 +45,24 @@ start another send after completing the send of the previous request.  While
 possible, this is highly unlikely and is probably a programmatic error.  The
 application should recover on its own.
 
+% DHCP_DDNS_NCR_UDP_RECV_CANCELED UDP socket receive was canceled while listening for DNS Update requests: %1
+This is an informational  message indicating that the listening over a UDP socket for DNS update requests has been canceled.  This is a normal part of suspending listening operations.
+
 % DHCP_DDNS_NCR_UDP_RECV_ERROR UDP socket receive error while listening for DNS Update requests: %1
 This is an error message indicating that an IO error occurred while listening
 over a UDP socket for DNS update requests. This could indicate a network
 connectivity or system resource issue.
 
+% DHCP_DDNS_NCR_UDP_SEND_CANCELED UDP socket send was canceled while sending a DNS Update request to DHCP_DDNS: %1
+This is an informational message indicating that sending requests via UDP
+socket to DHCP_DDNS has been interrupted. This is a normal part of suspending
+send operations.
+
+% DHCP_DDNS_NCR_UDP_SEND_ERROR UDP socket send error while sending a DNS Update request: %1
+This is an error message indicating that an IO error occurred while sending a
+DNS update request to DHCP_DDNS over a UDP socket.  This could indicate a
+network connectivity or system resource issue.
+
 % DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR unexpected exception thrown from the application receive completion handler: %1
 This is an error message that indicates that an exception was thrown but not
 caught in the application's request receive completion handler.  This is a

+ 14 - 5
src/lib/dhcp_ddns/ncr_io.cc

@@ -22,9 +22,10 @@ namespace dhcp_ddns {
 
 NameChangeListener::NameChangeListener(RequestReceiveHandler&
                                        recv_handler)
-    : listening_(false), recv_handler_(recv_handler) {
+    : listening_(false), io_pending_(false), recv_handler_(recv_handler) {
 };
 
+
 void
 NameChangeListener::startListening(isc::asiolink::IOService& io_service) {
     if (amListening()) {
@@ -45,7 +46,7 @@ NameChangeListener::startListening(isc::asiolink::IOService& io_service) {
 
     // Start the first asynchronous receive.
     try {
-        doReceive();
+        receiveNext();
     } catch (const isc::Exception& ex) {
         stopListening();
         isc_throw(NcrListenerReceiveError, "doReceive failed:" << ex.what());
@@ -53,6 +54,12 @@ NameChangeListener::startListening(isc::asiolink::IOService& io_service) {
 }
 
 void
+NameChangeListener::receiveNext() {
+    io_pending_ = true;
+    doReceive();
+}
+
+void
 NameChangeListener::stopListening() {
     try {
         // Call implementation dependent close.
@@ -77,6 +84,7 @@ NameChangeListener::invokeRecvHandler(const Result result,
     // not supposed to throw, but in the event it does we will at least
     // report it.
     try {
+        io_pending_ = false;
         recv_handler_(result, ncr);
     } catch (const std::exception& ex) {
         LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR)
@@ -88,7 +96,7 @@ NameChangeListener::invokeRecvHandler(const Result result,
     // we need to check that first.
     if (amListening()) {
         try {
-            doReceive();
+            receiveNext();
         } catch (const isc::Exception& ex) {
             // It is possible though unlikely, for doReceive to fail without
             // scheduling the read. While, unlikely, it does mean the callback
@@ -105,6 +113,7 @@ NameChangeListener::invokeRecvHandler(const Result result,
             // report it.
             NameChangeRequestPtr empty;
             try {
+                io_pending_ = false;
                 recv_handler_(ERROR, empty);
             } catch (const std::exception& ex) {
                 LOG_ERROR(dhcp_ddns_logger,
@@ -159,7 +168,7 @@ NameChangeSender::stopSending() {
     } catch (const isc::Exception &ex) {
         // Swallow exceptions. If we have some sort of error we'll log
         // it but we won't propagate the throw.
-        LOG_ERROR(dhcp_ddns_logger, 
+        LOG_ERROR(dhcp_ddns_logger,
                   DHCP_DDNS_NCR_SEND_CLOSE_ERROR).arg(ex.what());
     }
 
@@ -256,7 +265,7 @@ NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) {
         try {
             send_handler_(ERROR, ncr_to_send_);
         } catch (const std::exception& ex) {
-            LOG_ERROR(dhcp_ddns_logger, 
+            LOG_ERROR(dhcp_ddns_logger,
                       DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR).arg(ex.what());
         }
     }

+ 63 - 30
src/lib/dhcp_ddns/ncr_io.h

@@ -129,10 +129,11 @@ public:
 /// listener derivation performs the steps necessary to prepare its IO source
 /// for reception (e.g. opening a socket, connecting to a database).
 ///
-/// Assuming the open is successful, startListener will call the virtual
-/// doReceive() method.  The listener derivation uses this method to
-/// instigate an IO layer asynchronous passing in its IO layer callback to
-/// handle receive events from its IO source.
+/// Assuming the open is successful, startListener will call receiveNext, to
+/// initiate an asynchronous receive.  This method calls the virtual method,
+/// doReceive().  The listener derivation uses doReceive to instigate an IO
+/// layer asynchronous receieve passing in its IO layer callback to
+/// handle receive events from the IO source.
 ///
 /// As stated earlier, the derivation's NameChangeRequest completion handler
 /// MUST invoke the application layer handler registered with the listener.
@@ -189,8 +190,8 @@ public:
     /// @brief Prepares the IO for reception and initiates the first receive.
     ///
     /// Calls the derivation's open implementation to initialize the IO layer
-    /// source for receiving inbound requests.  If successful, it issues first
-    /// asynchronous read by calling the derivation's doReceive implementation.
+    /// source for receiving inbound requests.  If successful, it starts the
+    /// first asynchronous read by receiveNext.
     ///
     /// @param io_service is the IOService that will handle IO event processing.
     ///
@@ -204,6 +205,18 @@ public:
     /// as not listening.
     void stopListening();
 
+protected:
+    /// @brief Initiates an asynchronous receive
+    ///
+    /// Sets context information to indicate that IO is in progress and invokes
+    /// the derivation's asynchronous receive method, doReceive.  Note doReceive
+    /// should not be called outside this method to ensure context information
+    /// integrity.
+    ///
+    /// @throw Derivation's doReceive method may throw isc::Exception upon
+    /// error.
+    void receiveNext();
+
     /// @brief Calls the NCR receive handler registered with the listener.
     ///
     /// This is the hook by which the listener's caller's NCR receive handler
@@ -261,14 +274,30 @@ public:
     /// throw it as an isc::Exception or derivative.
     virtual void doReceive() = 0;
 
+public:
     /// @brief Returns true if the listener is listening, false otherwise.
     ///
     /// A true value indicates that the IO source has been opened successfully,
-    /// and that receive loop logic is active.
+    /// and that receive loop logic is active.  This implies that closing the
+    /// IO source will interrupt that operation, resulting in a callback
+    /// invocation.
     bool amListening() const {
         return (listening_);
     }
 
+    /// @brief Returns true if the listener has an IO call in progress.
+    ///
+    /// A true value indicates that the listener has an asynchronous IO in
+    /// progress which will complete at some point in the future. Completion
+    /// of the call will invoke the registered callback.  It is important to
+    /// understand that the listener and its related objects should not be
+    /// deleted while there is an IO call pending.  This can result in the
+    /// IO service attempting to invoke methods on objects that are no longer
+    /// valid.
+    bool isIoPending() const {
+        return (io_pending_);
+    }
+
 private:
     /// @brief Sets the listening indicator to the given value.
     ///
@@ -280,9 +309,12 @@ private:
         listening_ = value;
     }
 
-    /// @brief Indicates if the listener is listening.
+    /// @brief Indicates if the listener is in listening mode.
     bool listening_;
 
+    /// @brief Indicates that listener has an async IO pending completion.
+    bool io_pending_;
+
     /// @brief Application level NCR receive completion handler.
     RequestReceiveHandler& recv_handler_;
 };
@@ -478,7 +510,6 @@ public:
     ///
     /// The given request is placed at the back of the send queue and then
     /// sendNext is invoked.
-
     ///
     /// @param ncr is the NameChangeRequest to send.
     ///
@@ -487,6 +518,7 @@ public:
     /// capacity.
     void sendRequest(NameChangeRequestPtr& ncr);
 
+protected:
     /// @brief Dequeues and sends the next request on the send queue.
     ///
     /// If there is already a send in progress just return. If there is not
@@ -523,27 +555,6 @@ public:
     /// @param result contains that send outcome status.
     void invokeSendHandler(const NameChangeSender::Result result);
 
-    /// @brief Removes the request at the front of the send queue
-    ///
-    /// This method can be used to avoid further retries of a failed
-    /// send. It is provided primarily as a just-in-case measure. Since
-    /// a failed send results in the same request being retried continuously
-    /// this method makes it possible to remove that entry, causing the
-    /// subsequent entry in the queue to be attempted on the next send.
-    /// It is presumed that sends will only fail due to some sort of
-    /// communications issue. In the unlikely event that a request is
-    /// somehow tainted and causes an send failure based on its content,
-    /// this method provides a means to remove th message.
-    void skipNext();
-
-    /// @brief Flushes all entries in the send queue
-    ///
-    /// This method can be used to discard all of the NCRs currently in the
-    /// the send queue.  Note it may not be called while the sender is in
-    /// the sending state.
-    /// @throw NcrSenderError if called and sender is in sending state.
-    void clearSendQueue();
-
     /// @brief Abstract method which opens the IO sink for transmission.
     ///
     /// The derivation uses this method to perform the steps needed to
@@ -576,6 +587,28 @@ public:
     /// throw it as an isc::Exception or derivative.
     virtual void doSend(NameChangeRequestPtr& ncr) = 0;
 
+public:
+    /// @brief Removes the request at the front of the send queue
+    ///
+    /// This method can be used to avoid further retries of a failed
+    /// send. It is provided primarily as a just-in-case measure. Since
+    /// a failed send results in the same request being retried continuously
+    /// this method makes it possible to remove that entry, causing the
+    /// subsequent entry in the queue to be attempted on the next send.
+    /// It is presumed that sends will only fail due to some sort of
+    /// communications issue. In the unlikely event that a request is
+    /// somehow tainted and causes an send failure based on its content,
+    /// this method provides a means to remove th message.
+    void skipNext();
+
+    /// @brief Flushes all entries in the send queue
+    ///
+    /// This method can be used to discard all of the NCRs currently in the
+    /// the send queue.  Note it may not be called while the sender is in
+    /// the sending state.
+    /// @throw NcrSenderError if called and sender is in sending state.
+    void clearSendQueue();
+
     /// @brief Returns true if the sender is in send mode, false otherwise.
     ///
     /// A true value indicates that the IO sink has been opened successfully,

+ 57 - 28
src/lib/dhcp_ddns/ncr_udp.cc

@@ -111,6 +111,7 @@ NameChangeUDPListener::open(isc::asiolink::IOService& io_service) {
         // Bind the low level socket to our endpoint.
         asio_socket_->bind(endpoint.getASIOEndpoint());
     } catch (asio::system_error& ex) {
+        asio_socket_.reset();
         isc_throw (NcrUDPError, ex.code().message());
     }
 
@@ -131,19 +132,27 @@ NameChangeUDPListener::doReceive() {
 void
 NameChangeUDPListener::close() {
     // Whether we think we are listening or not, make sure we aren't.
-    // Since we are managing our own socket, we need to cancel and close
-    // it ourselves.
+    // Since we are managing our own socket, we need to close it ourselves.
+    // NOTE that if there is a pending receive, it will be canceled, which
+    // WILL generate an invocation of the callback with error code of
+    // "operation aborted".
     if (asio_socket_) {
-        try {
-            asio_socket_->cancel();
-            asio_socket_->close();
-        } catch (asio::system_error& ex) {
-            // It is really unlikely that this will occur.
-            // If we do reopen later it will be with a new socket instance.
-            // Repackage exception as one that is conformant with the interface.
-            isc_throw (NcrUDPError, ex.code().message());
+        if (asio_socket_->is_open()) {
+            try {
+                asio_socket_->close();
+            } catch (asio::system_error& ex) {
+                // It is really unlikely that this will occur.
+                // If we do reopen later it will be with a new socket
+                // instance. Repackage exception as one that is conformant
+                // with the interface.
+                isc_throw (NcrUDPError, ex.code().message());
+            }
         }
+
+        asio_socket_.reset();
     }
+
+    socket_.reset();
 }
 
 void
@@ -164,14 +173,21 @@ NameChangeUDPListener::receiveCompletionHandler(const bool successful,
             LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_INVALID_NCR).arg(ex.what());
 
             // Queue up the next recieve.
-            doReceive();
+            // NOTE: We must call the base class, NEVER doReceive
+            receiveNext();
             return;
         }
     } else {
         asio::error_code error_code = callback->getErrorCode();
-        LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_UDP_RECV_ERROR)
-                  .arg(error_code.message());
-        result = ERROR;
+        if (error_code.value() == asio::error::operation_aborted) {
+            LOG_INFO(dhcp_ddns_logger, DHCP_DDNS_NCR_UDP_RECV_CANCELED)
+                     .arg(error_code.message());
+            result = STOPPED;
+        } else {
+            LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_UDP_RECV_ERROR)
+                      .arg(error_code.message());
+            result = ERROR;
+        }
     }
 
     // Call the application's registered request receive handler.
@@ -245,19 +261,27 @@ NameChangeUDPSender::open(isc::asiolink::IOService& io_service) {
 void
 NameChangeUDPSender::close() {
     // Whether we think we are sending or not, make sure we aren't.
-    // Since we are managing our own socket, we need to cancel and close
-    // it ourselves.
+    // Since we are managing our own socket, we need to close it ourselves.
+    // NOTE that if there is a pending send, it will be canceled, which
+    // WILL generate an invocation of the callback with error code of
+    // "operation aborted".
     if (asio_socket_) {
-        try {
-            asio_socket_->cancel();
-            asio_socket_->close();
-        } catch (asio::system_error& ex) {
-            // It is really unlikely that this will occur.
-            // If we do reopen later it will be with a new socket instance.
-            // Repackage exception as one that is conformant with the interface.
-            isc_throw (NcrUDPError, ex.code().message());
+        if (asio_socket_->is_open()) {
+            try {
+                asio_socket_->close();
+            } catch (asio::system_error& ex) {
+                // It is really unlikely that this will occur.
+                // If we do reopen later it will be with a new socket
+                // instance. Repackage exception as one that is conformant
+                // with the interface.
+                isc_throw (NcrUDPError, ex.code().message());
+            }
         }
+
+        asio_socket_.reset();
     }
+
+    socket_.reset();
 }
 
 void
@@ -286,10 +310,15 @@ NameChangeUDPSender::sendCompletionHandler(const bool successful,
     else {
         // On a failure, log the error and set the result to ERROR.
         asio::error_code error_code = send_callback->getErrorCode();
-        LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_UDP_RECV_ERROR)
-                  .arg(error_code.message());
-
-        result = ERROR;
+        if (error_code.value() == asio::error::operation_aborted) {
+            LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_UDP_SEND_CANCELED)
+                      .arg(error_code.message());
+            result = STOPPED;
+        } else {
+            LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_UDP_SEND_ERROR)
+                      .arg(error_code.message());
+            result = ERROR;
+        }
     }
 
     // Call the application's registered request send handler.

+ 21 - 1
src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc

@@ -113,7 +113,10 @@ TEST(NameChangeUDPListenerBasicTest, basicListenTests) {
 
     // Verify that we can start listening.
     EXPECT_NO_THROW(listener->startListening(io_service));
+    // Verify that we are in listening mode.
     EXPECT_TRUE(listener->amListening());
+    // Verify that a read is in progress.
+    EXPECT_TRUE(listener->isIoPending());
 
     // Verify that attempting to listen when we already are is an error.
     EXPECT_THROW(listener->startListening(io_service), NcrListenerError);
@@ -122,6 +125,14 @@ TEST(NameChangeUDPListenerBasicTest, basicListenTests) {
     EXPECT_NO_THROW(listener->stopListening());
     EXPECT_FALSE(listener->amListening());
 
+    // Verify that IO pending is still true, as IO cancel event has not yet
+    // occurred.
+    EXPECT_TRUE(listener->isIoPending());
+
+    // Verify that IO pending is false, after cancel event occurs.
+    EXPECT_NO_THROW(io_service.run_one());
+    EXPECT_FALSE(listener->isIoPending());
+
     // Verify that attempting to stop listening when we are not is ok.
     EXPECT_NO_THROW(listener->stopListening());
 
@@ -167,7 +178,7 @@ public:
 
     virtual ~NameChangeUDPListenerTest(){
     }
-    
+
 
     /// @brief Converts JSON string into an NCR and sends it to the listener.
     ///
@@ -226,6 +237,7 @@ TEST_F(NameChangeUDPListenerTest, basicReceivetest) {
     ASSERT_FALSE(listener_->amListening());
     ASSERT_NO_THROW(listener_->startListening(io_service_));
     ASSERT_TRUE(listener_->amListening());
+    ASSERT_TRUE(listener_->isIoPending());
 
     // Iterate over a series of requests, sending and receiving one
     /// at time.
@@ -247,6 +259,10 @@ TEST_F(NameChangeUDPListenerTest, basicReceivetest) {
     // Verify we can gracefully stop listening.
     EXPECT_NO_THROW(listener_->stopListening());
     EXPECT_FALSE(listener_->amListening());
+
+    // Verify that IO pending is false, after cancel event occurs.
+    EXPECT_NO_THROW(io_service_.run_one());
+    EXPECT_FALSE(listener_->isIoPending());
 }
 
 /// @brief A NOP derivation for constructor test purposes.
@@ -490,6 +506,10 @@ TEST_F (NameChangeUDPTest, roundTripTest) {
     EXPECT_NO_THROW(listener_->stopListening());
     EXPECT_FALSE(listener_->amListening());
 
+    // Verify that IO pending is false, after cancel event occurs.
+    EXPECT_NO_THROW(io_service_.run_one());
+    EXPECT_FALSE(listener_->isIoPending());
+
     // Verify that we can gracefully stop sending.
     EXPECT_NO_THROW(sender_->stopSending());
     EXPECT_FALSE(sender_->amSending());