Browse Source

[master] Merge branch 'trac3052'

Adds initial implementation of D2QueueMgr class to DHCP_DDNS.
This class is manages the queue of inbound DHCP_DDNS requests.
Thomas Markwalder 11 years ago
parent
commit
a970f6c525

+ 1 - 0
src/bin/d2/Makefile.am

@@ -54,6 +54,7 @@ b10_dhcp_ddns_SOURCES += d2_controller.cc d2_controller.h
 b10_dhcp_ddns_SOURCES += d_cfg_mgr.cc d_cfg_mgr.h
 b10_dhcp_ddns_SOURCES += d2_config.cc d2_config.h
 b10_dhcp_ddns_SOURCES += d2_cfg_mgr.cc d2_cfg_mgr.h
+b10_dhcp_ddns_SOURCES += d2_queue_mgr.cc d2_queue_mgr.h
 b10_dhcp_ddns_SOURCES += d2_update_message.cc d2_update_message.h
 b10_dhcp_ddns_SOURCES += d2_zone.cc d2_zone.h
 b10_dhcp_ddns_SOURCES += dns_client.cc dns_client.h

+ 13 - 0
src/bin/d2/d2_messages.mes

@@ -138,6 +138,19 @@ request for the FQDN cannot be processed.
 This is a debug message issued when the Dhcp-Ddns application enters
 its init method.
 
+% DHCP_DDNS_QUEUE_MGR_QUEUE_FULL application request queue has reached maximum number of entries: %1
+This an error message indicating that DHCP-DDNS is receiving DNS update
+requests faster than they can be processed.  This may mean the maximum queue
+needs to be increased, the DHCP-DDNS clients are simply generating too many
+requests too quickly, or perhaps upstream DNS servers are experiencing
+load issues.
+
+% DHCP_DDNS_QUEUE_MGR_RECV_ERROR application's queue manager was notified of a request receive error by its listener.
+This is an error message indicating that the NameChangeRequest listener used by
+DHCP-DDNS to receive requests encountered a IO error.  There should be
+corresponding log messages from the listener layer with more details. This may
+indicate a network connectivity or system resource issue.
+
 % DHCP_DDNS_RUN_ENTER application has entered the event loop
 This is a debug message issued when the Dhcp-Ddns application enters
 its run method.

+ 217 - 0
src/bin/d2/d2_queue_mgr.cc

@@ -0,0 +1,217 @@
+// Copyright (C) 2013 Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <d2/d2_log.h>
+#include <d2/d2_queue_mgr.h>
+#include <dhcp_ddns/ncr_udp.h>
+
+namespace isc {
+namespace d2 {
+
+// Makes constant visible to Google test macros.
+const size_t D2QueueMgr::MAX_QUEUE_DEFAULT;
+
+D2QueueMgr::D2QueueMgr(isc::asiolink::IOService& io_service,
+                       const size_t max_queue_size)
+    : io_service_(io_service), max_queue_size_(max_queue_size),
+      mgr_state_(NOT_INITTED) {
+    // Use setter to do validation.
+    setMaxQueueSize(max_queue_size);
+}
+
+D2QueueMgr::~D2QueueMgr() {
+    // clean up
+    try {
+        stopListening();
+    } catch (...) {
+        // This catch is strictly for safety's sake, in case a future
+        // implementation isn't tidy or careful. 
+    }
+}
+
+void
+D2QueueMgr::operator()(const dhcp_ddns::NameChangeListener::Result result,
+                       dhcp_ddns::NameChangeRequestPtr& ncr) {
+    // Note that error conditions must be handled here without throwing
+    // exceptions. Remember this is the application level "link" in the
+    // callback chain.  Throwing an exception here will "break" the
+    // io_service "run" we are operating under.  With that in mind,
+    // if we hit a problem, we will stop the listener transition to
+    // the appropriate stopped state.  Upper layer(s) must monitor our
+    // state as well as our queue size.
+
+    // If the receive was successful, attempt to queue the request.
+    if (result == dhcp_ddns::NameChangeListener::SUCCESS) {
+        if (getQueueSize() < getMaxQueueSize()) {
+            // There's room on the queue, add to the end
+            enqueue(ncr);
+            return;
+        }
+
+        // Queue is full, stop the listener.
+        stopListening(STOPPED_QUEUE_FULL);
+        LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_QUEUE_FULL)
+                  .arg(max_queue_size_);
+    } else {
+        // Receive failed, stop the listener.
+        stopListening(STOPPED_RECV_ERROR);
+        LOG_ERROR(dctl_logger, DHCP_DDNS_QUEUE_MGR_RECV_ERROR);
+    }
+}
+
+void
+D2QueueMgr::initUDPListener(const isc::asiolink::IOAddress& ip_address,
+                            const uint32_t port,
+                            const dhcp_ddns::NameChangeFormat format,
+                            const bool reuse_address) {
+
+    if (listener_) {
+        isc_throw(D2QueueMgrError,
+                  "D2QueueMgr listener is already initialized");
+    }
+
+    // Instantiate a UDP listener and set state to INITTED.
+    // Note UDP listener constructor does not throw.
+    listener_.reset(new dhcp_ddns::
+                    NameChangeUDPListener(ip_address, port, format, *this,
+                                          reuse_address));
+    mgr_state_ = INITTED;
+}
+
+void
+D2QueueMgr::startListening() {
+    // We can't listen if we haven't initialized the listener yet.
+    if (!listener_) {
+        isc_throw(D2QueueMgrError, "D2QueueMgr "
+                  "listener is not initialized, cannot start listening");
+    }
+
+    // If we are already listening, we do not want to "reopen" the listener
+    // and really we shouldn't be trying.
+    if (mgr_state_ == RUNNING) {
+        isc_throw(D2QueueMgrError, "D2QueueMgr "
+                  "cannot call startListening from the RUNNING state");
+    }
+
+    // Instruct the listener to start listening and set state accordingly.
+    try {
+        listener_->startListening(io_service_);
+        mgr_state_ = RUNNING;
+    } catch (const isc::Exception& ex) {
+        isc_throw(D2QueueMgrError, "D2QueueMgr listener start failed: "
+                  << ex.what());
+    }
+}
+
+void
+D2QueueMgr::stopListening(const State stop_state) {
+    // Note, stopListening is guaranteed not to throw.
+    if (listener_) {
+        listener_->stopListening();
+    }
+
+    // Enforce only valid "stop" states.
+    if (stop_state != STOPPED && stop_state != STOPPED_QUEUE_FULL &&
+        stop_state != STOPPED_RECV_ERROR) {
+        // This is purely a programmatic error and should never happen.
+        isc_throw(D2QueueMgrError, "D2QueueMgr invalid value for stop state: "
+                  << stop_state);
+    }
+
+    mgr_state_ = stop_state;
+}
+
+void
+D2QueueMgr::removeListener() {
+    // Force our managing layer(s) to stop us properly first.
+    if (mgr_state_ == RUNNING) {
+        isc_throw(D2QueueMgrError,
+                  "D2QueueMgr cannot delete listener while state is RUNNING");
+    }
+
+    listener_.reset();
+    mgr_state_ = NOT_INITTED;
+}
+
+const dhcp_ddns::NameChangeRequestPtr&
+D2QueueMgr::peek() const {
+    if (getQueueSize() ==  0) {
+        isc_throw(D2QueueMgrQueueEmpty,
+                  "D2QueueMgr peek attempted on an empty queue");
+    }
+
+    return (ncr_queue_.front());
+}
+
+const dhcp_ddns::NameChangeRequestPtr&
+D2QueueMgr::peekAt(const size_t index) const {
+    if (index >= getQueueSize()) {
+        isc_throw(D2QueueMgrInvalidIndex,
+                  "D2QueueMgr peek beyond end of queue attempted"
+                  << " index: " << index << " queue size: " << getQueueSize());
+    }
+
+    return (ncr_queue_.at(index));
+}
+
+void
+D2QueueMgr::dequeueAt(const size_t index) {
+    if (index >= getQueueSize()) {
+        isc_throw(D2QueueMgrInvalidIndex,
+                  "D2QueueMgr dequeue beyond end of queue attempted"
+                  << " index: " << index << " queue size: " << getQueueSize());
+    }
+
+    RequestQueue::iterator pos = ncr_queue_.begin() + index;
+    ncr_queue_.erase(pos);
+}
+
+
+void
+D2QueueMgr::dequeue() {
+    if (getQueueSize() ==  0) {
+        isc_throw(D2QueueMgrQueueEmpty,
+                  "D2QueueMgr dequeue attempted on an empty queue");
+    }
+
+    ncr_queue_.pop_front();
+}
+
+void
+D2QueueMgr::enqueue(dhcp_ddns::NameChangeRequestPtr& ncr) {
+    ncr_queue_.push_back(ncr);
+}
+
+void
+D2QueueMgr::clearQueue() {
+    ncr_queue_.clear();
+}
+
+void
+D2QueueMgr::setMaxQueueSize(const size_t new_queue_max) {
+    if (new_queue_max < 1) {
+        isc_throw(D2QueueMgrError,
+                  "D2QueueMgr maximum queue size must be greater than zero");
+    }
+
+    if (new_queue_max < getQueueSize()) {
+        isc_throw(D2QueueMgrError, "D2QueueMgr maximum queue size value cannot"
+                  " be less than the current queue size :" << getQueueSize());
+    }
+
+    max_queue_size_ = new_queue_max;
+}
+
+} // namespace isc::d2
+} // namespace isc

+ 335 - 0
src/bin/d2/d2_queue_mgr.h

@@ -0,0 +1,335 @@
+// Copyright (C) 2013 Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#ifndef D2_QUEUE_MGR_H
+#define D2_QUEUE_MGR_H
+
+/// @file d2_queue_mgr.h This file defines the class D2QueueMgr.
+
+#include <asiolink/io_address.h>
+#include <asiolink/io_service.h>
+#include <exceptions/exceptions.h>
+#include <dhcp_ddns/ncr_msg.h>
+#include <dhcp_ddns/ncr_io.h>
+
+#include <boost/noncopyable.hpp>
+#include <deque>
+
+namespace isc {
+namespace d2 {
+
+/// @brief Defines a queue of requests.
+/// @todo This may be replaced with an actual class in the future.
+typedef std::deque<dhcp_ddns::NameChangeRequestPtr> RequestQueue;
+
+/// @brief Thrown if the queue manager encounters a general error.
+class D2QueueMgrError : public isc::Exception {
+public:
+    D2QueueMgrError(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) { };
+};
+
+/// @brief Thrown if the queue manager's receive handler is passed
+/// a failure result.
+class D2QueueMgrReceiveError : public isc::Exception {
+public:
+    D2QueueMgrReceiveError(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) { };
+};
+
+
+/// @brief Thrown if the request queue is full when an enqueue is attempted.
+class D2QueueMgrQueueFull : public isc::Exception {
+public:
+    D2QueueMgrQueueFull(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) { };
+};
+
+/// @brief Thrown if the request queue empty and a read is attempted.
+class D2QueueMgrQueueEmpty : public isc::Exception {
+public:
+    D2QueueMgrQueueEmpty(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) { };
+};
+
+/// @brief Thrown if a queue index is beyond the end of the queue
+class D2QueueMgrInvalidIndex : public isc::Exception {
+public:
+    D2QueueMgrInvalidIndex(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) { };
+};
+
+
+/// @brief D2QueueMgr creates and manages a queue of DNS update requests.
+///
+/// D2QueueMgr is class specifically designed as an integral part of DHCP-DDNS.
+/// Its primary responsibility is to listen for NameChangeRequests from
+/// DHCP-DDNS clients (e.g. DHCP servers) and queue them for processing. In
+/// addition it may provide a number services to locate entries in the queue
+/// such as by FQDN or DHCID.  These services may eventually be used
+/// for processing optimization.  The initial implementation will support
+/// simple FIFO access.
+///
+/// D2QueueMgr uses a NameChangeListener to asynchronously receive requests.
+/// It derives from NameChangeListener::RequestReceiveHandler and supplies an
+/// implementation of the operator()(Result, NameChangeRequestPtr).  It is
+/// through this operator() that D2QueueMgr is passed inbound NCRs. D2QueueMgr
+/// will add each newly received request onto the back of the request queue
+///
+/// D2QueueMgr defines a simple state model constructed around the status of
+/// its NameChangeListener, consisting of the following states:
+///
+///     * NOT_INITTED - D2QueueMgr has been constructed, but its listener has
+///     not been initialized.
+///
+///     * INITTED - The listener has been initialized, but it is not open for
+///     listening.   To move from NOT_INITTED to INITTED, one of the D2QueueMgr
+///     listener initialization methods must be invoked.  Currently there is
+///     only one type of listener, NameChangeUDPListener, hence there is only
+///     one listener initialization method, initUDPListener.  As more listener
+///     types are created, listener initialization methods will need to be
+///     added.
+///
+///     * RUNNING - The listener is open and listening for requests.
+///     Once initialized, in order to begin listening for requests, the
+///     startListener() method must be invoked.  Upon successful completion of
+///     of this call, D2QueueMgr will begin receiving requests as they arrive
+///     without any further steps.   This method may be called from the INITTED
+///     or one of the STOPPED states.
+///
+///     * STOPPED - The listener has been listening but has been stopped
+///     without error. To return to listening, startListener() must be invoked.
+///
+///     * STOPPED_QUEUE_FULL - Request queue is full, the listener has been
+///     stopped.  D2QueueMgr will enter this state when the request queue
+///     reaches the maximum queue size.  Once this limit is reached, the
+///     listener will be closed and no further requests will be received.
+///     To return to listening, startListener() must be invoked.  Note that so
+///     long as the queue is full, any attempt to queue a request will fail.
+///
+///     * STOPPED_RECV_ERROR - The listener has experienced a receive error
+///     and has been stopped.  D2QueueMgr will enter this state when it is
+///     passed a failed status into the request completion handler.  To return
+///     to listening, startListener() must be invoked.
+///
+/// D2QueueMgr does not attempt to recover from stopped conditions, this is left
+/// to upper layers.
+///
+/// It is important to note that the queue contents are preserved between
+/// state transitions.  In other words entries in the queue remain there
+/// until they are removed explicitly via the deque() or implicitly by
+/// via the clearQueue() method.
+///
+class D2QueueMgr : public dhcp_ddns::NameChangeListener::RequestReceiveHandler,
+                   boost::noncopyable {
+public:
+    /// @brief Maximum number of entries allowed in the request queue.
+    /// NOTE that 1024 is an arbitrary choice picked for the initial
+    /// implementation.
+    static const size_t MAX_QUEUE_DEFAULT = 1024;
+
+    /// @brief Defines the list of possible states for D2QueueMgr.
+    enum State {
+      NOT_INITTED,
+      INITTED,
+      RUNNING,
+      STOPPED_QUEUE_FULL,
+      STOPPED_RECV_ERROR,
+      STOPPED,
+    };
+
+    /// @brief Constructor
+    ///
+    /// Creates a D2QueueMgr instance.  Note that the listener is not created
+    /// in the constructor. The initial state will be NOT_INITTED.
+    ///
+    /// @param io_service IOService instance to be passed into the listener for
+    /// IO management.
+    /// @param max_queue_size the maximum number of entries allowed in the
+    /// queue.
+    /// This value must be greater than zero. It defaults to MAX_QUEUE_DEFAULT.
+    ///
+    /// @throw D2QueueMgrError if max_queue_size is zero.
+    D2QueueMgr(isc::asiolink::IOService& io_service,
+               const size_t max_queue_size = MAX_QUEUE_DEFAULT);
+
+    /// @brief Destructor
+    virtual ~D2QueueMgr();
+
+    /// @brief Initializes the listener as a UDP listener.
+    ///
+    /// Instantiates the listener_ member as NameChangeUDPListener passing
+    /// the given parameters.  Upon successful completion, the D2QueueMgr state
+    /// will be INITTED.
+    ///
+    /// @param ip_address is the network address on which to listen
+    /// @param port is the IP port on which to listen
+    /// @param format is the wire format of the inbound requests.
+    /// @param reuse_address enables IP address sharing when true
+    /// It defaults to false.
+    void initUDPListener(const isc::asiolink::IOAddress& ip_address,
+                         const uint32_t port,
+                         const dhcp_ddns::NameChangeFormat format,
+                         const bool reuse_address = false);
+
+    /// @brief Starts actively listening for requests.
+    ///
+    /// Invokes the listener's startListening method passing in our
+    /// IOService instance.
+    ///
+    /// @throw D2QueueMgrError if the listener has not been initialized,
+    /// state is already RUNNING, or the listener fails to actually start.
+    void startListening();
+
+    /// @brief Function operator implementing the NCR receive callback.
+    ///
+    /// This method is invoked by the listener as part of its receive
+    /// completion callback and is how the inbound NameChangeRequests are
+    /// passed up to the D2QueueMgr for queueing.
+    /// If the given result indicates a successful receive completion and
+    /// there is room left in the queue, the given request is queued.
+    ///
+    /// If the queue is at maximum capacity, stopListening() is invoked and
+    /// the state is set to STOPPED_QUEUE_FULL.
+    ///
+    /// If the result indicates a failed receive, stopListening() is invoked
+    /// and the state is set to STOPPED_RECV_ERROR.
+    ///
+    /// This method specifically avoids throwing on an error as any such throw
+    /// would surface at the io_service::run (or run variant) method invocation
+    /// site. The upper layers are expected to monitor D2QueueMgr's state and
+    /// act accordingly.
+    ///
+    /// @param result contains that receive outcome status.
+    /// @param ncr is a pointer to the newly received NameChangeRequest if
+    /// result is NameChangeListener::SUCCESS.  It is indeterminate other
+    /// wise.
+    virtual void operator ()(const dhcp_ddns::NameChangeListener::Result result,
+                             dhcp_ddns::NameChangeRequestPtr& ncr);
+
+    /// @brief Stops listening for requests.
+    ///
+    /// Invokes the listener's stopListening method which should cause it to
+    /// cancel any pending IO and close its IO source.  It the sets the state
+    /// to the given value.
+    ///
+    /// @param stop_state is one of the three stopped state values.
+    ///
+    /// @throw D2QueueMgrError if stop_state is a valid stop state.
+    void stopListening(const State stop_state = STOPPED);
+
+    /// @brief Deletes the current listener
+    ///
+    /// This method will delete the current listener and returns the manager
+    /// to the NOT_INITTED state.  This is provided to support reconfiguring
+    /// a new listener without losing queued requests.
+    ///
+    /// @throw D2QueMgrError if called when the manager state is RUNNING.
+    void removeListener();
+
+    /// @brief Returns the number of entries in the queue.
+    size_t getQueueSize() const {
+        return (ncr_queue_.size());
+    };
+
+    /// @brief Returns the maximum number of entries allowed in the queue.
+    size_t getMaxQueueSize() const {
+        return (max_queue_size_);
+    }
+
+    /// @brief Sets the maximum number of entries allowed in the queue.
+    ///
+    /// @param max_queue_size is the new maximum size of the queue.
+    ///
+    /// @throw D2QueueMgrError if the new value is less than one or if
+    /// the new value is less than the number of entries currently in the
+    /// queue.
+    void setMaxQueueSize(const size_t max_queue_size);
+
+    /// @brief Returns the current state.
+    State getMgrState() const {
+        return (mgr_state_);
+    }
+
+    /// @brief Returns the entry at the front of the queue.
+    ///
+    /// The entry returned is next in line to be processed, assuming a FIFO
+    /// approach to task selection.  Note, the entry is not removed from the
+    /// queue.
+    ///
+    /// @return Pointer reference to the queue entry.
+    ///
+    /// @throw D2QueueMgrQueEmpty if there are no entries in the queue.
+    const dhcp_ddns::NameChangeRequestPtr& peek() const;
+
+    /// @brief Returns the entry at a given position in the queue.
+    ///
+    /// Note that the entry is not removed from the queue.
+    /// @param index the index of the entry in the queue to fetch.
+    /// Valid values are 0 (front of the queue) to (queue size - 1).
+    ///
+    /// @return Pointer reference to the queue entry.
+    ///
+    /// @throw D2QueueMgrInvalidIndex if the given index is beyond the
+    /// end of the queue.
+    const dhcp_ddns::NameChangeRequestPtr& peekAt(const size_t index) const;
+
+    /// @brief Removes the entry at a given position in the queue.
+    ///
+    /// @param index the index of the entry in the queue to remove.
+    /// Valid values are 0 (front of the queue) to (queue size - 1).
+    ///
+    /// @throw D2QueueMgrInvalidIndex if the given index is beyond the
+    /// end of the queue.
+    void dequeueAt(const size_t index);
+
+    /// @brief Removes the entry at the front of the queue.
+    ///
+    /// @throw D2QueueMgrQueEmpty if there are no entries in the queue.
+    void dequeue();
+
+    /// @brief Adds a request to the end of the queue.
+    ///
+    /// @param ncr pointer to the NameChangeRequest to add to the queue.
+    void enqueue(dhcp_ddns::NameChangeRequestPtr& ncr);
+
+    /// @brief Removes all entries from the queue.
+    void clearQueue();
+
+  private:
+    /// @brief IOService that our listener should use for IO management.
+    isc::asiolink::IOService& io_service_;
+
+    /// @brief Dictates the maximum number of entries allowed in the queue.
+    size_t max_queue_size_;
+
+    /// @brief Queue of received NameChangeRequests.
+    RequestQueue ncr_queue_;
+
+    /// @brief Listener instance from which requests are received.
+    boost::shared_ptr<dhcp_ddns::NameChangeListener> listener_;
+
+    /// @brief Current state of the manager.
+    State mgr_state_;
+
+
+};
+
+/// @brief Defines a pointer for manager instances.
+typedef boost::shared_ptr<D2QueueMgr> D2QueueMgrPtr;
+
+} // namespace isc::d2
+} // namespace isc
+
+#endif

+ 2 - 0
src/bin/d2/tests/Makefile.am

@@ -59,6 +59,7 @@ d2_unittests_SOURCES += ../d2_controller.cc ../d2_controller.h
 d2_unittests_SOURCES += ../d_cfg_mgr.cc ../d_cfg_mgr.h
 d2_unittests_SOURCES += ../d2_config.cc ../d2_config.h
 d2_unittests_SOURCES += ../d2_cfg_mgr.cc ../d2_cfg_mgr.h
+d2_unittests_SOURCES += ../d2_queue_mgr.cc ../d2_queue_mgr.h
 d2_unittests_SOURCES += ../d2_update_message.cc ../d2_update_message.h
 d2_unittests_SOURCES += ../d2_zone.cc ../d2_zone.h
 d2_unittests_SOURCES += ../dns_client.cc ../dns_client.h
@@ -69,6 +70,7 @@ d2_unittests_SOURCES += d_controller_unittests.cc
 d2_unittests_SOURCES += d2_controller_unittests.cc
 d2_unittests_SOURCES += d_cfg_mgr_unittests.cc
 d2_unittests_SOURCES += d2_cfg_mgr_unittests.cc
+d2_unittests_SOURCES += d2_queue_mgr_unittests.cc
 d2_unittests_SOURCES += d2_update_message_unittests.cc
 d2_unittests_SOURCES += d2_zone_unittests.cc
 d2_unittests_SOURCES += dns_client_unittests.cc

+ 430 - 0
src/bin/d2/tests/d2_queue_mgr_unittests.cc

@@ -0,0 +1,430 @@
+// Copyright (C) 2013  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <asiolink/interval_timer.h>
+#include <d2/d2_queue_mgr.h>
+#include <dhcp_ddns/ncr_udp.h>
+#include <util/time_utilities.h>
+
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+#include <gtest/gtest.h>
+#include <algorithm>
+#include <vector>
+
+using namespace std;
+using namespace isc;
+using namespace isc::dhcp_ddns;
+using namespace isc::d2;
+
+namespace {
+
+/// @brief Defines a list of valid JSON NameChangeRequest test messages.
+const char *valid_msgs[] =
+{
+    // Valid Add.
+     "{"
+     " \"change_type\" : 0 , "
+     " \"forward_change\" : true , "
+     " \"reverse_change\" : false , "
+     " \"fqdn\" : \"walah.walah.com\" , "
+     " \"ip_address\" : \"192.168.2.1\" , "
+     " \"dhcid\" : \"010203040A7F8E3D\" , "
+     " \"lease_expires_on\" : \"20130121132405\" , "
+     " \"lease_length\" : 1300 "
+     "}",
+    // Valid Remove.
+     "{"
+     " \"change_type\" : 1 , "
+     " \"forward_change\" : true , "
+     " \"reverse_change\" : false , "
+     " \"fqdn\" : \"walah.walah.com\" , "
+     " \"ip_address\" : \"192.168.2.1\" , "
+     " \"dhcid\" : \"010203040A7F8E3D\" , "
+     " \"lease_expires_on\" : \"20130121132405\" , "
+     " \"lease_length\" : 1300 "
+     "}",
+     // Valid Add with IPv6 address
+     "{"
+     " \"change_type\" : 0 , "
+     " \"forward_change\" : true , "
+     " \"reverse_change\" : false , "
+     " \"fqdn\" : \"walah.walah.com\" , "
+     " \"ip_address\" : \"fe80::2acf:e9ff:fe12:e56f\" , "
+     " \"dhcid\" : \"010203040A7F8E3D\" , "
+     " \"lease_expires_on\" : \"20130121132405\" , "
+     " \"lease_length\" : 1300 "
+     "}"
+};
+
+static const  int VALID_MSG_CNT = sizeof(valid_msgs)/sizeof(char*);
+
+const char* TEST_ADDRESS = "127.0.0.1";
+const uint32_t LISTENER_PORT = 5301;
+const uint32_t SENDER_PORT = LISTENER_PORT+1;
+const long TEST_TIMEOUT = 5 * 1000;
+
+/// @brief Tests that construction with max queue size of zero is not allowed.
+TEST(D2QueueMgrBasicTest, construction1) {
+    isc::asiolink::IOService io_service;
+
+    // Verify that constructing with max queue size of zero is not allowed.
+    EXPECT_THROW(D2QueueMgr(io_service, 0), D2QueueMgrError);
+}
+
+/// @brief Tests default construction works.
+TEST(D2QueueMgrBasicTest, construction2) {
+    isc::asiolink::IOService io_service;
+
+    // Verify that valid constructor works.
+    D2QueueMgrPtr queue_mgr;
+    ASSERT_NO_THROW(queue_mgr.reset(new D2QueueMgr(io_service)));
+    // Verify queue max is defaulted correctly.
+    EXPECT_EQ(D2QueueMgr::MAX_QUEUE_DEFAULT, queue_mgr->getMaxQueueSize());
+}
+
+/// @brief Tests construction with custom queue size works properly
+TEST(D2QueueMgrBasicTest, construction3) {
+    isc::asiolink::IOService io_service;
+
+    // Verify that custom queue size constructor works.
+    D2QueueMgrPtr queue_mgr;
+    ASSERT_NO_THROW(queue_mgr.reset(new D2QueueMgr(io_service, 100)));
+    // Verify queue max is the custom value.
+    EXPECT_EQ(100, queue_mgr->getMaxQueueSize());
+}
+
+/// @brief Tests  QueueMgr's basic queue functions
+/// This test verifies that:
+/// 1. Following construction queue is empty
+/// 2. Attempting to peek at an empty queue is not allowed
+/// 3. Attempting to dequeue an empty queue is not allowed
+/// 4. Peek returns the first entry on the queue without altering queue content
+/// 5. Dequeue removes the first entry on the queue
+TEST(D2QueueMgrBasicTest, basicQueue) {
+    isc::asiolink::IOService io_service;
+
+    // Construct the manager with max queue size set to number of messages
+    // we'll use.
+    D2QueueMgrPtr queue_mgr;
+    ASSERT_NO_THROW(queue_mgr.reset(new D2QueueMgr(io_service, VALID_MSG_CNT)));
+    ASSERT_EQ(VALID_MSG_CNT, queue_mgr->getMaxQueueSize());
+
+    // Verify queue is empty after construction.
+    EXPECT_EQ(0, queue_mgr->getQueueSize());
+
+    // Verify that peek and dequeue both throw when queue is empty.
+    EXPECT_THROW(queue_mgr->peek(), D2QueueMgrQueueEmpty);
+    EXPECT_THROW(queue_mgr->dequeue(), D2QueueMgrQueueEmpty);
+
+    // Vector to keep track of the NCRs we que.
+    std::vector<NameChangeRequestPtr>ref_msgs;
+    NameChangeRequestPtr ncr;
+
+    // Iterate over the list of requests and add each to the queue.
+    for (int i = 0; i < VALID_MSG_CNT; i++) {
+        // Create the ncr and add to our reference list.
+        ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
+        ref_msgs.push_back(ncr);
+
+        // Verify that the request can be added to the queue and queue
+        // size increments accordingly.
+        EXPECT_NO_THROW(queue_mgr->enqueue(ncr));
+        EXPECT_EQ(i+1, queue_mgr->getQueueSize());
+    }
+
+    // Loop through and verify that the queue contents match the
+    // reference list.
+    for (int i = 0; i < VALID_MSG_CNT; i++) {
+        // Verify that peek on a non-empty queue returns first entry
+        // without altering queue content.
+        EXPECT_NO_THROW(ncr = queue_mgr->peek());
+
+        // Verify the peeked entry is the one it should be.
+        ASSERT_TRUE(ncr);
+        EXPECT_TRUE (*(ref_msgs[i]) == *ncr);
+
+        // Verify that peek did not alter the queue size.
+        EXPECT_EQ(VALID_MSG_CNT - i, queue_mgr->getQueueSize());
+
+        // Verify the dequeueing from non-empty queue works
+        EXPECT_NO_THROW(queue_mgr->dequeue());
+
+        // Verify queue size decrements following dequeue.
+        EXPECT_EQ(VALID_MSG_CNT - (i + 1), queue_mgr->getQueueSize());
+    }
+
+    // Iterate over the list of requests and add each to the queue.
+    for (int i = 0; i < VALID_MSG_CNT; i++) {
+        // Create the ncr and add to our reference list.
+        ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
+        ref_msgs.push_back(ncr);
+        EXPECT_NO_THROW(queue_mgr->enqueue(ncr));
+    }
+
+    // Verify queue count is correct.
+    EXPECT_EQ(VALID_MSG_CNT, queue_mgr->getQueueSize());
+
+    // Verfiy that peekAt returns the correct entry.
+    EXPECT_NO_THROW(ncr = queue_mgr->peekAt(1));
+    EXPECT_TRUE (*(ref_msgs[1]) == *ncr);
+
+    // Verfiy that dequeueAt removes the correct entry.
+    // Removing it, this should shift the queued entries forward by one.
+    EXPECT_NO_THROW(queue_mgr->dequeueAt(1));
+    EXPECT_NO_THROW(ncr = queue_mgr->peekAt(1));
+    EXPECT_TRUE (*(ref_msgs[2]) == *ncr);
+
+    // Verify the peekAt and dequeueAt throw when given indexes beyond the end.
+    EXPECT_THROW(queue_mgr->peekAt(VALID_MSG_CNT + 1), D2QueueMgrInvalidIndex);
+    EXPECT_THROW(queue_mgr->dequeueAt(VALID_MSG_CNT + 1),
+                 D2QueueMgrInvalidIndex);
+}
+
+/// @brief Compares two NameChangeRequests for equality.
+bool checkSendVsReceived(NameChangeRequestPtr sent_ncr,
+                         NameChangeRequestPtr received_ncr) {
+    return ((sent_ncr && received_ncr) &&
+        (*sent_ncr == *received_ncr));
+}
+
+/// @brief Text fixture that allows testing a listener and sender together
+/// It derives from both the receive and send handler classes and contains
+/// and instance of UDP listener and UDP sender.
+class QueueMgrUDPTest : public virtual ::testing::Test,
+                        NameChangeSender::RequestSendHandler {
+public:
+    isc::asiolink::IOService io_service_;
+    NameChangeSenderPtr   sender_;
+    isc::asiolink::IntervalTimer test_timer_;
+    D2QueueMgrPtr queue_mgr_;
+
+    NameChangeSender::Result send_result_;
+    std::vector<NameChangeRequestPtr> sent_ncrs_;
+    std::vector<NameChangeRequestPtr> received_ncrs_;
+
+    QueueMgrUDPTest() : io_service_(), test_timer_(io_service_) {
+        isc::asiolink::IOAddress addr(TEST_ADDRESS);
+        // Create our sender instance. Note that reuse_address is true.
+        sender_.reset(new NameChangeUDPSender(addr, SENDER_PORT,
+                                              addr, LISTENER_PORT,
+                                              FMT_JSON, *this, 100, true));
+
+        // Set the test timeout to break any running tasks if they hang.
+        test_timer_.setup(boost::bind(&QueueMgrUDPTest::testTimeoutHandler,
+                                      this),
+                          TEST_TIMEOUT);
+    }
+
+    void reset_results() {
+        sent_ncrs_.clear();
+        received_ncrs_.clear();
+    }
+
+    /// @brief Implements the send completion handler.
+    virtual void operator ()(const NameChangeSender::Result result,
+                             NameChangeRequestPtr& ncr) {
+        // save the result and the NCR sent.
+        send_result_ = result;
+        sent_ncrs_.push_back(ncr);
+    }
+
+    /// @brief Handler invoked when test timeout is hit.
+    ///
+    /// This callback stops all running (hanging) tasks on IO service.
+    void testTimeoutHandler() {
+        io_service_.stop();
+        FAIL() << "Test timeout hit.";
+    }
+};
+
+/// @brief Tests D2QueueMgr's state model.
+/// This test verifies that:
+/// 1. Upon construction, initial state is NOT_INITTED.
+/// 2. Cannot start listening from while state is NOT_INITTED.
+/// 3. Successful listener initialization transitions from NOT_INITTED
+/// to INITTED.
+/// 4. Attempting to initialize the listener from INITTED state is not
+/// allowed.
+/// 5. Starting listener from INITTED transitions to RUNNING.
+/// 6. Stopping the  listener transitions from RUNNING to STOPPED.
+/// 7. Starting listener from STOPPED transitions to RUNNING.
+TEST_F (QueueMgrUDPTest, stateModel) {
+    // Create the queue manager.
+    ASSERT_NO_THROW(queue_mgr_.reset(new D2QueueMgr(io_service_,
+                                     VALID_MSG_CNT)));
+
+    // Verify that the initial state is NOT_INITTED.
+    EXPECT_EQ(D2QueueMgr::NOT_INITTED, queue_mgr_->getMgrState());
+
+    // Verify that trying to listen before when not initialized fails.
+    EXPECT_THROW(queue_mgr_->startListening(), D2QueueMgrError);
+
+    // Verify that initializing the listener moves us to INITTED state.
+    isc::asiolink::IOAddress addr(TEST_ADDRESS);
+    EXPECT_NO_THROW(queue_mgr_->initUDPListener(addr, LISTENER_PORT,
+                                              FMT_JSON, true));
+    EXPECT_EQ(D2QueueMgr::INITTED, queue_mgr_->getMgrState());
+
+    // Verify that attempting to initialize the listener, from INITTED
+    // is not allowed.
+    EXPECT_THROW(queue_mgr_->initUDPListener(addr, LISTENER_PORT,
+                                              FMT_JSON, true),
+                 D2QueueMgrError);
+
+    // Verify that we can enter the RUNNING from INITTED by starting the
+    // listener.
+    EXPECT_NO_THROW(queue_mgr_->startListening());
+    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());
+
+    // Verify that we can move from RUNNING to STOPPED by stopping the
+    // listener.
+    EXPECT_NO_THROW(queue_mgr_->stopListening());
+    EXPECT_EQ(D2QueueMgr::STOPPED, queue_mgr_->getMgrState());
+
+    // Verify that we can re-enter the RUNNING from STOPPED by starting the
+    // listener.
+    EXPECT_NO_THROW(queue_mgr_->startListening());
+    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());
+
+    // Verify that we cannot remove the listener in the RUNNING state
+    EXPECT_THROW(queue_mgr_->removeListener(), D2QueueMgrError);
+
+    // Stop the listener.
+    EXPECT_NO_THROW(queue_mgr_->stopListening());
+    EXPECT_EQ(D2QueueMgr::STOPPED, queue_mgr_->getMgrState());
+
+    // Verify that we can remove the listener in the STOPPED state and
+    // end up back in NOT_INITTED.
+    EXPECT_NO_THROW(queue_mgr_->removeListener());
+    EXPECT_EQ(D2QueueMgr::NOT_INITTED, queue_mgr_->getMgrState());
+}
+
+/// @brief Tests D2QueueMgr's ability to manage received requests
+/// This test verifies that:
+/// 1. Requests can be received, queued, and dequeued
+/// 2. Once the queue is full, a subsequent request transitions
+/// manager to STOPPED_QUEUE_FULL state.
+/// 3. Starting listener returns manager to the RUNNING state.
+/// 4. Queue contents are preserved across state transitions.
+/// 5. Clearing the queue via the clearQueue() method works.
+/// 6. Requests can be received and queued normally after the queue
+/// has been emptied.
+/// 7. setQueueMax disallows values of 0 or less than current queue size.
+TEST_F (QueueMgrUDPTest, liveFeed) {
+    NameChangeRequestPtr send_ncr;
+    NameChangeRequestPtr received_ncr;
+
+    // Create the queue manager and start listening..
+    ASSERT_NO_THROW(queue_mgr_.reset(new D2QueueMgr(io_service_,
+                                                    VALID_MSG_CNT)));
+    ASSERT_EQ(D2QueueMgr::NOT_INITTED, queue_mgr_->getMgrState());
+
+    // Verify that setting max queue size to 0 is not allowed.
+    EXPECT_THROW(queue_mgr_->setMaxQueueSize(0), D2QueueMgrError);
+    EXPECT_EQ(VALID_MSG_CNT, queue_mgr_->getMaxQueueSize());
+
+    isc::asiolink::IOAddress addr(TEST_ADDRESS);
+    ASSERT_NO_THROW(queue_mgr_->initUDPListener(addr, LISTENER_PORT,
+                                              FMT_JSON, true));
+    ASSERT_EQ(D2QueueMgr::INITTED, queue_mgr_->getMgrState());
+
+    ASSERT_NO_THROW(queue_mgr_->startListening());
+    ASSERT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());
+
+    // Place the sender into sending state.
+    ASSERT_NO_THROW(sender_->startSending(io_service_));
+    ASSERT_TRUE(sender_->amSending());
+
+    // Iterate over the list of requests sending and receiving
+    // each one.  Verify and dequeue as they arrive.
+    for (int i = 0; i < VALID_MSG_CNT; i++) {
+        // Create the ncr and add to our reference list.
+        ASSERT_NO_THROW(send_ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
+        ASSERT_NO_THROW(sender_->sendRequest(send_ncr));
+
+        // running two should do the send then the receive
+        io_service_.run_one();
+        io_service_.run_one();
+
+        // Verify that the request can be added to the queue and queue
+        // size increments accordingly.
+        EXPECT_EQ(1, queue_mgr_->getQueueSize());
+
+        // Verify that peek shows the NCR we just sent
+        EXPECT_NO_THROW(received_ncr = queue_mgr_->peek());
+        EXPECT_TRUE(checkSendVsReceived(send_ncr, received_ncr));
+
+        // Verify that we and dequeue the request.
+        EXPECT_NO_THROW(queue_mgr_->dequeue());
+        EXPECT_EQ(0, queue_mgr_->getQueueSize());
+    }
+
+    // Iterate over the list of requests, sending and receiving
+    // each one. Allow them to accumulate in the queue.
+    for (int i = 0; i < VALID_MSG_CNT; i++) {
+        // Create the ncr and add to our reference list.
+        ASSERT_NO_THROW(send_ncr = NameChangeRequest::fromJSON(valid_msgs[i]));
+        ASSERT_NO_THROW(sender_->sendRequest(send_ncr));
+
+        // running two should do the send then the receive
+        EXPECT_NO_THROW(io_service_.run_one());
+        EXPECT_NO_THROW(io_service_.run_one());
+        EXPECT_EQ(i+1, queue_mgr_->getQueueSize());
+    }
+
+    // Verify that the queue is at max capacity.
+    EXPECT_EQ(queue_mgr_->getMaxQueueSize(), queue_mgr_->getQueueSize());
+
+    // Send another. The send should succeed.
+    ASSERT_NO_THROW(sender_->sendRequest(send_ncr));
+    EXPECT_NO_THROW(io_service_.run_one());
+
+    // Now execute the receive which should not throw but should move us
+    // to STOPPED_QUEUE_FULL state.
+    EXPECT_NO_THROW(io_service_.run_one());
+    EXPECT_EQ(D2QueueMgr::STOPPED_QUEUE_FULL, queue_mgr_->getMgrState());
+
+    // Verify queue size did not increase beyond max.
+    EXPECT_EQ(VALID_MSG_CNT, queue_mgr_->getQueueSize());
+
+    // Verify that setting max queue size to a value less than current size of
+    // the queue is not allowed.
+    EXPECT_THROW(queue_mgr_->setMaxQueueSize(VALID_MSG_CNT-1), D2QueueMgrError);
+    EXPECT_EQ(VALID_MSG_CNT, queue_mgr_->getQueueSize());
+
+    // Verify that we can re-enter RUNNING from STOPPED_QUEUE_FULL.
+    EXPECT_NO_THROW(queue_mgr_->startListening());
+    EXPECT_EQ(D2QueueMgr::RUNNING, queue_mgr_->getMgrState());
+
+    // Verify that the queue contents were preserved.
+    EXPECT_EQ(queue_mgr_->getMaxQueueSize(), queue_mgr_->getQueueSize());
+
+    // Verify that clearQueue works.
+    EXPECT_NO_THROW(queue_mgr_->clearQueue());
+    EXPECT_EQ(0, queue_mgr_->getQueueSize());
+
+
+    // Verify that we can again receive requests.
+    // Send should be fine.
+    ASSERT_NO_THROW(sender_->sendRequest(send_ncr));
+    EXPECT_NO_THROW(io_service_.run_one());
+
+    // Receive should succeed.
+    EXPECT_NO_THROW(io_service_.run_one());
+    EXPECT_EQ(1, queue_mgr_->getQueueSize());
+}
+
+} // end of anonymous namespace