Browse Source

[1599] Integrate the SyncUDPSocket to the tests

Actually, the SyncUDPSocket is currently a copy of the UDPSocket. It
needs to get stripped little bit.
Michal 'vorner' Vaner 13 years ago
parent
commit
f472a687b7

+ 1 - 0
src/lib/asiodns/Makefile.am

@@ -24,6 +24,7 @@ libasiodns_la_SOURCES += dns_server.h
 libasiodns_la_SOURCES += dns_service.cc dns_service.h
 libasiodns_la_SOURCES += dns_service.cc dns_service.h
 libasiodns_la_SOURCES += tcp_server.cc tcp_server.h
 libasiodns_la_SOURCES += tcp_server.cc tcp_server.h
 libasiodns_la_SOURCES += udp_server.cc udp_server.h
 libasiodns_la_SOURCES += udp_server.cc udp_server.h
+libasiodns_la_SOURCES += sync_udp_server.cc sync_udp_server.h
 libasiodns_la_SOURCES += io_fetch.cc io_fetch.h
 libasiodns_la_SOURCES += io_fetch.cc io_fetch.h
 libasiodns_la_SOURCES += logger.h logger.cc
 libasiodns_la_SOURCES += logger.h logger.cc
 
 

+ 352 - 0
src/lib/asiodns/sync_udp_server.cc

@@ -0,0 +1,352 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
+#include <errno.h>
+
+#include <boost/shared_array.hpp>
+
+#include <config.h>
+
+#include <log/dummylog.h>
+
+#include <asio.hpp>
+#include <asio/error.hpp>
+#include <asiolink/dummy_io_cb.h>
+#include <asiolink/udp_endpoint.h>
+#include <asiolink/udp_socket.h>
+#include "sync_udp_server.h"
+#include "logger.h"
+
+#include <dns/opcode.h>
+
+using namespace asio;
+using asio::ip::udp;
+using isc::log::dlog;
+
+using namespace std;
+using namespace isc::dns;
+using namespace isc::util;
+using namespace isc::asiolink;
+
+namespace isc {
+namespace asiodns {
+
+/*
+ * Some of the member variables here are shared_ptrs and some are
+ * auto_ptrs. There will be one instance of Data for the lifetime
+ * of packet. The variables that are state only for a single packet
+ * use auto_ptr, as it is more lightweight. In the case of shared
+ * configuration (eg. the callbacks, socket), we use shared_ptrs.
+ */
+struct SyncUDPServer::Data {
+    /*
+     * Constructors from parameters passed to UDPServer constructor.
+     * This instance will not be used to retrieve and answer the actual
+     * query, it will only hold parameters until we wait for the
+     * first packet. But we do initialize the socket in here.
+     */
+    Data(io_service& io_service, const ip::address& addr, const uint16_t port,
+        SimpleCallback* checkin, DNSLookup* lookup, DNSAnswer* answer) :
+        io_(io_service), done_(false),
+        checkin_callback_(checkin),lookup_callback_(lookup),
+        answer_callback_(answer)
+    {
+        // We must use different instantiations for v4 and v6;
+        // otherwise ASIO will bind to both
+        udp proto = addr.is_v4() ? udp::v4() : udp::v6();
+        socket_.reset(new udp::socket(io_service, proto));
+        socket_->set_option(socket_base::reuse_address(true));
+        if (addr.is_v6()) {
+            socket_->set_option(asio::ip::v6_only(true));
+        }
+        socket_->bind(udp::endpoint(addr, port));
+    }
+    Data(io_service& io_service, int fd, int af, SimpleCallback* checkin,
+         DNSLookup* lookup, DNSAnswer* answer) :
+         io_(io_service), done_(false),
+         checkin_callback_(checkin),lookup_callback_(lookup),
+         answer_callback_(answer)
+    {
+        if (af != AF_INET && af != AF_INET6) {
+            isc_throw(InvalidParameter, "Address family must be either AF_INET "
+                      "or AF_INET6, not " << af);
+        }
+        LOG_DEBUG(logger, DBGLVL_TRACE_BASIC, ASIODNS_FD_ADD_UDP).arg(fd);
+        try {
+            socket_.reset(new udp::socket(io_service));
+            socket_->assign(af == AF_INET6 ? udp::v6() : udp::v4(), fd);
+        } catch (const std::exception& exception) {
+            // Whatever the thing throws, it is something from ASIO and we
+            // convert it
+            isc_throw(IOError, exception.what());
+        }
+    }
+
+    /*
+     * Copy constructor. Default one would probably do, but it is unnecessary
+     * to copy many of the member variables every time we fork to handle
+     * another packet.
+     *
+     * We also allocate data for receiving the packet here.
+     */
+    Data(const Data& other) :
+        io_(other.io_), socket_(other.socket_), done_(false),
+        checkin_callback_(other.checkin_callback_),
+        lookup_callback_(other.lookup_callback_),
+        answer_callback_(other.answer_callback_)
+    {
+        // Instantiate the data buffer and endpoint that will
+        // be used by the asynchronous receive call.
+        data_.reset(new char[MAX_LENGTH]);
+        sender_.reset(new udp::endpoint());
+    }
+
+    // The ASIO service object
+    asio::io_service& io_;
+
+    // Class member variables which are dynamic, and changes to which
+    // need to accessible from both sides of a coroutine fork or from
+    // outside of the coroutine (i.e., from an asynchronous I/O call),
+    // should be declared here as pointers and allocated in the
+    // constructor or in the coroutine.  This allows state information
+    // to persist when an individual copy of the coroutine falls out
+    // scope while waiting for an event, *so long as* there is another
+    // object that is referencing the same data.  As a side-benefit, using
+    // pointers also reduces copy overhead for coroutine objects.
+    //
+    // Note: Currently these objects are allocated by "new" in the
+    // constructor, or in the function operator while processing a query.
+    // Repeated allocations from the heap for every incoming query is
+    // clearly a performance issue; this must be optimized in the future.
+    // The plan is to have a structure pre-allocate several "Data"
+    // objects which can be pulled off a free list and placed on an in-use
+    // list whenever a query comes in.  This will serve the dual purpose
+    // of improving performance and guaranteeing that state information
+    // will *not* be destroyed when any one instance of the coroutine
+    // falls out of scope while waiting for an event.
+    //
+    // Socket used to for listen for queries.  Created in the
+    // constructor and stored in a shared_ptr because socket objects
+    // are not copyable.
+    boost::shared_ptr<asio::ip::udp::socket> socket_;
+
+    // The ASIO-internal endpoint object representing the client
+    std::auto_ptr<asio::ip::udp::endpoint> sender_;
+
+    // \c IOMessage and \c Message objects to be passed to the
+    // DNS lookup and answer providers
+    std::auto_ptr<asiolink::IOMessage> io_message_;
+
+    // The original query as sent by the client
+    isc::dns::MessagePtr query_message_;
+
+    // The response message we are building
+    isc::dns::MessagePtr answer_message_;
+
+    // The buffer into which the response is written
+    isc::util::OutputBufferPtr respbuf_;
+
+    // The buffer into which the query packet is written
+    boost::shared_array<char> data_;
+
+    // State information that is entirely internal to a given instance
+    // of the coroutine can be declared here.
+    size_t bytes_;
+    bool done_;
+
+
+    // Callback functions provided by the caller
+    const SimpleCallback* checkin_callback_;
+    const DNSLookup* lookup_callback_;
+    const DNSAnswer* answer_callback_;
+
+    std::auto_ptr<IOEndpoint> peer_;
+    std::auto_ptr<IOSocket> iosock_;
+};
+
+/// The following functions implement the \c UDPServer class.
+///
+/// The constructor. It just creates new internal state object
+/// and lets it handle the initialization.
+SyncUDPServer::SyncUDPServer(io_service& io_service, const ip::address& addr,
+                     const uint16_t port, SimpleCallback* checkin,
+                     DNSLookup* lookup, DNSAnswer* answer) :
+    data_(new Data(io_service, addr, port, checkin, lookup, answer))
+{ }
+
+SyncUDPServer::SyncUDPServer(io_service& io_service, int fd, int af,
+                     SimpleCallback* checkin, DNSLookup* lookup,
+                     DNSAnswer* answer) :
+    data_(new Data(io_service, fd, af, checkin, lookup, answer))
+{ }
+
+/// The function operator is implemented with the "stackless coroutine"
+/// pattern; see internal/coroutine.h for details.
+void
+SyncUDPServer::operator()(asio::error_code ec, size_t length) {
+    /// Because the coroutine reentry block is implemented as
+    /// a switch statement, inline variable declarations are not
+    /// permitted.  Certain variables used below can be declared here.
+
+    CORO_REENTER (this) {
+        do {
+            /*
+             * This is preparation for receiving a packet. We get a new
+             * state object for the lifetime of the next packet to come.
+             * It allocates the buffers to receive data into.
+             */
+            data_.reset(new Data(*data_));
+
+            do {
+                // Begin an asynchronous receive, then yield.
+                // When the receive event is posted, the coroutine
+                // will resume immediately after this point.
+                CORO_YIELD data_->socket_->async_receive_from(
+                    buffer(data_->data_.get(), MAX_LENGTH), *data_->sender_,
+                    *this);
+
+                // Abort on fatal errors
+                // TODO: add log
+                if (ec) {
+                    using namespace asio::error;
+                    if (ec.value() != would_block && ec.value() != try_again &&
+                        ec.value() != interrupted) {
+                        return;
+                    }
+                }
+
+            } while (ec || length == 0);
+
+            data_->bytes_ = length;
+
+            /*
+             * We fork the coroutine now. One (the child) will keep
+             * the current state and handle the packet, then die and
+             * drop ownership of the state. The other (parent) will just
+             * go into the loop again and replace the current state with
+             * a new one for a new object.
+             *
+             * Actually, both of the coroutines will be a copy of this
+             * one, but that's just internal implementation detail.
+             */
+            CORO_FORK data_->io_.post(SyncUDPServer(*this));
+        } while (is_parent());
+
+        // Create an \c IOMessage object to store the query.
+        //
+        // (XXX: It would be good to write a factory function
+        // that would quickly generate an IOMessage object without
+        // all these calls to "new".)
+        data_->peer_.reset(new UDPEndpoint(*data_->sender_));
+
+        // The UDP socket class has been extended with asynchronous functions
+        // and takes as a template parameter a completion callback class.  As
+        // UDPServer does not use these extended functions (only those defined
+        // in the IOSocket base class) - but needs a UDPSocket to get hold of
+        // the underlying Boost UDP socket - DummyIOCallback is used.  This
+        // provides the appropriate operator() but is otherwise functionless.
+        data_->iosock_.reset(
+            new UDPSocket<DummyIOCallback>(*data_->socket_));
+
+        data_->io_message_.reset(new IOMessage(data_->data_.get(),
+            data_->bytes_, *data_->iosock_, *data_->peer_));
+
+        // Perform any necessary operations prior to processing an incoming
+        // query (e.g., checking for queued configuration messages).
+        //
+        // (XXX: it may be a performance issue to check in for every single
+        // incoming query; we may wish to throttle this in the future.)
+        if (data_->checkin_callback_ != NULL) {
+            (*data_->checkin_callback_)(*data_->io_message_);
+        }
+
+        // If we don't have a DNS Lookup provider, there's no point in
+        // continuing; we exit the coroutine permanently.
+        if (data_->lookup_callback_ == NULL) {
+            CORO_YIELD return;
+        }
+
+        // Instantiate objects that will be needed by the
+        // asynchronous DNS lookup and/or by the send call.
+        data_->respbuf_.reset(new OutputBuffer(0));
+        data_->query_message_.reset(new Message(Message::PARSE));
+        data_->answer_message_.reset(new Message(Message::RENDER));
+
+        // Schedule a DNS lookup, and yield.  When the lookup is
+        // finished, the coroutine will resume immediately after
+        // this point.
+        CORO_YIELD data_->io_.post(AsyncLookup<SyncUDPServer>(*this));
+
+        // The 'done_' flag indicates whether we have an answer
+        // to send back.  If not, exit the coroutine permanently.
+        if (!data_->done_) {
+            CORO_YIELD return;
+        }
+
+        // Call the DNS answer provider to render the answer into
+        // wire format
+        (*data_->answer_callback_)(*data_->io_message_, data_->query_message_,
+            data_->answer_message_, data_->respbuf_);
+
+        // Begin an asynchronous send, and then yield.  When the
+        // send completes, we will resume immediately after this point
+        // (though we have nothing further to do, so the coroutine
+        // will simply exit at that time).
+        CORO_YIELD data_->socket_->async_send_to(
+            buffer(data_->respbuf_->getData(), data_->respbuf_->getLength()),
+            *data_->sender_, *this);
+    }
+}
+
+/// Call the DNS lookup provider.  (Expected to be called by the
+/// AsyncLookup<UDPServer> handler.)
+void
+SyncUDPServer::asyncLookup() {
+    (*data_->lookup_callback_)(*data_->io_message_,
+        data_->query_message_, data_->answer_message_, data_->respbuf_, this);
+}
+
+/// Stop the UDPServer
+void
+SyncUDPServer::stop() {
+    /// Using close instead of cancel, because cancel
+    /// will only cancel the asynchornized event already submitted
+    /// to io service, the events post to io service after
+    /// cancel still can be scheduled by io service, if
+    /// the socket is cloesed, all the asynchronized event
+    /// for it won't be scheduled by io service not matter it is
+    /// submit to io serice before or after close call. And we will
+    //. get bad_descriptor error
+    data_->socket_->close();
+}
+
+/// Post this coroutine on the ASIO service queue so that it will
+/// resume processing where it left off.  The 'done' parameter indicates
+/// whether there is an answer to return to the client.
+void
+SyncUDPServer::resume(const bool done) {
+    data_->done_ = done;
+    data_->io_.post(*this);
+}
+
+bool
+SyncUDPServer::hasAnswer() {
+    return (data_->done_);
+}
+
+} // namespace asiodns
+} // namespace isc

+ 122 - 0
src/lib/asiodns/sync_udp_server.h

@@ -0,0 +1,122 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#ifndef __SYNC_UDP_SERVER_H
+#define __SYNC_UDP_SERVER_H 1
+
+#ifndef ASIO_HPP
+#error "asio.hpp must be included before including this, see asiolink.h as to why"
+#endif
+
+#include <asiolink/simple_callback.h>
+#include <asiodns/dns_answer.h>
+#include <asiodns/dns_lookup.h>
+#include <asiodns/dns_server.h>
+
+#include <coroutine.h>
+
+namespace isc {
+namespace asiodns {
+
+//
+// Asynchronous UDP server coroutine
+//
+///
+/// \brief This class implements the coroutine to handle UDP
+///        DNS query event. As such, it is both a \c DNSServer and
+///        a \c coroutine
+///
+class SyncUDPServer : public virtual DNSServer, public virtual coroutine {
+public:
+    /// \brief Constructor
+    /// \param io_service the asio::io_service to work with
+    /// \param addr the IP address to listen for queries on
+    /// \param port the port to listen for queries on
+    /// \param checkin the callbackprovider for non-DNS events
+    /// \param lookup the callbackprovider for DNS lookup events
+    /// \param answer the callbackprovider for DNS answer events
+    explicit SyncUDPServer(asio::io_service& io_service,
+                       const asio::ip::address& addr, const uint16_t port,
+                       isc::asiolink::SimpleCallback* checkin = NULL,
+                       DNSLookup* lookup = NULL,
+                       DNSAnswer* answer = NULL);
+
+    /// \brief Constructor
+    /// \param io_service the asio::io_service to work with
+    /// \param fd the file descriptor of opened UDP socket
+    /// \param af address family, either AF_INET or AF_INET6
+    /// \param checkin the callbackprovider for non-DNS events
+    /// \param lookup the callbackprovider for DNS lookup events
+    /// \param answer the callbackprovider for DNS answer events
+    /// \throw isc::InvalidParameter if af is neither AF_INET nor AF_INET6
+    /// \throw isc::asiolink::IOError when a low-level error happens, like the
+    ///     fd is not a valid descriptor.
+    SyncUDPServer(asio::io_service& io_service, int fd, int af,
+              isc::asiolink::SimpleCallback* checkin = NULL,
+              DNSLookup* lookup = NULL, DNSAnswer* answer = NULL);
+
+    /// \brief The function operator
+    void operator()(asio::error_code ec = asio::error_code(),
+                    size_t length = 0);
+
+    /// \brief Calls the lookup callback
+    void asyncLookup();
+
+    /// \brief Stop the running server
+    /// \note once the server stopped, it can't restart
+    void stop();
+
+    /// \brief Resume operation
+    ///
+    /// \param done Set this to true if the lookup action is done and
+    ///        we have an answer
+    void resume(const bool done);
+
+    /// \brief Check if we have an answer
+    ///
+    /// \return true if we have an answer
+    bool hasAnswer();
+
+    /// \brief Returns the coroutine state value
+    ///
+    /// \return the coroutine state value
+    int value() { return (get_value()); }
+
+    /// \brief Clones the object
+    ///
+    /// \return a newly allocated copy of this object
+    DNSServer* clone() {
+        SyncUDPServer* s = new SyncUDPServer(*this);
+        return (s);
+    }
+
+private:
+    enum { MAX_LENGTH = 4096 };
+
+    /**
+     * \brief Internal state and data.
+     *
+     * We use the pimple design pattern, but not because we need to hide
+     * internal data. This class and whole header is for private use anyway.
+     * It turned out that SyncUDPServer is copied a lot, because it is a coroutine.
+     * This way the overhead of copying is lower, we copy only one shared
+     * pointer instead of about 10 of them.
+     */
+    struct Data;
+    boost::shared_ptr<Data> data_;
+};
+
+} // namespace asiodns
+} // namespace isc
+#endif // __SYNC_UDP_SERVER_H

+ 16 - 12
src/lib/asiodns/tests/dns_server_unittest.cc

@@ -19,6 +19,7 @@
 #include <asiolink/io_endpoint.h>
 #include <asiolink/io_endpoint.h>
 #include <asiolink/io_error.h>
 #include <asiolink/io_error.h>
 #include <asiodns/udp_server.h>
 #include <asiodns/udp_server.h>
+#include <asiodns/sync_udp_server.h>
 #include <asiodns/tcp_server.h>
 #include <asiodns/tcp_server.h>
 #include <asiodns/dns_answer.h>
 #include <asiodns/dns_answer.h>
 #include <asiodns/dns_lookup.h>
 #include <asiodns/dns_lookup.h>
@@ -314,10 +315,10 @@ class TCPClient : public SimpleClient {
 // two servers, UDP client will only communicate with UDP server, same for TCP
 // two servers, UDP client will only communicate with UDP server, same for TCP
 // client
 // client
 //
 //
-// This is only the active part of the test. We run the test case twice, once
+// This is only the active part of the test. We run the test case four times, once
 // for each type of initialization (once when giving it the address and port,
 // for each type of initialization (once when giving it the address and port,
-// once when giving the file descriptor), to ensure it works both ways exactly
-// the same.
+// once when giving the file descriptor) multiplied by once for each type of UDP
+// server (UDPServer and SyncUDPServer), to ensure it works exactly the same.
 template<class UDPServerClass>
 template<class UDPServerClass>
 class DNSServerTestBase : public::testing::Test {
 class DNSServerTestBase : public::testing::Test {
     protected:
     protected:
@@ -422,7 +423,8 @@ protected:
 };
 };
 
 
 // Initialization by the file descriptor
 // Initialization by the file descriptor
-class FdInit : public DNSServerTestBase<UDPServer> {
+template<class UDPServerClass>
+class FdInit : public DNSServerTestBase<UDPServerClass> {
 private:
 private:
     // Opens the file descriptor for us
     // Opens the file descriptor for us
     // It uses the low-level C api, as it seems to be the easiest way to get
     // It uses the low-level C api, as it seems to be the easiest way to get
@@ -470,12 +472,14 @@ protected:
     void SetUp() {
     void SetUp() {
         const int fdUDP(getFd(SOCK_DGRAM));
         const int fdUDP(getFd(SOCK_DGRAM));
         ASSERT_NE(-1, fdUDP) << strerror(errno);
         ASSERT_NE(-1, fdUDP) << strerror(errno);
-        udp_server_ = new UDPServer(service, fdUDP, AF_INET6, checker_,
-                                    lookup_, answer_);
+        this->udp_server_ = new UDPServerClass(this->service, fdUDP, AF_INET6,
+                                               this->checker_, this->lookup_,
+                                               this->answer_);
         const int fdTCP(getFd(SOCK_STREAM));
         const int fdTCP(getFd(SOCK_STREAM));
         ASSERT_NE(-1, fdTCP) << strerror(errno);
         ASSERT_NE(-1, fdTCP) << strerror(errno);
-        tcp_server_ = new TCPServer(service, fdTCP, AF_INET6, checker_,
-                                    lookup_, answer_);
+        this->tcp_server_ = new TCPServer(this->service, fdTCP, AF_INET6,
+                                          this->checker_, this->lookup_,
+                                          this->answer_);
     }
     }
 };
 };
 
 
@@ -483,12 +487,12 @@ protected:
 template<class Parent>
 template<class Parent>
 class DNSServerTest : public Parent { };
 class DNSServerTest : public Parent { };
 
 
-// TODO: Add the new SyncUDPServer class here. This might need some changes,
-// as it is not as generic, though
-typedef ::testing::Types<AddrPortInit<UDPServer>, FdInit> ServerTypes;
+typedef ::testing::Types<AddrPortInit<UDPServer>, AddrPortInit<SyncUDPServer>,
+                         FdInit<UDPServer>, FdInit<SyncUDPServer> >
+    ServerTypes;
 TYPED_TEST_CASE(DNSServerTest, ServerTypes);
 TYPED_TEST_CASE(DNSServerTest, ServerTypes);
 
 
-typedef ::testing::Types<UDPServer> UDPServerTypes;
+typedef ::testing::Types<UDPServer, SyncUDPServer> UDPServerTypes;
 TYPED_TEST_CASE(DNSServerTestBase, UDPServerTypes);
 TYPED_TEST_CASE(DNSServerTestBase, UDPServerTypes);
 
 
 template<class UDPServerClass>
 template<class UDPServerClass>