Browse Source

Merge branch 'trac554'

Stephen Morris 14 years ago
parent
commit
9739cbce2e
37 changed files with 2137 additions and 505 deletions
  1. 18 14
      src/lib/asiolink/Makefile.am
  2. 80 1
      src/lib/asiolink/README
  3. 1 14
      src/lib/asiolink/asiolink.h
  4. 9 7
      src/lib/asiolink/dns_service.cc
  5. 6 0
      src/lib/asiolink/dns_service.h
  6. 51 0
      src/lib/asiolink/dummy_io_cb.h
  7. 57 20
      src/lib/asiolink/udp_query.h
  8. 5 5
      src/lib/asiolink/interval_timer.cc
  9. 4 1
      src/lib/asiolink/io_address.cc
  10. 3 3
      src/lib/asiolink/io_address.h
  11. 309 0
      src/lib/asiolink/io_asio_socket.h
  12. 2 1
      src/lib/asiolink/io_endpoint.cc
  13. 3 3
      src/lib/asiolink/io_endpoint.h
  14. 35 0
      src/lib/asiolink/io_error.h
  15. 193 0
      src/lib/asiolink/io_fetch.cc
  16. 226 0
      src/lib/asiolink/io_fetch.h
  17. 4 3
      src/lib/asiolink/io_message.h
  18. 5 4
      src/lib/asiolink/io_service.cc
  19. 4 7
      src/lib/asiolink/io_socket.h
  20. 25 25
      src/lib/asiolink/recursive_query.cc
  21. 14 7
      src/lib/asiolink/tcp_server.cc
  22. 241 16
      src/lib/asiolink/tcp_socket.h
  23. 14 8
      src/lib/asiolink/tests/Makefile.am
  24. 1 0
      src/lib/asiolink/tests/interval_timer_unittest.cc
  25. 7 1
      src/lib/asiolink/tests/ioaddress_unittest.cc
  26. 2 1
      src/lib/asiolink/tests/ioendpoint_unittest.cc
  27. 188 0
      src/lib/asiolink/tests/io_fetch_unittest.cc
  28. 1 0
      src/lib/asiolink/tests/io_service_unittest.cc
  29. 4 1
      src/lib/asiolink/tests/iosocket_unittest.cc
  30. 6 1
      src/lib/asiolink/tests/recursive_query_unittest.cc
  31. 55 0
      src/lib/asiolink/tests/udp_endpoint_unittest.cc
  32. 0 145
      src/lib/asiolink/tests/udp_query_unittest.cc
  33. 287 0
      src/lib/asiolink/tests/udp_socket_unittest.cc
  34. 21 8
      src/lib/asiolink/udp_endpoint.h
  35. 0 189
      src/lib/asiolink/udp_query.cc
  36. 17 9
      src/lib/asiolink/udp_server.cc
  37. 239 11
      src/lib/asiolink/udp_socket.h

+ 18 - 14
src/lib/asiolink/Makefile.am

@@ -12,24 +12,28 @@ CLEANFILES = *.gcno *.gcda
 # have some code fragments that would hit gcc's unused-parameter warning,
 # which would make the build fail with -Werror (our default setting).
 lib_LTLIBRARIES = libasiolink.la
-libasiolink_la_SOURCES = asiolink.h
-libasiolink_la_SOURCES += io_service.cc io_service.h
-libasiolink_la_SOURCES += dns_service.cc dns_service.h
-libasiolink_la_SOURCES += dns_server.h
-libasiolink_la_SOURCES += dns_lookup.h
+libasiolink_la_SOURCES  = asiolink.h
 libasiolink_la_SOURCES += dns_answer.h
-libasiolink_la_SOURCES += simple_callback.h
+libasiolink_la_SOURCES += dns_lookup.h
+libasiolink_la_SOURCES += dns_server.h
+libasiolink_la_SOURCES += dns_service.h dns_service.cc
+libasiolink_la_SOURCES += dummy_io_cb.h
 libasiolink_la_SOURCES += interval_timer.h interval_timer.cc
-libasiolink_la_SOURCES += recursive_query.h recursive_query.cc
-libasiolink_la_SOURCES += io_socket.cc io_socket.h
+libasiolink_la_SOURCES += io_address.h io_address.cc
+libasiolink_la_SOURCES += io_endpoint.h io_endpoint.cc
+libasiolink_la_SOURCES += io_error.h
+libasiolink_la_SOURCES += io_fetch.h io_fetch.cc
 libasiolink_la_SOURCES += io_message.h
-libasiolink_la_SOURCES += io_address.cc io_address.h
-libasiolink_la_SOURCES += io_endpoint.cc io_endpoint.h
-libasiolink_la_SOURCES += udp_endpoint.h udp_socket.h
-libasiolink_la_SOURCES += udp_server.h udp_server.cc
-libasiolink_la_SOURCES += udp_query.h udp_query.cc
-libasiolink_la_SOURCES += tcp_endpoint.h tcp_socket.h
+libasiolink_la_SOURCES += io_service.h io_service.cc
+libasiolink_la_SOURCES += io_socket.h io_socket.cc
+libasiolink_la_SOURCES += recursive_query.h recursive_query.cc
+libasiolink_la_SOURCES += simple_callback.h
+libasiolink_la_SOURCES += tcp_endpoint.h
 libasiolink_la_SOURCES += tcp_server.h tcp_server.cc
+libasiolink_la_SOURCES += tcp_socket.h
+libasiolink_la_SOURCES += udp_endpoint.h
+libasiolink_la_SOURCES += udp_server.h udp_server.cc
+libasiolink_la_SOURCES += udp_socket.h
 # Note: the ordering matters: -Wno-... must follow -Wextra (defined in
 # B10_CXXFLAGS)
 libasiolink_la_CXXFLAGS = $(AM_CXXFLAGS)

+ 80 - 1
src/lib/asiolink/README

@@ -33,7 +33,7 @@ This is intended to simplify development a bit, since it allows the
 routines to be written in a straightfowrard step-step-step fashion rather
 than as a complex chain of separate handler functions.
 
-Coroutine objects (i.e., UDPServer, TCPServer and UDPQuery) are objects
+Coroutine objects (i.e., UDPServer, TCPServer and IOFetch) are objects
 with reenterable operator() members.  When an instance of one of these
 classes is called as a function, it resumes at the position where it left
 off.  Thus, a UDPServer can issue an asynchronous I/O call and specify
@@ -101,3 +101,82 @@ when the answer has arrived.  In simplified form, the DNSQuery routine is:
 Currently, DNSQuery is only implemented for UDP queries.  In future work
 it will be necessary to write code to fall back to TCP when circumstances
 require it.
+
+
+Upstream Fetches
+================
+Upstream fetches (queries by the resolver on behalf of a client) are made
+using a slightly-modified version of the pattern described above.
+
+Sockets
+-------
+First, it will be useful to understand the class hierarchy used in the
+fetch logic:
+
+        IOSocket
+           |
+      IOAsioSocket
+           |
+     +-----+-----+                
+     |           |
+UDPSocket    TCPSocket
+
+IOSocket is a wrapper class for a socket and is used by the authoritative
+server code.  It is an abstract base class, providing little more that the ability to hold the socket and to return the protocol in use.
+
+Built on this is IOAsioSocket, which adds the open, close, asyncSend and
+asyncReceive methods.  This is a template class, which takes as template
+argument the class of the object that will be used as the callback when the
+asynchronous operation completes. This object can be of any type, but must
+include an operator() method with the signature:
+
+   operator()(asio::error_code ec, size_t length)
+
+... the two arguments being the status of the completed I/O operation and
+the number of bytes transferred. (In the case of the open method, the second
+argument will be zero.)
+
+Finally, the TCPSocket and UDPSocket classes provide the body of the
+asynchronous operations.
+
+Fetch Sequence
+--------------
+The fetch is implemented by the IOFetch class, which takes as argument the
+protocol to use.  The sequence is:
+
+  REENTER:
+    render the question into a wire-format query packet
+    open()                           // Open socket and optionally connect
+    if (! synchronous) {
+        YIELD;
+    }
+    YIELD asyncSend(query)           // Send query 
+    do {
+        YIELD asyncReceive(response) // Read response
+    } while (! complete(response))
+    close()                          // Drop connection and close socket
+    server->resume
+
+The open() method opens a socket for use.  On TCP, it also makes a
+connection to the remote end.  So under UDP the operation will complete
+immediately, but under TCP it could take a long time.  One solution would be
+for the open operation to post an event to the I/O queue; then both cases
+could be regarded as being equivalent, with the completion being signalled
+by the posting of the completion event.  However UDP is the most common case
+and that would involve extra overhead.  So the open() returns a status
+indicating whether the operation completed asynchronously.  If it did, the
+code yields back to the coroutine; if not the yield is bypassed.
+
+The asynchronous send is straightforward, invoking the underlying ASIO
+function.  (Note that the address/port is supplied to both the open() and
+asyncSend() methods - it is used by the TCPSocket in open() and by the
+UDPSocket in asyncSend().)
+
+The asyncReceive() method issues an asynchronous read and waits for completion.
+The fetch object keeps track of the amount of data received so far and when
+the receive completes it calls a method on the socket to determine if the
+entire message has been received.  (This will always be the case for UDP.  On
+TCP though, the message is preceded by a count field as several reads may be
+required to read all the data.)  The fetch loops until all the data is read.
+
+Finally, the socket is closed and the server called to resume operation.

+ 1 - 14
src/lib/asiolink/asiolink.h

@@ -32,6 +32,7 @@
 #include <asiolink/io_endpoint.h>
 #include <asiolink/io_message.h>
 #include <asiolink/io_socket.h>
+#include <asiolink/io_error.h>
 
 /// \namespace asiolink
 /// \brief A wrapper interface for the ASIO library.
@@ -83,20 +84,6 @@
 /// the placeholder of callback handlers:
 /// http://think-async.com/Asio/asio-1.3.1/doc/asio/reference/asio_handler_allocate.html
 
-namespace asiolink {
-
-
-/// \brief An exception that is thrown if an error occurs within the IO
-/// module.  This is mainly intended to be a wrapper exception class for
-/// ASIO specific exceptions.
-class IOError : public isc::Exception {
-public:
-    IOError(const char* file, size_t line, const char* what) :
-        isc::Exception(file, line, what) {}
-};
-
-
-}      // asiolink
 #endif // __ASIOLINK_H
 
 // Local Variables: 

+ 9 - 7
src/lib/asiolink/dns_service.cc

@@ -12,21 +12,23 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
+
+#include <boost/lexical_cast.hpp>
+
 #include <config.h>
 
-// unistd is needed for asio.hpp with SunStudio
-#include <unistd.h>
+#include <log/dummylog.h>
 
 #include <asio.hpp>
-
+#include <asiolink/dns_service.h>
+#include <asiolink/io_service.h>
 #include <asiolink/io_service.h>
 #include <asiolink/tcp_server.h>
 #include <asiolink/udp_server.h>
 
-#include <log/dummylog.h>
-
-#include <boost/lexical_cast.hpp>
-
 using isc::log::dlog;
 
 namespace asiolink {

+ 6 - 0
src/lib/asiolink/dns_service.h

@@ -97,6 +97,12 @@ public:
     /// It will eventually be removed once the wrapper interface is
     /// generalized.
     asio::io_service& get_io_service() { return io_service_.get_io_service(); }
+
+    /// \brief Return the IO Service Object
+    ///
+    /// \return IOService object for this DNS service.
+    asiolink::IOService& getIOService() { return (io_service_);}
+
 private:
     DNSServiceImpl* impl_;
     IOService& io_service_;

+ 51 - 0
src/lib/asiolink/dummy_io_cb.h

@@ -0,0 +1,51 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#ifndef __DUMMY_IO_CB_H
+#define __DUMMY_IO_CB_H
+
+#include <iostream>
+
+#include <asio/error.hpp>
+#include <asio/error_code.hpp>
+
+namespace asiolink {
+
+/// \brief Asynchronous I/O Completion Callback
+///
+/// The two socket classes (UDPSocket and TCPSocket) require that the I/O
+/// completion callback function have an operator() method with the appropriate
+/// signature.  The classes are templates, any class with that method and
+/// signature can be passed as the callback object - there is no need for a
+/// base class defining the interface.  However, some users of the socket
+/// classes do not use the asynchronous I/O operations, yet have to supply a
+/// template parameter.  This is the reason for this class - it is the dummy
+/// template parameter.
+
+class DummyIOCallback {
+public:
+
+    /// \brief Asynchronous I/O callback method
+    ///
+    /// \param error Unused
+    /// \param length Unused
+    void operator()(asio::error_code, size_t)
+    {
+        // TODO: log an error if this method ever gets called.
+    }
+};
+
+} // namespace asiolink
+
+#endif // __DUMMY_IO_CB_H

+ 57 - 20
src/lib/asiolink/udp_query.h

@@ -1,4 +1,4 @@
-// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2010  Internet Systems Consortium, Inc. ("ISC")
 //
 // Permission to use, copy, modify, and/or distribute this software for any
 // purpose with or without fee is hereby granted, provided that the above
@@ -12,26 +12,34 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
-#ifndef __UDP_QUERY_H
-#define __UDP_QUERY_H 1
+#ifndef __IOFETCH_H
+#define __IOFETCH_H 1
 
-#ifndef ASIO_HPP
-#error "asio.hpp must be included before including this, see asiolink.h as to why"
-#endif
+#include <config.h>
+
+#include <asio.hpp>
+#include <boost/shared_array.hpp>
+#include <boost/shared_ptr.hpp>
 
 #include <dns/buffer.h>
+#include <dns/message.h>
+#include <dns/messagerenderer.h>
 
-#include <asiolink/io_address.h>
-#include <coroutine.h>
+#include <asiolink/asiolink.h>
+#include <asiolink/internal/coroutine.h>
 
-namespace asiolink {
+// This file contains TCP/UDP-specific implementations of generic classes 
+// defined in asiolink.h.  It is *not* intended to be part of the public
+// API.
 
+namespace asiolink {
 //
-// Asynchronous UDP coroutine for upstream queries
+// Asynchronous UDP/TCP coroutine for upstream fetches
 //
-class UDPQuery : public coroutine {
+//class IOFetch : public coroutine, public UdpFetch, public TcpFetch {
+class IOFetch : public coroutine {
 public:
-    // TODO Maybe this should be more generic than just for UDPQuery?
+    // TODO Maybe this should be more generic than just for IOFetch?
     ///
     /// \brief Result of the query
     ///
@@ -43,12 +51,12 @@ public:
         TIME_OUT,
         STOPPED
     };
-    /// Abstract callback for the UDPQuery.
+    /// Abstract callback for the IOFetch.
     class Callback {
     public:
         virtual ~Callback() {}
 
-        /// This will be called when the UDPQuery is completed
+        /// This will be called when the IOFetch is completed
         virtual void operator()(Result result) = 0;
     };
     ///
@@ -59,11 +67,12 @@ public:
     ///        delete it if allocated on heap.
     ///@param timeout in ms.
     ///
-    explicit UDPQuery(asio::io_service& io_service,
+    IOFetch(asio::io_service& io_service,
                       const isc::dns::Question& q,
                       const IOAddress& addr, uint16_t port,
                       isc::dns::OutputBufferPtr buffer,
-                      Callback* callback, int timeout = -1);
+                      Callback* callback, int timeout = -1, 
+                      int protocol = IPPROTO_UDP);
     void operator()(asio::error_code ec = asio::error_code(),
                     size_t length = 0);
     /// Terminate the query.
@@ -80,9 +89,37 @@ private:
     /// to many async_*() functions) and we want keep the same data. Some of
     /// the data is not copyable too.
     ///
-    struct PrivateData;
-    boost::shared_ptr<PrivateData> data_;
+    //struct IOFetchProtocol;
+    //boost::shared_ptr<IOFetchProtocol> data_;
+    //struct UdpData;
+    //struct TcpData;
+    boost::shared_ptr<UdpFetch> data_;
+    boost::shared_ptr<TcpFetch> tcp_data_;
+};
+class UdpFetch : public IOFetch {
+    public:
+        struct UdpData;
+        explicit UdpFetch(asio::io_service& io_service,
+                          const isc::dns::Question& q,
+                          const IOAddress& addr,
+                          uint16_t port,
+                          isc::dns::OutputBufferPtr buffer,
+                          IOFetch::Callback *callback,
+                          int timeout);
 };
+class TcpFetch : public IOFetch {
+    public:
+        struct TcpData;
+        explicit TcpFetch(io_service& io_service, const Question& q,
+                 const IOAddress& addr, uint16_t port,
+                 OutputBufferPtr buffer, Callback *callback, int timeout);
+};
+
+}
+
+
+#endif // __IOFETCH_H
 
-}      // namespace asiolink
-#endif // __UDP_QUERY_H
+// Local Variables: 
+// mode: c++
+// End: 

+ 5 - 5
src/lib/asiolink/interval_timer.cc

@@ -14,18 +14,18 @@
 
 #include <config.h>
 
-// unistd is needed for asio.hpp with SunStudio
-#include <unistd.h>
+#include <unistd.h>             // for some IPC/network system calls
+#include <sys/socket.h>
+#include <netinet/in.h>
 
-#include <asio.hpp>
+#include <boost/bind.hpp>
 
 #include <exceptions/exceptions.h>
 
+#include <asio.hpp>
 #include <asiolink/interval_timer.h>
 #include <asiolink/io_service.h>
 
-#include <boost/bind.hpp>
-
 namespace asiolink {
 
 class IntervalTimerImpl {

+ 4 - 1
src/lib/asiolink/io_address.cc

@@ -20,7 +20,10 @@
 
 #include <asio.hpp>
 
-#include <asiolink/asiolink.h>
+#include <exceptions/exceptions.h>
+#include <asiolink/io_address.h>
+#include <asiolink/io_error.h>
+
 
 using namespace asio;
 using asio::ip::udp;

+ 3 - 3
src/lib/asiolink/io_address.h

@@ -12,8 +12,8 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
-#ifndef __IOADDRESS_H
-#define __IOADDRESS_H 1
+#ifndef __IO_ADDRESS_H
+#define __IO_ADDRESS_H 1
 
 // IMPORTANT NOTE: only very few ASIO headers files can be included in
 // this file.  In particular, asio.hpp should never be included here.
@@ -120,7 +120,7 @@ private:
 };
 
 }      // asiolink
-#endif // __IOADDRESS_H
+#endif // __IO_ADDRESS_H
 
 // Local Variables: 
 // mode: c++

+ 309 - 0
src/lib/asiolink/io_asio_socket.h

@@ -0,0 +1,309 @@
+// Copyright (C) 2010  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#ifndef __IO_ASIO_SOCKET_H
+#define __IO_ASIO_SOCKET_H 1
+
+// IMPORTANT NOTE: only very few ASIO headers files can be included in
+// this file.  In particular, asio.hpp should never be included here.
+// See the description of the namespace below.
+#include <unistd.h>             // for some network system calls
+
+#include <functional>
+#include <string>
+
+#include <exceptions/exceptions.h>
+#include <coroutine.h>
+
+#include <asiolink/io_error.h>
+#include <asiolink/io_socket.h>
+
+
+namespace asiolink {
+
+/// \brief Socket not open
+///
+/// Thrown on an attempt to do read/write to a socket that is not open.
+class SocketNotOpen : public IOError {
+public:
+    SocketNotOpen(const char* file, size_t line, const char* what) :
+        IOError(file, line, what) {}
+};
+
+
+
+/// Forward declaration of an IOEndpoint
+class IOEndpoint;
+
+
+/// \brief I/O Socket with asynchronous operations
+///
+/// This class is a wrapper for the ASIO socket classes such as
+/// \c ip::tcp::socket and \c ip::udp::socket.
+///
+/// This is the basic IOSocket with additional operations - open, send, receive
+/// and close.  Depending on how the asiolink code develops, it may be a
+/// temporary class: its main use is to add the template parameter needed for
+/// the derived classes UDPSocket and TCPSocket but without changing the
+/// signature of the more basic IOSocket class.
+///
+/// We may revisit this decision when we generalize the wrapper and more
+/// modules use it.  Also, at that point we may define a separate (visible)
+/// derived class for testing purposes rather than providing factory methods
+/// (i.e., getDummy variants below).
+///
+/// TODO: Check if IOAsioSocket class is still needed
+///
+/// \param C Template parameter identifying type of the callback object.
+
+template <typename C>
+class IOAsioSocket : public IOSocket {
+    ///
+    /// \name Constructors and Destructor
+    ///
+    /// Note: The copy constructor and the assignment operator are
+    /// intentionally defined as private, making this class non-copyable.
+    //@{
+private:
+    IOAsioSocket(const IOAsioSocket<C>& source);
+    IOAsioSocket& operator=(const IOAsioSocket<C>& source);
+protected:
+    /// \brief The default constructor.
+    ///
+    /// This is intentionally defined as \c protected as this base class
+    /// should never be instantiated (except as part of a derived class).
+    IOAsioSocket() {}
+public:
+    /// The destructor.
+    virtual ~IOAsioSocket() {}
+    //@}
+
+    /// \brief Return the "native" representation of the socket.
+    ///
+    /// In practice, this is the file descriptor of the socket for
+    /// UNIX-like systems so the current implementation simply uses
+    /// \c int as the type of the return value.
+    /// We may have to need revisit this decision later.
+    ///
+    /// In general, the application should avoid using this method;
+    /// it essentially discloses an implementation specific "handle" that
+    /// can change the internal state of the socket (consider the
+    /// application closes it, for example).
+    /// But we sometimes need to perform very low-level operations that
+    /// requires the native representation.  Passing the file descriptor
+    /// to a different process is one example.
+    /// This method is provided as a necessary evil for such limited purposes.
+    ///
+    /// This method never throws an exception.
+    ///
+    /// \return The native representation of the socket.  This is the socket
+    /// file descriptor for UNIX-like systems.
+    virtual int getNative() const = 0;
+
+    /// \brief Return the transport protocol of the socket.
+    ///
+    /// Currently, it returns \c IPPROTO_UDP for UDP sockets, and
+    /// \c IPPROTO_TCP for TCP sockets.
+    ///
+    /// This method never throws an exception.
+    ///
+    /// \return IPPROTO_UDP for UDP sockets
+    /// \return IPPROTO_TCP for TCP sockets
+    virtual int getProtocol() const = 0;
+
+    /// \brief Open AsioSocket
+    ///
+    /// Opens the socket for asynchronous I/O.  On a UDP socket, this is merely
+    /// an "open()" on the underlying socket (so completes immediately), but on
+    /// a TCP socket it also connects to the remote end (which is done as an
+    /// asynchronous operation).
+    ///
+    /// For TCP, signalling of the completion of the operation is done by
+    /// by calling the callback function in the normal way.  This could be done
+    /// for UDP (by posting en event on the event queue); however, that will
+    /// incur additional overhead in the most common case.  Instead, the return
+    /// value indicates whether the operation was asynchronous or not. If yes,
+    /// (i.e. TCP) the callback has been posted to the event queue: if no (UDP),
+    /// no callback has been posted (in which case it is up to the caller as to
+    /// whether they want to manually post the callback themself.)
+    ///
+    /// \param endpoint Pointer to the endpoint object.  This is ignored for
+    /// a UDP socket (the target is specified in the send call), but should
+    /// be of type TCPEndpoint for a TCP connection.
+    /// \param callback I/O Completion callback, called when the operation has
+    /// completed, but only if the operation was asynchronous.
+    ///
+    /// \return true if an asynchronous operation was started and the caller
+    /// should yield and wait for completion, false if the operation was
+    /// completed synchronously and no callback was queued.
+    virtual bool open(const IOEndpoint* endpoint, C& callback) = 0;
+
+    /// \brief Send Asynchronously
+    ///
+    /// This corresponds to async_send_to() for UDP sockets and async_send()
+    /// for TCP.  In both cases an endpoint argument is supplied indicating the
+    /// target of the send - this is ignored for TCP.
+    ///
+    /// \param data Data to send
+    /// \param length Length of data to send
+    /// \param endpoint Target of the send
+    /// \param callback Callback object.
+    virtual void asyncSend(const void* data, size_t length,
+        const IOEndpoint* endpoint, C& callback) = 0;
+
+    /// \brief Receive Asynchronously
+    ///
+    /// This correstponds to async_receive_from() for UDP sockets and
+    /// async_receive() for TCP.  In both cases, an endpoint argument is
+    /// supplied to receive the source of the communication.  For TCP it will
+    /// be filled in with details of the connection.
+    ///
+    /// \param data Buffer to receive incoming message
+    /// \param length Length of the data buffer
+    /// \param cumulative Amount of data that should already be in the buffer.
+    /// \param endpoint Source of the communication
+    /// \param callback Callback object
+    virtual void asyncReceive(void* data, size_t length, size_t cumulative,
+        IOEndpoint* endpoint, C& callback) = 0;
+
+    /// \brief Checks if the data received is complete.
+    ///
+    /// This applies to TCP receives, where the data is a byte stream and a
+    /// receive is not guaranteed to receive the entire message.  DNS messages
+    /// over TCP are prefixed by a two-byte count field.  This method takes the
+    /// amount received so far and the amount received in this I/O and checks
+    /// if the message is complete, returning the appropriate indication.  As
+    /// a side-effect, it also updates the amount received.
+    ///
+    /// For a UDP receive, all the data is received in one I/O, so this is
+    /// effectively a no-op (although it does update the amount received).
+    ///
+    /// \param data Data buffer containing data to date
+    /// \param length Amount of data received in last asynchronous I/O
+    /// \param cumulative On input, amount of data received before the last
+    /// I/O.  On output, the total amount of data received to date.
+    ///
+    /// \return true if the receive is complete, false if another receive is
+    /// needed.
+    virtual bool receiveComplete(void* data, size_t length,
+        size_t& cumulative) = 0;
+
+    /// \brief Cancel I/O On AsioSocket
+    virtual void cancel() = 0;
+
+    /// \brief Close socket
+    virtual void close() = 0;
+};
+
+
+#include "io_socket.h"
+
+/// \brief The \c DummyAsioSocket class is a concrete derived class of
+/// \c IOAsioSocket that is not associated with any real socket.
+///
+/// This main purpose of this class is tests, where it may be desirable to
+/// instantiate an \c IOAsioSocket object without involving system resource
+/// allocation such as real network sockets.
+///
+/// \param C Template parameter identifying type of the callback object.
+
+template <typename C>
+class DummyAsioSocket : public IOAsioSocket<C> {
+private:
+    DummyAsioSocket(const DummyAsioSocket<C>& source);
+    DummyAsioSocket& operator=(const DummyAsioSocket<C>& source);
+public:
+    /// \brief Constructor from the protocol number.
+    ///
+    /// The protocol must validly identify a standard network protocol.
+    /// For example, to specify TCP \c protocol must be \c IPPROTO_TCP.
+    ///
+    /// \param protocol The network protocol number for the socket.
+    DummyAsioSocket(const int protocol) : protocol_(protocol) {}
+
+    /// \brief A dummy derived method of \c IOAsioSocket::getNative().
+    ///
+    /// \return Always returns -1 as the object is not associated with a real
+    /// (native) socket.
+    virtual int getNative() const { return (-1); }
+
+    /// \brief A dummy derived method of \c IOAsioSocket::getProtocol().
+    ///
+    /// \return Protocol socket was created with
+    virtual int getProtocol() const { return (protocol_); }
+
+
+    /// \brief Open AsioSocket
+    ///
+    /// A call that is a no-op on UDP sockets, this opens a connection to the
+    /// system identified by the given endpoint.
+    ///
+    /// \param endpoint Unused
+    /// \param callback Unused.
+    ///false indicating that the operation completed synchronously.
+    virtual bool open(const IOEndpoint*, C&) {
+        return (false);
+    }
+
+    /// \brief Send Asynchronously
+    ///
+    /// Must be supplied as it is abstract in the base class.
+    ///
+    /// \param data Unused
+    /// \param length Unused
+    /// \param endpoint Unused
+    /// \param callback Unused
+    virtual void asyncSend(const void*, size_t, const IOEndpoint*, C&) {
+    }
+
+    /// \brief Receive Asynchronously
+    ///
+    /// Must be supplied as it is abstract in the base class.
+    ///
+    /// \param data Unused
+    /// \param length Unused
+    /// \param cumulative Unused
+    /// \param endpoint Unused
+    /// \param callback Unused
+    virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*, C&) { } 
+    /// \brief Checks if the data received is complete.
+    ///
+    /// \param data Unused
+    /// \param length Unused
+    /// \param cumulative Unused
+    ///
+    /// \return Always true
+    virtual bool receiveComplete(void*, size_t, size_t&) {
+        return (true);
+    }
+
+    /// \brief Cancel I/O On AsioSocket
+    ///
+    /// Must be supplied as it is abstract in the base class.
+    virtual void cancel() {
+    }
+
+    /// \brief Close socket
+    ///
+    /// Must be supplied as it is abstract in the base class.
+    virtual void close() {
+    }
+
+private:
+    const int protocol_;
+};
+
+} // namespace asiolink
+
+#endif // __IO_ASIO_SOCKET_H

+ 2 - 1
src/lib/asiolink/io_endpoint.cc

@@ -20,7 +20,8 @@
 
 #include <asio.hpp>
 
-#include <asiolink/asiolink.h>
+#include <asiolink/io_address.h>
+#include <asiolink/io_error.h>
 #include <asiolink/tcp_endpoint.h>
 #include <asiolink/udp_endpoint.h>
 

+ 3 - 3
src/lib/asiolink/io_endpoint.h

@@ -12,8 +12,8 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
-#ifndef __IOENDPOINT_H
-#define __IOENDPOINT_H 1
+#ifndef __IO_ENDPOINT_H
+#define __IO_ENDPOINT_H 1
 
 // IMPORTANT NOTE: only very few ASIO headers files can be included in
 // this file.  In particular, asio.hpp should never be included here.
@@ -115,7 +115,7 @@ public:
 };
 
 }      // asiolink
-#endif // __IOENDPOINT_H
+#endif // __IO_ENDPOINT_H
 
 // Local Variables: 
 // mode: c++

+ 35 - 0
src/lib/asiolink/io_error.h

@@ -0,0 +1,35 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+
+#ifndef __IO_ERROR_H
+#define __IO_ERROR_H
+
+#include <exceptions/exceptions.h>
+
+namespace asiolink {
+
+/// \brief An exception that is thrown if an error occurs within the IO
+/// module.  This is mainly intended to be a wrapper exception class for
+/// ASIO specific exceptions.
+class IOError : public isc::Exception {
+public:
+    IOError(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) {}
+};
+
+
+}      // asiolink
+
+#endif // __IO_ERROR_H

+ 193 - 0
src/lib/asiolink/io_fetch.cc

@@ -0,0 +1,193 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <config.h>
+
+#include <unistd.h>             // for some IPC/network system calls
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#include <boost/bind.hpp>
+
+#include <dns/message.h>
+#include <dns/messagerenderer.h>
+#include <dns/opcode.h>
+#include <dns/rcode.h>
+#include <log/dummylog.h>
+
+#include <asio.hpp>
+#include <asiolink/io_fetch.h>
+
+using namespace asio;
+using namespace isc::dns;
+using namespace isc::log;
+using namespace std;
+
+namespace asiolink {
+
+/// IOFetch Constructor - just initialize the private data
+
+IOFetch::IOFetch(int protocol, IOService& service,
+    const isc::dns::Question& question, const IOAddress& address, uint16_t port,
+    isc::dns::OutputBufferPtr& buff, Callback* cb, int wait)
+    :
+    data_(new IOFetch::IOFetchData(protocol, service, question, address,
+        port, buff, cb, wait))
+{
+}
+
+/// The function operator is implemented with the "stackless coroutine"
+/// pattern; see internal/coroutine.h for details.
+
+void
+IOFetch::operator()(error_code ec, size_t length) {
+    if (ec || data_->stopped) {
+        return;
+    }
+
+    CORO_REENTER (this) {
+
+        /// Generate the upstream query and render it to wire format
+        /// This is done in a different scope to allow inline variable
+        /// declarations.
+        {
+            Message msg(Message::RENDER);
+            
+            // TODO: replace with boost::random or some other suitable PRNG
+            msg.setQid(0);
+            msg.setOpcode(Opcode::QUERY());
+            msg.setRcode(Rcode::NOERROR());
+            msg.setHeaderFlag(Message::HEADERFLAG_RD);
+            msg.addQuestion(data_->question);
+            MessageRenderer renderer(*data_->msgbuf);
+            msg.toWire(renderer);
+
+            // As this is a new fetch, clear the amount of data received
+            data_->cumulative = 0;
+
+            dlog("Sending " + msg.toText() + " to " +
+                data_->remote->getAddress().toText());
+        }
+
+
+        // If we timeout, we stop, which will shutdown everything and
+        // cancel all other attempts to run inside the coroutine
+        if (data_->timeout != -1) {
+            data_->timer.expires_from_now(boost::posix_time::milliseconds(
+                data_->timeout));
+            data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
+                TIME_OUT));
+        }
+
+        // Open a connection to the target system.  For speed, if the operation
+        // was completed synchronously (i.e. UDP operation) we bypass the yield.
+        if (data_->socket->open(data_->remote.get(), *this)) {
+            CORO_YIELD;
+        }
+
+        // Begin an asynchronous send, and then yield.  When the send completes
+        // send completes, we will resume immediately after this point.
+        CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
+            data_->msgbuf->getLength(), data_->remote.get(), *this);
+
+        // Now receive the response.  Since TCP may not receive the entire
+        // message in one operation, we need to loop until we have received
+        // it. (This can't be done within the asyncReceive() method because
+        // each I/O operation will be done asynchronously and between each one
+        // we need to yield ... and we *really* don't want to set up another
+        // coroutine within that method.)  So after each receive (and yield),
+        // we check if the operation is complete and if not, loop to read again.
+        do {
+            CORO_YIELD data_->socket->asyncReceive(data_->data.get(),
+                static_cast<size_t>(MAX_LENGTH), data_->cumulative,
+                data_->remote.get(), *this);
+        } while (!data_->socket->receiveComplete(data_->data.get(), length,
+            data_->cumulative));
+
+        // The message is not rendered yet, so we can't print it easily
+        dlog("Received response from " + data_->remote->getAddress().toText());
+
+        /// Copy the answer into the response buffer.  (TODO: If the
+        /// OutputBuffer object were made to meet the requirements of
+        /// a MutableBufferSequence, then it could be written to directly
+        /// by async_receive_from() and this additional copy step would
+        /// be unnecessary.)
+        data_->buffer->writeData(data_->data.get(), length);
+
+        // Finished with this socket, so close it.
+        data_->socket->close();
+
+        /// We are done
+        stop(SUCCESS);
+    }
+}
+
+// Function that stops the coroutine sequence.  It is called either when the
+// query finishes or when the timer times out.  Either way, it sets the
+// "stopped_" flag and cancels anything that is in progress.
+//
+// As the function may be entered multiple times as things wind down, the
+// stopped_ flag checks if stop() has already been called.  If it has,
+// subsequent calls are no-ops.
+
+void
+IOFetch::stop(Result result) {
+
+    if (!data_->stopped) {
+
+        // Mark the fetch as stopped to prevent other completion callbacks
+        // (invoked because of the calls to cancel()) from executing the
+        // cancel calls again.
+        //
+        // In a single threaded environment, the callbacks won't be invoked
+        // until this one completes. In a multi-threaded environment, they may
+        // well be, in which case the testing (and setting) of the stopped_
+        // variable should be done inside a mutex (and the stopped_ variable
+        // declared as "volatile").
+        //
+        // TODO: Update testing of stopped_ if threads are used.
+        data_->stopped = true;
+
+        switch (result) {
+            case TIME_OUT:
+                dlog("Query timed out");
+                break;
+
+            case STOPPED:
+                dlog("Query stopped");
+                break;
+
+            default:
+                ;
+        }
+
+        // Stop requested, cancel and I/O's on the socket and shut it down,
+        // and cancel the timer.
+        data_->socket->cancel();
+        data_->socket->close();
+
+        data_->timer.cancel();
+
+        // Execute the I/O completion callback (if present).
+        if (data_->callback) {
+            (*(data_->callback))(result);
+        }
+
+        // Mark that stop() has now been called.
+
+    }
+}
+
+} // namespace asiolink
+

+ 226 - 0
src/lib/asiolink/io_fetch.h

@@ -0,0 +1,226 @@
+// Copyright (C) 2010  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#ifndef __IO_FETCH_H
+#define __IO_FETCH_H 1
+
+#include <config.h>
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
+
+#include <boost/shared_array.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <asio/deadline_timer.hpp>
+
+#include <coroutine.h>
+
+#include <dns/buffer.h>
+#include <dns/question.h>
+
+#include <asiolink/io_asio_socket.h>
+#include <asiolink/io_endpoint.h>
+#include <asiolink/io_service.h>
+#include <asiolink/tcp_socket.h>
+#include <asiolink/tcp_endpoint.h>
+#include <asiolink/udp_socket.h>
+#include <asiolink/udp_endpoint.h>
+
+
+namespace asiolink {
+
+
+/// \brief Upstream Fetch Processing
+///
+/// IOFetch is the class used to send upstream fetches and to handle responses.
+///
+/// \param E Endpoint type to use.
+
+class IOFetch : public coroutine {
+public:
+
+    /// \brief Result of Upstream Fetch
+    ///
+    /// Note that this applies to the status of I/Os in the fetch - a fetch
+    /// that resulted in a packet being received from the server is a SUCCESS,
+    /// even if the contents of the packet indicate that some error occurred.
+    enum Result {
+        SUCCESS = 0,        ///< Success, fetch completed
+        TIME_OUT,           ///< Failure, fetch timed out
+        STOPPED,            ///< Control code, fetch has been stopped
+        NOTSET              ///< For testing, indicates value not set
+    };
+
+    // The next enum is a "trick" to allow constants to be defined in a class
+    // declaration.
+
+    /// \brief Integer Constants
+    enum {
+        MAX_LENGTH = 4096   ///< Maximum size of receive buffer
+    };
+
+    /// \brief I/O Fetch Callback
+    ///
+    /// Class of callback object for when the fetch itself has completed - an
+    /// object of this class is passed to the IOFetch constructor and its
+    /// operator() method called when the fetch completes.
+    ///
+    /// Note the difference between the two operator() methods:
+    /// - IOFetch::operator() callback is called when an asynchronous I/O has
+    ///   completed.
+    /// - IOFetch::Callback::operator() is called when an upstream fetch - which
+    ///   may have involved several asynchronous I/O operations - has completed.
+    ///
+    /// This is an abstract class.
+    class Callback {
+    public:
+        /// \brief Default Constructor
+        Callback()
+        {}
+
+        /// \brief Virtual Destructor
+        virtual ~Callback()
+        {}
+
+        /// \brief Callback method called when the fetch completes
+        ///
+        /// \brief result Result of the fetch
+        virtual void operator()(Result result) = 0;
+    };
+
+    /// \brief IOFetch Data
+    ///
+    /// The data for IOFetch is held in a separate struct pointed to by a
+    /// shared_ptr object.  This is because the IOFetch object will be copied
+    /// often (it is used as a coroutine and passed as callback to many
+    /// async_*() functions) and we want keep the same data).  Organising the
+    /// data in this way keeps copying to a minimum.
+    struct IOFetchData {
+
+        // The next two members are shared pointers to a base class because what
+        // is actually instantiated depends on whether the fetch is over UDP or
+        // TCP, which is not known until construction of the IOFetch.  Use of
+        // a shared pointer here is merely to ensure deletion when the data
+        // object is deleted.
+        boost::shared_ptr<IOAsioSocket<IOFetch> > socket;
+                                                ///< Socket to use for I/O
+        boost::shared_ptr<IOEndpoint> remote;   ///< Where the fetch was sent
+        isc::dns::Question          question;   ///< Question to be asked
+        isc::dns::OutputBufferPtr   msgbuf;     ///< Wire buffer for question
+        isc::dns::OutputBufferPtr   buffer;     ///< Received data held here
+        boost::shared_array<char>   data;       ///< Temporary array for data
+        IOFetch::Callback*          callback;   ///< Called on I/O Completion
+        size_t                      cumulative; ///< Cumulative received amount
+        bool                        stopped;    ///< Have we stopped running?
+        asio::deadline_timer        timer;      ///< Timer to measure timeouts
+        int                         timeout;    ///< Timeout in ms
+
+        /// \brief Constructor
+        ///
+        /// Just fills in the data members of the IOFetchData structure
+        ///
+        /// \param protocol either IPPROTO_UDP or IPPROTO_TCP
+        /// \param service I/O Service object to handle the asynchronous
+        ///     operations.
+        /// \param query DNS question to send to the upstream server.
+        /// \param address IP address of upstream server
+        /// \param port Port to use for the query
+        /// \param buff Output buffer into which the response (in wire format)
+        ///     is written (if a response is received).
+        /// \param cb Callback object containing the callback to be called
+        ///     when we terminate.  The caller is responsible for managing this
+        ///     object and deleting it if necessary.
+        /// \param wait Timeout for the fetch (in ms).
+        ///
+        /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
+        IOFetchData(int protocol, IOService& service,
+            const isc::dns::Question& query, const IOAddress& address,
+            uint16_t port, isc::dns::OutputBufferPtr& buff, Callback* cb,
+            int wait)
+            :
+            socket((protocol == IPPROTO_UDP) ?
+                static_cast<IOAsioSocket<IOFetch>*>(
+                    new UDPSocket<IOFetch>(service)) :
+                static_cast<IOAsioSocket<IOFetch>*>(
+                    new TCPSocket<IOFetch>(service))
+                ),
+            remote((protocol == IPPROTO_UDP) ?
+                static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
+                static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
+                ),
+            question(query),
+            msgbuf(new isc::dns::OutputBuffer(512)),
+            buffer(buff),
+            data(new char[IOFetch::MAX_LENGTH]),
+            callback(cb),
+            cumulative(0),
+            stopped(false),
+            timer(service.get_io_service()),
+            timeout(wait)
+        {}
+    };
+
+    /// \brief Constructor.
+    ///
+    /// Creates the object that will handle the upstream fetch.
+    ///
+    /// TODO: Need to randomise the source port
+    ///
+    /// \param protocol Fetch protocol, either IPPROTO_UDP or IPPROTO_TCP
+    /// \param service I/O Service object to handle the asynchronous
+    ///     operations.
+    /// \param question DNS question to send to the upstream server.
+    /// \param buff Output buffer into which the response (in wire format)
+    ///     is written (if a response is received).
+    /// \param cb Callback object containing the callback to be called
+    ///     when we terminate.  The caller is responsible for managing this
+    ///     object and deleting it if necessary.
+    /// \param address IP address of upstream server
+    /// \param port Port to which to connect on the upstream server
+    /// (default = 53)
+    /// \param wait Timeout for the fetch (in ms).  The default value of
+    ///     -1 indicates no timeout.
+    IOFetch(int protocol, IOService& service,
+        const isc::dns::Question& question, const IOAddress& address,
+        uint16_t port, isc::dns::OutputBufferPtr& buff, Callback* cb,
+        int wait = -1);
+    
+    /// \brief Coroutine entry point
+    ///
+    /// The operator() method is the method in which the coroutine code enters
+    /// this object when an operation has been completed.
+    ///
+    /// \param ec Error code, the result of the last asynchronous I/O operation.
+    /// \param length Amount of data received on the last asynchronous read
+    void operator()(asio::error_code ec = asio::error_code(),
+        size_t length = 0);
+
+    /// \brief Terminate query
+    ///
+    /// This method can be called at any point.  It terminates the current
+    /// query with the specified reason.
+    ///
+    /// \param reason Reason for terminating the query
+    void stop(Result reason = STOPPED);
+
+private:
+    boost::shared_ptr<IOFetchData>  data_;   ///< Private data
+
+};
+
+} // namespace asiolink
+
+#endif // __IO_FETCH_H

+ 4 - 3
src/lib/asiolink/io_message.h

@@ -12,8 +12,8 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
-#ifndef __IOMESSAGE_H
-#define __IOMESSAGE_H 1
+#ifndef __IO_MESSAGE_H
+#define __IO_MESSAGE_H 1
 
 // IMPORTANT NOTE: only very few ASIO headers files can be included in
 // this file.  In particular, asio.hpp should never be included here.
@@ -46,6 +46,7 @@ class IOMessage {
     ///
     /// \name Constructors and Destructor
     ///
+
     /// Note: The copy constructor and the assignment operator are
     /// intentionally defined as private, making this class non-copyable.
     //@{
@@ -96,7 +97,7 @@ private:
 
 
 }      // asiolink
-#endif // __IOMESSAGE_H
+#endif // __IO_MESSAGE_H
 
 // Local Variables: 
 // mode: c++

+ 5 - 4
src/lib/asiolink/io_service.cc

@@ -11,13 +11,14 @@
 // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
-#include <config.h>
 
-// unistd is needed for asio.hpp with SunStudio
-#include <unistd.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
 
-#include <asio.hpp>
+#include <config.h>
 
+#include <asio.hpp>
 #include <asiolink/io_service.h>
 
 namespace asiolink {

+ 4 - 7
src/lib/asiolink/io_socket.h

@@ -12,8 +12,8 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
-#ifndef __IOSOCKET_H
-#define __IOSOCKET_H 1
+#ifndef __IO_SOCKET_H
+#define __IO_SOCKET_H 1
 
 // IMPORTANT NOTE: only very few ASIO headers files can be included in
 // this file.  In particular, asio.hpp should never be included here.
@@ -119,9 +119,6 @@ public:
     static IOSocket& getDummyTCPSocket();
 };
 
-}      // asiolink
-#endif // __IOSOCKET_H
+} // namespace asiolink
 
-// Local Variables: 
-// mode: c++
-// End: 
+#endif // __IO_SOCKET_H

+ 25 - 25
src/lib/asiolink/recursive_query.cc

@@ -12,29 +12,29 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
-#include <config.h>
-
+#include <netinet/in.h>
 #include <stdlib.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
 
-// unistd is needed for asio.hpp with SunStudio
-#include <unistd.h>
-
-#include <asio.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
 
-#include <asiolink/recursive_query.h>
-#include <asiolink/dns_service.h>
-#include <asiolink/udp_query.h>
+#include <config.h>
 
 #include <log/dummylog.h>
 
-#include <boost/lexical_cast.hpp>
-#include <boost/bind.hpp>
-
 #include <dns/question.h>
 #include <dns/message.h>
 
 #include <resolve/resolve.h>
 
+#include <asio.hpp>
+#include <asiolink/dns_service.h>
+#include <asiolink/io_fetch.h>
+#include <asiolink/io_service.h>
+#include <asiolink/recursive_query.h>
+
 using isc::log::dlog;
 using namespace isc::dns;
 
@@ -68,10 +68,10 @@ typedef std::pair<std::string, uint16_t> addr_t;
  *
  * Used by RecursiveQuery::sendQuery.
  */
-class RunningQuery : public UDPQuery::Callback {
+class RunningQuery : public IOFetch::Callback {
 private:
     // The io service to handle async calls
-    asio::io_service& io_;
+    IOService& io_;
 
     // Info for (re)sending the query (the question and destination)
     Question question_;
@@ -141,22 +141,22 @@ private:
             int serverIndex = rand() % uc;
             dlog("Sending upstream query (" + question_.toText() +
                 ") to " + upstream_->at(serverIndex).first);
-            UDPQuery query(io_, question_,
+            IOFetch query(IPPROTO_UDP, io_, question_,
                 upstream_->at(serverIndex).first,
                 upstream_->at(serverIndex).second, buffer_, this,
                 query_timeout_);
             ++queries_out_;
-            io_.post(query);
+            io_.get_io_service().post(query);
         } else if (zs > 0) {
             int serverIndex = rand() % zs;
             dlog("Sending query to zone server (" + question_.toText() +
                 ") to " + zone_servers_.at(serverIndex).first);
-            UDPQuery query(io_, question_,
+            IOFetch query(IPPROTO_IDP, io_, question_,
                 zone_servers_.at(serverIndex).first,
                 zone_servers_.at(serverIndex).second, buffer_, this,
                 query_timeout_);
             ++queries_out_;
-            io_.post(query);
+            io_.get_io_service().post(query);
         } else {
             dlog("Error, no upstream servers to send to.");
         }
@@ -283,7 +283,7 @@ private:
     }
     
 public:
-    RunningQuery(asio::io_service& io,
+    RunningQuery(IOService& io,
         const Question &question,
         MessagePtr answer_message,
         boost::shared_ptr<AddressVector> upstream,
@@ -302,8 +302,8 @@ public:
         cname_count_(0),
         query_timeout_(query_timeout),
         retries_(retries),
-        client_timer(io),
-        lookup_timer(io),
+        client_timer(io.get_io_service()),
+        lookup_timer(io.get_io_service()),
         queries_out_(0),
         done_(false),
         answer_sent_(false)
@@ -386,10 +386,10 @@ public:
     }
 
     // This function is used as callback from DNSQuery.
-    virtual void operator()(UDPQuery::Result result) {
+    virtual void operator()(IOFetch::Result result) {
         // XXX is this the place for TCP retry?
         --queries_out_;
-        if (!done_ && result != UDPQuery::TIME_OUT) {
+        if (!done_ && result != IOFetch::TIME_OUT) {
             // we got an answer
             Message incoming(Message::PARSE);
             InputBuffer ibuf(buffer_->getData(), buffer_->getLength());
@@ -423,7 +423,7 @@ void
 RecursiveQuery::resolve(const QuestionPtr& question,
     const isc::resolve::ResolverInterface::CallbackPtr callback)
 {
-    asio::io_service& io = dns_service_.get_io_service();
+    IOService& io = dns_service_.getIOService();
 
     MessagePtr answer_message(new Message(Message::RENDER));
     OutputBufferPtr buffer(new OutputBuffer(0));
@@ -444,7 +444,7 @@ RecursiveQuery::resolve(const Question& question,
     // the message should be sent via TCP or UDP, or sent initially via
     // UDP and then fall back to TCP on failure, but for the moment
     // we're only going to handle UDP.
-    asio::io_service& io = dns_service_.get_io_service();
+    IOService& io = dns_service_.getIOService();
 
     isc::resolve::ResolverInterface::CallbackPtr crs(
         new isc::resolve::ResolverCallbackServer(server));

+ 14 - 7
src/lib/asiolink/tcp_server.cc

@@ -14,18 +14,18 @@
 
 #include <config.h>
 
-#include <boost/shared_array.hpp>
-
-// unistd is needed for asio.hpp with SunStudio
-#include <unistd.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
 
-#include <asio.hpp>
+#include <boost/shared_array.hpp>
 
 #include <log/dummylog.h>
 
+#include <asio.hpp>
+#include <asiolink/dummy_io_cb.h>
 #include <asiolink/tcp_endpoint.h>
 #include <asiolink/tcp_socket.h>
-
 #include <asiolink/tcp_server.h>
 
 
@@ -118,7 +118,14 @@ TCPServer::operator()(error_code ec, size_t length) {
         // that would quickly generate an IOMessage object without
         // all these calls to "new".)
         peer_.reset(new TCPEndpoint(socket_->remote_endpoint()));
-        iosock_.reset(new TCPSocket(*socket_));
+
+        // The TCP socket class has been extended with asynchronous functions
+        // and takes as a template parameter a completion callback class.  As
+        // TCPServer does not use these extended functions (only those defined
+        // in the IOSocket base class) - but needs a TCPSocket to get hold of
+        // the underlying Boost TCP socket - DummyIOCallback is used.  This
+        // provides the appropriate operator() but is otherwise functionless.
+        iosock_.reset(new TCPSocket<DummyIOCallback>(*socket_));
         io_message_.reset(new IOMessage(data_.get(), length, *iosock_, *peer_));
         bytes_ = length;
 

+ 241 - 16
src/lib/asiolink/tcp_socket.h

@@ -19,34 +19,259 @@
 #error "asio.hpp must be included before including this, see asiolink.h as to why"
 #endif
 
-#include <asiolink/io_socket.h>
+#include <log/dummylog.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
+
+#include <iostream>
+#include <cstddef>
+
+#include <config.h>
+
+#include <asiolink/io_asio_socket.h>
+#include <asiolink/io_endpoint.h>
+#include <asiolink/io_service.h>
+#include <asiolink/tcp_endpoint.h>
 
 namespace asiolink {
 
-/// \brief The \c TCPSocket class is a concrete derived class of
-/// \c IOSocket that represents a TCP socket.
+/// \brief The \c TCPSocket class is a concrete derived class of \c IOAsioSocket
+/// that represents a TCP socket.
 ///
-/// In the current implementation, an object of this class is always
-/// instantiated within the wrapper routines.  Applications are expected to
-/// get access to the object via the abstract base class, \c IOSocket.
-/// This design may be changed when we generalize the wrapper interface.
-class TCPSocket : public IOSocket {
+/// \param C Callback type
+template <typename C>
+class TCPSocket : public IOAsioSocket<C> {
 private:
-    TCPSocket(const TCPSocket& source);
-    TCPSocket& operator=(const TCPSocket& source);
+    /// \brief Class is non-copyable
+    TCPSocket(const TCPSocket&);
+    TCPSocket& operator=(const TCPSocket&);
+
 public:
+    
     /// \brief Constructor from an ASIO TCP socket.
     ///
-    /// \param socket The ASIO representation of the TCP socket.
-    TCPSocket(asio::ip::tcp::socket& socket) : socket_(socket) {}
+    /// \param socket The ASIO representation of the TCP socket.  It
+    /// is assumed that the caller will open and close the socket, so
+    /// these operations are a no-op for that socket.
+    TCPSocket(asio::ip::tcp::socket& socket);
+
+    /// \brief Constructor
+    ///
+    /// Used when the TCPSocket is being asked to manage its own internal
+    /// socket.  It is assumed that open() and close() will not be used.
+    ///
+    /// \param service I/O Service object used to manage the socket.
+    TCPSocket(IOService& service);
+
+    /// \brief Destructor
+    virtual ~TCPSocket();
+
+    virtual int getNative() const { return (socket_.native()); }
+    virtual int getProtocol() const { return (IPPROTO_TCP); }
+
+    /// \brief Open Socket
+    ///
+    /// Opens the TCP socket.  In the model for transport-layer agnostic I/O,
+    /// an "open" operation includes a connection to the remote end (which
+    /// may take time).  This does not happen for TCP, so the method returns
+    /// "false" to indicate that the operation completed synchronously.
+    ///
+    /// \param endpoint Endpoint to which the socket will connect to.
+    /// \param callback Unused.
+    ///
+    /// \return false to indicate that the "operation" completed synchronously.
+    virtual bool open(const IOEndpoint* endpoint, C&);
+
+    /// \brief Send Asynchronously
+    ///
+    /// This corresponds to async_send_to() for TCP sockets and async_send()
+    /// for TCP.  In both cases an endpoint argument is supplied indicating the
+    /// target of the send - this is ignored for TCP.
+    ///
+    /// \param data Data to send
+    /// \param length Length of data to send
+    /// \param endpoint Target of the send
+    /// \param callback Callback object.
+    virtual void asyncSend(const void* data, size_t length,
+        const IOEndpoint* endpoint, C& callback);
+
+    /// \brief Receive Asynchronously
+    ///
+    /// This correstponds to async_receive_from() for TCP sockets and
+    /// async_receive() for TCP.  In both cases, an endpoint argument is
+    /// supplied to receive the source of the communication.  For TCP it will
+    /// be filled in with details of the connection.
+    ///
+    /// \param data Buffer to receive incoming message
+    /// \param length Length of the data buffer
+    /// \param cumulative Amount of data that should already be in the buffer.
+    /// (This is ignored - every UPD receive fills the buffer from the start.)
+    /// \param endpoint Source of the communication
+    /// \param callback Callback object
+    virtual void asyncReceive(void* data, size_t length, size_t cumulative,
+        IOEndpoint* endpoint, C& callback);
+
+    /// \brief Checks if the data received is complete.
+    ///
+    /// As all the data is received in one I/O, so this is, this is effectively
+    /// a no-op (although it does update the amount of data received).
+    ///
+    /// \param data Data buffer containing data to date.  (This is ignored
+    /// for TCP receives.)
+    /// \param length Amount of data received in last asynchronous I/O
+    /// \param cumulative On input, amount of data received before the last
+    /// I/O.  On output, the total amount of data received to date.
+    ///
+    /// \return true if the receive is complete, false if another receive is
+    /// needed.
+    virtual bool receiveComplete(void*, size_t length, size_t& cumulative) {
+        cumulative = length;
+        return (true);
+    }
+
+    /// \brief Cancel I/O On Socket
+    virtual void cancel();
+
+    /// \brief Close socket
+    virtual void close();
 
-    int getNative() const { return (socket_.native()); }
-    int getProtocol() const { return (IPPROTO_TCP); }
 
 private:
-    asio::ip::tcp::socket& socket_;
+    // Two variables to hold the socket - a socket and a pointer to it.  This
+    // handles the case where a socket is passed to the TCPSocket on
+    // construction, or where it is asked to manage its own socket.
+    asio::ip::tcp::socket*      socket_ptr_;    ///< Pointer to own socket
+    asio::ip::tcp::socket&      socket_;        ///< Socket
+    bool                        isopen_;        ///< true when socket is open
 };
 
+// Constructor - caller manages socket
+
+template <typename C>
+TCPSocket<C>::TCPSocket(asio::ip::tcp::socket& socket) :
+    socket_ptr_(NULL), socket_(socket), isopen_(true)
+{
+}
+
+// Constructor - create socket on the fly
+
+template <typename C>
+TCPSocket<C>::TCPSocket(IOService& service) :
+    socket_ptr_(new asio::ip::tcp::socket(service.get_io_service())),
+    socket_(*socket_ptr_), isopen_(false)
+{
+}
+
+// Destructor.  Only delete the socket if we are managing it.
+
+template <typename C>
+TCPSocket<C>::~TCPSocket()
+{
+    delete socket_ptr_;
+}
+
+// Open the socket.  Throws an error on failure
+// TODO: Make the open more resilient
+
+template <typename C> bool
+TCPSocket<C>::open(const IOEndpoint* endpoint, C&) {
+
+    // Ignore opens on already-open socket.  Don't throw a failure because
+    // of uncertainties as to what precedes whan when using asynchronous I/O.
+    // At also allows us a treat a passed-in socket as a self-managed socket.
+
+    if (!isopen_) {
+        if (endpoint->getFamily() == AF_INET) {
+            socket_.open(asio::ip::tcp::v4());
+        }
+        else {
+            socket_.open(asio::ip::tcp::v6());
+        }
+        isopen_ = true;
+
+        // TODO: Complete TCPSocket::open()
+
+    }
+    return (false);
+}
+
+// Send a message.  Should never do this if the socket is not open, so throw
+// an exception if this is the case.
+
+template <typename C> void
+TCPSocket<C>::asyncSend(const void* data, size_t length,
+    const IOEndpoint* endpoint, C& callback)
+{
+    if (isopen_) {
+
+        // Upconvert to a TCPEndpoint.  We need to do this because although
+        // IOEndpoint is the base class of TCPEndpoint and TCPEndpoint, it
+        // doing cont contain a method for getting at the underlying endpoint
+        // type - those are in the derived class and the two classes differ on
+        // return type.
+
+        assert(endpoint->getProtocol() == IPPROTO_TCP);
+        const TCPEndpoint* tcp_endpoint =
+            static_cast<const TCPEndpoint*>(endpoint);
+        std::cerr << "TCPSocket::asyncSend(): sending to " <<
+            tcp_endpoint->getAddress().toText() <<
+            ", port " << tcp_endpoint->getPort() << "\n";
+
+        // TODO: Complete TCPSocket::asyncSend()
+
+    } else {
+        isc_throw(SocketNotOpen,
+            "attempt to send on a TCP socket that is not open");
+    }
+}
+
+// Receive a message. Note that the "cumulative" argument is ignored - every TCP
+// receive is put into the buffer beginning at the start - there is no concept
+// receiving a subsequent part of a message.  Same critera as before concerning
+// the need for the socket to be open.
+
+template <typename C> void
+TCPSocket<C>::asyncReceive(void* data, size_t length, size_t,
+    IOEndpoint* endpoint, C& callback)
+{
+    if (isopen_) {
+
+        // Upconvert the endpoint again.
+        assert(endpoint->getProtocol() == IPPROTO_TCP);
+        const TCPEndpoint* tcp_endpoint =
+            static_cast<const TCPEndpoint*>(endpoint);
+        std::cerr << "TCPSocket::asyncReceive(): receiving from " <<
+            tcp_endpoint->getAddress().toText() <<
+            ", port " << tcp_endpoint->getPort() << "\n";
+
+        // TODO: Complete TCPSocket::asyncReceive()
+
+    } else {
+        isc_throw(SocketNotOpen,
+            "attempt to receive from a TCP socket that is not open");
+    }
+}
+
+// Cancel I/O on the socket.  No-op if the socket is not open.
+template <typename C> void
+TCPSocket<C>::cancel() {
+    if (isopen_) {
+        socket_.cancel();
+    }
+}
+
+// Close the socket down.  Can only do this if the socket is open and we are
+// managing it ourself.
+
+template <typename C> void
+TCPSocket<C>::close() {
+    if (isopen_ && socket_ptr_) {
+        socket_.close();
+        isopen_ = false;
+    }
+}
+
+} // namespace asiolink
 
-}      // namespace asiolink
 #endif // __TCP_SOCKET_H

+ 14 - 8
src/lib/asiolink/tests/Makefile.am

@@ -15,24 +15,30 @@ CLEANFILES = *.gcno *.gcda
 TESTS =
 if HAVE_GTEST
 TESTS += run_unittests
-run_unittests_SOURCES = $(top_srcdir)/src/lib/dns/tests/unittest_util.h
+run_unittests_SOURCES  = run_unittests.cc
+run_unittests_SOURCES += $(top_srcdir)/src/lib/dns/tests/unittest_util.h
 run_unittests_SOURCES += $(top_srcdir)/src/lib/dns/tests/unittest_util.cc
-run_unittests_SOURCES += udp_query_unittest.cc
-run_unittests_SOURCES += ioaddress_unittest.cc
-run_unittests_SOURCES += ioendpoint_unittest.cc
-run_unittests_SOURCES += iosocket_unittest.cc
+run_unittests_SOURCES += io_address_unittest.cc
+run_unittests_SOURCES += io_endpoint_unittest.cc
+run_unittests_SOURCES += io_fetch_unittest.cc
+run_unittests_SOURCES += io_socket_unittest.cc
 run_unittests_SOURCES += io_service_unittest.cc
 run_unittests_SOURCES += interval_timer_unittest.cc
 run_unittests_SOURCES += recursive_query_unittest.cc
-run_unittests_SOURCES += run_unittests.cc
+run_unittests_SOURCES += udp_endpoint_unittest.cc
+run_unittests_SOURCES += udp_socket_unittest.cc
+
 run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
-run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS) $(LOG4CXX_LDFLAGS)
-run_unittests_LDADD = $(GTEST_LDADD)
+
+run_unittests_LDADD  = $(GTEST_LDADD)
 run_unittests_LDADD += $(SQLITE_LIBS)
 run_unittests_LDADD +=  $(top_builddir)/src/lib/dns/libdns++.la
 run_unittests_LDADD += $(top_builddir)/src/lib/exceptions/libexceptions.la
 run_unittests_LDADD += $(top_builddir)/src/lib/asiolink/libasiolink.la
 run_unittests_LDADD += $(top_builddir)/src/lib/log/liblog.la
+
+run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS) $(LOG4CXX_LDFLAGS)
+
 # Note: the ordering matters: -Wno-... must follow -Wextra (defined in
 # B10_CXXFLAGS)
 run_unittests_CXXFLAGS = $(AM_CXXFLAGS)

+ 1 - 0
src/lib/asiolink/tests/interval_timer_unittest.cc

@@ -15,6 +15,7 @@
 #include <config.h>
 #include <gtest/gtest.h>
 
+#include <asio.hpp>
 #include <asiolink/asiolink.h>
 
 #include <boost/date_time/posix_time/posix_time_types.hpp>

+ 7 - 1
src/lib/asiolink/tests/ioaddress_unittest.cc

@@ -15,7 +15,8 @@
 #include <config.h>
 #include <gtest/gtest.h>
 
-#include <asiolink/asiolink.h>
+#include <asiolink/io_error.h>
+#include <asiolink/io_address.h>
 
 using namespace asiolink;
 
@@ -55,3 +56,8 @@ TEST(IOAddressTest, Equality) {
     EXPECT_TRUE(IOAddress("2001:db8::1234") != IOAddress("192.0.2.3"));
     EXPECT_FALSE(IOAddress("2001:db8::1234") == IOAddress("192.0.2.3"));
 }
+
+TEST(IOAddressTest, Family) {
+    EXPECT_EQ(AF_INET, IOAddress("192.0.2.1").getFamily());
+    EXPECT_EQ(AF_INET6, IOAddress("2001:0DB8:0:0::0012").getFamily());
+}

+ 2 - 1
src/lib/asiolink/tests/ioendpoint_unittest.cc

@@ -15,7 +15,8 @@
 #include <config.h>
 #include <gtest/gtest.h>
 
-#include <asiolink/asiolink.h>
+#include <asiolink/io_endpoint.h>
+#include <asiolink/io_error.h>
 
 using namespace asiolink;
 

+ 188 - 0
src/lib/asiolink/tests/io_fetch_unittest.cc

@@ -0,0 +1,188 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <gtest/gtest.h>
+#include <boost/bind.hpp>
+#include <cstdlib>
+#include <string>
+
+#include <string.h>
+
+#include <asio.hpp>
+
+#include <dns/buffer.h>
+#include <dns/question.h>
+#include <dns/message.h>
+#include <dns/messagerenderer.h>
+#include <dns/opcode.h>
+#include <dns/name.h>
+#include <dns/rcode.h>
+
+#include <asiolink/io_fetch.h>
+#include <asiolink/io_service.h>
+
+using namespace asio;
+using namespace isc::dns;
+using asio::ip::udp;
+
+namespace asiolink {
+
+const asio::ip::address TEST_HOST(asio::ip::address::from_string("127.0.0.1"));
+const uint16_t TEST_PORT(5301);
+// FIXME Shouldn't we send something that is real message?
+const char TEST_DATA[] = "TEST DATA";
+
+/// \brief Test fixture for the asiolink::IOFetch.
+class IOFetchTest : public virtual ::testing::Test, public virtual IOFetch::Callback
+{
+public:
+    IOService       service_;       ///< Service to run the query
+    IOFetch::Result expected_;      ///< Expected result of the callback
+    bool            run_;           ///< Did the callback run already?
+    Question        question_;      ///< What to ask
+    OutputBufferPtr buff_;          ///< Buffer to hold result
+    IOFetch         udp_fetch_;     ///< For UDP query test
+    //IOFetch         tcp_fetch_;     ///< For TCP query test
+
+    // The next member is the buffer iin which the "server" (implemented by the
+    // response handler method) receives the question sent by the fetch object.
+    char            server_buff_[512];  ///< Server buffer
+
+    /// \brief Constructor
+    IOFetchTest() :
+        service_(),
+        expected_(IOFetch::NOTSET),
+        run_(false),
+        question_(Name("example.net"), RRClass::IN(), RRType::A()),
+        buff_(new OutputBuffer(512)),
+        udp_fetch_(IPPROTO_UDP, service_, question_, IOAddress(TEST_HOST),
+            TEST_PORT, buff_, this, 100)
+        // tcp_fetch_(service_, question_, IOAddress(TEST_HOST), TEST_PORT,
+        //    buff_, this, 100, IPPROTO_UDP)
+        { }
+
+    /// \brief Fetch completion callback
+    ///
+    /// This is the callback's operator() method which is called when the fetch
+    /// is complete.  Check that the data received is the wire format of the
+    /// question, then send back an arbitrary response.
+    void operator()(IOFetch::Result result) {
+        EXPECT_EQ(expected_, result);   // Check correct result returned
+        EXPECT_FALSE(run_);             // Check it is run only once
+        run_ = true;                    // Note success
+        service_.stop();                // ... and exit run loop
+    }
+
+    /// \brief Response handler, pretending to be remote DNS server
+    ///
+    /// This checks that the data sent is what we expected to receive, and
+    /// sends back a test answer.
+    void respond(udp::endpoint* remote, udp::socket* socket,
+            asio::error_code ec = asio::error_code(), size_t length = 0) {
+
+        // Construct the data buffer for question we expect to receive.
+        OutputBuffer msgbuf(512);
+        Message msg(Message::RENDER);
+        msg.setQid(0);
+        msg.setOpcode(Opcode::QUERY());
+        msg.setRcode(Rcode::NOERROR());
+        msg.setHeaderFlag(Message::HEADERFLAG_RD);
+        msg.addQuestion(question_);
+        MessageRenderer renderer(msgbuf);
+        msg.toWire(renderer);
+
+        // The QID in the incoming data is random so set it to 0 for the
+        // data comparison check. (It was set to 0 when the buffer containing
+        // the expected data was constructed above.)
+        server_buff_[0] = server_buff_[1] = 0;
+
+        // Check that lengths are identical.
+        EXPECT_EQ(msgbuf.getLength(), length);
+        EXPECT_TRUE(memcmp(msgbuf.getData(), server_buff_, length) == 0);
+
+        // ... and return a message back.
+        socket->send_to(asio::buffer(TEST_DATA, sizeof TEST_DATA), *remote);
+    }
+};
+
+
+/// Test that when we run the query and stop it after it was run,
+/// it returns "stopped" correctly.
+///
+/// That is why stop() is posted to the service_ as well instead
+/// of calling it.
+TEST_F(IOFetchTest, UdpStop) {
+    expected_ = IOFetch::STOPPED;
+
+    // Post the query
+    service_.get_io_service().post(udp_fetch_);
+
+    // Post query_.stop() (yes, the boost::bind thing is just
+    // query_.stop()).
+    service_.get_io_service().post(
+       boost::bind(&IOFetch::stop, udp_fetch_, IOFetch::STOPPED));
+
+    // Run both of them.  run() returns when everything in the I/O service
+    // queue has completed.
+    service_.run();
+    EXPECT_TRUE(run_);
+}
+
+// Test that when we queue the query to service_ and call stop() before it gets
+// executed, it acts sanely as well (eg. has the same result as running stop()
+// after - calls the callback).
+TEST_F(IOFetchTest, UdpPrematureStop) {
+    expected_ = IOFetch::STOPPED;
+
+    // Stop before it is started
+    udp_fetch_.stop();
+    service_.get_io_service().post(udp_fetch_);
+
+    service_.run();
+    EXPECT_TRUE(run_);
+}
+
+// Test that it will timeout when no answer arrives.
+TEST_F(IOFetchTest, UdpTimeout) {
+    expected_ = IOFetch::TIME_OUT;
+
+    service_.get_io_service().post(udp_fetch_);
+    service_.run();
+    EXPECT_TRUE(run_);
+}
+
+// Test that it will succeed when we fake an answer and stores the same data we
+// send.  This is done through a real socket on the loopback address.
+TEST_F(IOFetchTest, UdpReceive) {
+    expected_ = IOFetch::SUCCESS;
+
+    udp::socket socket(service_.get_io_service(), udp::v4());
+    socket.set_option(socket_base::reuse_address(true));
+    socket.bind(udp::endpoint(TEST_HOST, TEST_PORT));
+
+    udp::endpoint remote;
+    socket.async_receive_from(asio::buffer(server_buff_, sizeof(server_buff_)),
+        remote,
+        boost::bind(&IOFetchTest::respond, this, &remote, &socket, _1, _2));
+    service_.get_io_service().post(udp_fetch_);
+    service_.run();
+
+    socket.close();
+
+    EXPECT_TRUE(run_);
+    ASSERT_EQ(sizeof TEST_DATA, buff_->getLength());
+    EXPECT_EQ(0, memcmp(TEST_DATA, buff_->getData(), sizeof TEST_DATA));
+}
+
+} // namespace asiolink

+ 1 - 0
src/lib/asiolink/tests/io_service_unittest.cc

@@ -15,6 +15,7 @@
 #include <config.h>
 #include <gtest/gtest.h>
 
+#include <asio.hpp>
 #include <asiolink/asiolink.h>
 
 using namespace asiolink;

+ 4 - 1
src/lib/asiolink/tests/iosocket_unittest.cc

@@ -15,7 +15,10 @@
 #include <config.h>
 #include <gtest/gtest.h>
 
-#include <asiolink/asiolink.h>
+#include <netinet/in.h>
+
+#include <asio.hpp>
+#include <asiolink/io_socket.h>
 
 using namespace asiolink;
 

+ 6 - 1
src/lib/asiolink/tests/recursive_query_unittest.cc

@@ -41,8 +41,13 @@
 // if we include asio.hpp unless we specify a special compiler option.
 // If we need to test something at the level of underlying ASIO and need
 // their definition, that test should go to asiolink/internal/tests.
-#include <asiolink/asiolink.h>
+#include <asiolink/recursive_query.h>
 #include <asiolink/io_socket.h>
+#include <asiolink/io_service.h>
+#include <asiolink/io_message.h>
+#include <asiolink/io_error.h>
+#include <asiolink/dns_lookup.h>
+#include <asiolink/simple_callback.h>
 
 using isc::UnitTestUtil;
 using namespace std;

+ 55 - 0
src/lib/asiolink/tests/udp_endpoint_unittest.cc

@@ -0,0 +1,55 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <config.h>
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include <asio.hpp>
+#include <asiolink/io_address.h>
+#include <asiolink/udp_endpoint.h>
+
+using namespace asiolink;
+using namespace std;
+
+// This test checks that the endpoint can manage its own internal
+// asio::ip::udp::endpoint object.
+
+TEST(UDPEndpointTest, v4Address) {
+    const string test_address("192.0.2.1");
+    const unsigned short test_port = 5301;
+
+    IOAddress address(test_address);
+    UDPEndpoint endpoint(address, test_port);
+
+    EXPECT_TRUE(address == endpoint.getAddress());
+    EXPECT_EQ(test_port, endpoint.getPort());
+    EXPECT_EQ(IPPROTO_UDP, endpoint.getProtocol());
+    EXPECT_EQ(AF_INET, endpoint.getFamily());
+}
+
+TEST(UDPEndpointTest, v6Address) {
+    const string test_address("2001:db8::1235");
+    const unsigned short test_port = 5302;
+
+    IOAddress address(test_address);
+    UDPEndpoint endpoint(address, test_port);
+
+    EXPECT_TRUE(address == endpoint.getAddress());
+    EXPECT_EQ(test_port, endpoint.getPort());
+    EXPECT_EQ(IPPROTO_UDP, endpoint.getProtocol());
+    EXPECT_EQ(AF_INET6, endpoint.getFamily());
+}

+ 0 - 145
src/lib/asiolink/tests/udp_query_unittest.cc

@@ -1,145 +0,0 @@
-// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
-//
-// Permission to use, copy, modify, and/or distribute this software for any
-// purpose with or without fee is hereby granted, provided that the above
-// copyright notice and this permission notice appear in all copies.
-//
-// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
-// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
-// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
-// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
-// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
-// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-// PERFORMANCE OF THIS SOFTWARE.
-
-#include <gtest/gtest.h>
-#include <asio.hpp>
-#include <boost/bind.hpp>
-#include <cstdlib>
-
-#include <dns/question.h>
-
-#include <asiolink/udp_query.h>
-
-using namespace asio;
-using namespace isc::dns;
-using asio::ip::udp;
-
-namespace {
-
-const asio::ip::address TEST_HOST(asio::ip::address::from_string("127.0.0.1"));
-const uint16_t TEST_PORT(5301);
-// FIXME Shouldn't we send something that is real message?
-const char TEST_DATA[] = "TEST DATA";
-
-// Test fixture for the asiolink::UDPQuery.
-class UDPQueryTest : public ::testing::Test,
-    public asiolink::UDPQuery::Callback
-{
-    public:
-        // Expected result of the callback
-        asiolink::UDPQuery::Result expected_;
-        // Did the callback run already?
-        bool run_;
-        // We use an io_service to run the query
-        io_service service_;
-        // Something to ask
-        Question question_;
-        // Buffer where the UDPQuery will store response
-        OutputBufferPtr buffer_;
-        // The query we are testing
-        asiolink::UDPQuery query_;
-
-        UDPQueryTest() :
-            run_(false),
-            question_(Name("example.net"), RRClass::IN(), RRType::A()),
-            buffer_(new OutputBuffer(512)),
-            query_(service_, question_, asiolink::IOAddress(TEST_HOST),
-                TEST_PORT, buffer_, this, 100)
-        { }
-
-        // This is the callback's (), so it can be called.
-        void operator()(asiolink::UDPQuery::Result result) {
-            // We check the query returns the correct result
-            EXPECT_EQ(expected_, result);
-            // Check it is called only once
-            EXPECT_FALSE(run_);
-            // And mark the callback was called
-            run_ = true;
-        }
-        // A response handler, pretending to be remote DNS server
-        void respond(udp::endpoint* remote, udp::socket* socket) {
-            // Some data came, just send something back.
-            socket->send_to(asio::buffer(TEST_DATA, sizeof TEST_DATA),
-                *remote);
-            socket->close();
-        }
-};
-
-/*
- * Test that when we run the query and stop it after it was run,
- * it returns "stopped" correctly.
- *
- * That is why stop() is posted to the service_ as well instead
- * of calling it.
- */
-TEST_F(UDPQueryTest, stop) {
-    expected_ = asiolink::UDPQuery::STOPPED;
-    // Post the query
-    service_.post(query_);
-    // Post query_.stop() (yes, the boost::bind thing is just
-    // query_.stop()).
-    service_.post(boost::bind(&asiolink::UDPQuery::stop, query_,
-        asiolink::UDPQuery::STOPPED));
-    // Run both of them
-    service_.run();
-    EXPECT_TRUE(run_);
-}
-
-/*
- * Test that when we queue the query to service_ and call stop()
- * before it gets executed, it acts sanely as well (eg. has the
- * same result as running stop() after - calls the callback).
- */
-TEST_F(UDPQueryTest, prematureStop) {
-    expected_ = asiolink::UDPQuery::STOPPED;
-    // Stop before it is started
-    query_.stop();
-    service_.post(query_);
-    service_.run();
-    EXPECT_TRUE(run_);
-}
-
-/*
- * Test that it will timeout when no answer will arrive.
- */
-TEST_F(UDPQueryTest, timeout) {
-    expected_ = asiolink::UDPQuery::TIME_OUT;
-    service_.post(query_);
-    service_.run();
-    EXPECT_TRUE(run_);
-}
-
-/*
- * Test that it will succeed when we fake an answer and
- * stores the same data we send.
- *
- * This is done through a real socket on loopback address.
- */
-TEST_F(UDPQueryTest, receive) {
-    expected_ = asiolink::UDPQuery::SUCCESS;
-    udp::socket socket(service_, udp::v4());
-    socket.set_option(socket_base::reuse_address(true));
-    socket.bind(udp::endpoint(TEST_HOST, TEST_PORT));
-    char inbuff[512];
-    udp::endpoint remote;
-    socket.async_receive_from(asio::buffer(inbuff, 512), remote, boost::bind(
-        &UDPQueryTest::respond, this, &remote, &socket));
-    service_.post(query_);
-    service_.run();
-    EXPECT_TRUE(run_);
-    ASSERT_EQ(sizeof TEST_DATA, buffer_->getLength());
-    EXPECT_EQ(0, memcmp(TEST_DATA, buffer_->getData(), sizeof TEST_DATA));
-}
-
-}

+ 287 - 0
src/lib/asiolink/tests/udp_socket_unittest.cc

@@ -0,0 +1,287 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+
+/// \brief Test of UDPSocket
+///
+/// Tests the fuctionality of a UDPSocket by working through an open-send-
+/// receive-close sequence and checking that the asynchronous notifications
+/// work.
+
+#include <string>
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <cstddef>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <asio.hpp>
+
+#include <asiolink/io_service.h>
+#include <asiolink/udp_endpoint.h>
+#include <asiolink/udp_socket.h>
+
+using namespace asio;
+using namespace asiolink;
+using namespace std;
+
+namespace {
+
+const char SERVER_ADDRESS[] = "127.0.0.1";
+const unsigned short SERVER_PORT = 5301;
+
+// TODO: Shouldn't we send something that is real message?
+const char OUTBOUND_DATA[] = "Data sent from client to server";
+const char INBOUND_DATA[] = "Returned data from server to client";
+}
+
+///
+/// An instance of this object is passed to the asynchronous I/O functions
+/// and the operator() method is called when when an asynchronous I/O
+/// completes.  The arguments to the completion callback are stored for later
+/// retrieval.
+class UDPCallback {
+public:
+
+    struct PrivateData {
+        PrivateData() :
+            error_code_(), length_(0), called_(false), name_("")
+        {}
+
+        asio::error_code    error_code_;    ///< Completion error code
+        size_t              length_;        ///< Number of bytes transferred
+        bool                called_;        ///< Set true when callback called
+        std::string         name_;          ///< Which of the objects this is
+    };
+
+    /// \brief Constructor
+    ///
+    /// Constructs the object.  It also creates the data member pointed to by
+    /// a shared pointer.  When used as a callback object, this is copied as it
+    /// is passed into the asynchronous function.  This means that there are two
+    /// objects and inspecting the one we passed in does not tell us anything.
+    ///
+    /// Therefore we use a boost::shared_ptr.  When the object is copied, the
+    /// shared pointer is copied, which leaves both objects pointing to the same
+    /// data.
+    ///
+    /// \param which Which of the two callback objects this is
+    UDPCallback(std::string which) : ptr_(new PrivateData())
+    {
+        setName(which);
+    }
+
+    /// \brief Destructor
+    ///
+    /// No code needed, destroying the shared pointer destroys the private data.
+    virtual ~UDPCallback()
+    {}
+
+    /// \brief Callback Function
+    ///
+    /// Called when an asynchronous I/O completes, this stores the
+    /// completion error code and the number of bytes transferred.
+    ///
+    /// \param ec I/O completion error code passed to callback function.
+    /// \param length Number of bytes transferred
+    virtual void operator()(asio::error_code ec, size_t length = 0) {
+        ptr_->error_code_ = ec;
+        setLength(length);
+        setCalled(true);
+    }
+
+    /// \brief Get I/O completion error code
+    int getCode() {
+        return (ptr_->error_code_.value());
+    }
+
+    /// \brief Set I/O completion code
+    ///
+    /// \param code New value of completion code
+    void setCode(int code) {
+        ptr_->error_code_ = asio::error_code(code, asio::error_code().category());
+    }
+
+    /// \brief Get number of bytes transferred in I/O
+    size_t getLength() {
+        return (ptr_->length_);
+    }
+
+    /// \brief Set number of bytes transferred in I/O
+    ///
+    /// \param length New value of length parameter
+    void setLength(size_t length) {
+        ptr_->length_ = length;
+    }
+
+    /// \brief Get flag to say when callback was called
+    bool getCalled() {
+        return (ptr_->called_);
+    }
+
+    /// \brief Set flag to say when callback was called
+    ///
+    /// \param called New value of called parameter
+    void setCalled(bool called) {
+        ptr_->called_ = called;
+    }
+
+    /// \brief Return instance of callback name
+    std::string getName() {
+        return (ptr_->name_);
+    }
+
+    /// \brief Set callback name
+    ///
+    /// \param name New value of the callback name
+    void setName(const std::string& name) {
+        ptr_->name_ = name;
+    }
+
+private:
+    boost::shared_ptr<PrivateData>  ptr_;   ///< Pointer to private data
+};
+
+// TODO: Need to add a test to check the cancel() method
+
+// Tests the operation of a UDPSocket by opening it, sending an asynchronous
+// message to a server, receiving an asynchronous message from the server and
+// closing.
+TEST(UDPSocket, SequenceTest) {
+
+    // Common objects.
+    IOService   service;                    // Service object for async control
+
+    // Server
+    IOAddress   server_address(SERVER_ADDRESS); // Address of target server
+    UDPCallback server_cb("Server");        // Server callback
+    UDPEndpoint server_endpoint(            // Endpoint describing server
+        server_address, SERVER_PORT);
+    UDPEndpoint server_remote_endpoint;     // Address where server received message from
+
+    // The client - the UDPSocket being tested
+    UDPSocket<UDPCallback>  client(service);// Socket under test
+    UDPCallback client_cb("Client");        // Async I/O callback function
+    UDPEndpoint client_remote_endpoint;     // Where client receives message from
+    size_t      client_cumulative = 0;      // Cumulative data received
+
+    // The server - with which the client communicates.  For convenience, we
+    // use the same io_service, and use the endpoint object created for
+    // the client to send to as the endpoint object in the constructor.
+    asio::ip::udp::socket server(service.get_io_service(),
+        server_endpoint.getASIOEndpoint());
+    server.set_option(socket_base::reuse_address(true));
+
+    // Assertion to ensure that the server buffer is large enough
+    char data[UDPSocket<UDPCallback>::MAX_SIZE];
+    ASSERT_GT(sizeof(data), sizeof(OUTBOUND_DATA));
+
+    // Open the client socket - the operation should be synchronous
+    EXPECT_FALSE(client.open(&server_endpoint, client_cb));
+
+    // Issue read on the server.  Completion callback should not have run.
+    server_cb.setCalled(false);
+    server_cb.setCode(42); // Answer to Life, the Universe and Everything!
+    server.async_receive_from(buffer(data, sizeof(data)),
+        server_remote_endpoint.getASIOEndpoint(), server_cb);
+    EXPECT_FALSE(server_cb.getCalled());
+
+    // Write something to the server using the client - the callback should not
+    // be called until we call the io_service.run() method.
+    client_cb.setCalled(false);
+    client_cb.setCode(7);  // Arbitrary number
+    client.asyncSend(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &server_endpoint, client_cb);
+    EXPECT_FALSE(client_cb.getCalled());
+
+    // Execute the two callbacks.
+    service.run_one();
+    service.run_one();
+
+    EXPECT_TRUE(client_cb.getCalled());
+    EXPECT_EQ(0, client_cb.getCode());
+    EXPECT_EQ(sizeof(OUTBOUND_DATA), client_cb.getLength());
+
+    EXPECT_TRUE(server_cb.getCalled());
+    EXPECT_EQ(0, server_cb.getCode());
+    EXPECT_EQ(sizeof(OUTBOUND_DATA), server_cb.getLength());
+
+    EXPECT_TRUE(equal(&data[0], &data[server_cb.getLength() - 1], OUTBOUND_DATA));
+
+    // Now return data from the server to the client.  Issue the read on the
+    // client.
+    client_cb.setLength(12345);             // Arbitrary number
+    client_cb.setCalled(false);
+    client_cb.setCode(32);                  // Arbitrary number
+    client.asyncReceive(data, sizeof(data), client_cumulative,
+        &client_remote_endpoint, client_cb);
+
+    // Issue the write on the server side to the source of the data it received.
+    server_cb.setLength(22345);             // Arbitrary number
+    server_cb.setCalled(false);
+    server_cb.setCode(232);                 // Arbitrary number
+    server.async_send_to(buffer(INBOUND_DATA, sizeof(INBOUND_DATA)),
+        server_remote_endpoint.getASIOEndpoint(), server_cb);
+
+    // Expect two callbacks to run
+    service.get_io_service().poll();
+    //service.run_one();
+
+    EXPECT_TRUE(client_cb.getCalled());
+    EXPECT_EQ(0, client_cb.getCode());
+    EXPECT_EQ(sizeof(INBOUND_DATA), client_cb.getLength());
+
+    EXPECT_TRUE(server_cb.getCalled());
+    EXPECT_EQ(0, server_cb.getCode());
+    EXPECT_EQ(sizeof(INBOUND_DATA), server_cb.getLength());
+
+    EXPECT_TRUE(equal(&data[0], &data[server_cb.getLength() - 1], INBOUND_DATA));
+
+    // Check that the address/port received by the client corresponds to the
+    // address and port the server is listening on.
+    EXPECT_TRUE(server_address == client_remote_endpoint.getAddress());
+    EXPECT_EQ(SERVER_PORT, client_remote_endpoint.getPort());
+
+    // Finally, check that the receive received a complete buffer's worth of data.
+    EXPECT_TRUE(client.receiveComplete(&data[0], client_cb.getLength(),
+        client_cumulative));
+    EXPECT_EQ(client_cb.getLength(), client_cumulative);
+
+    // Close client and server.
+    EXPECT_NO_THROW(client.close());
+    EXPECT_NO_THROW(server.close());
+}

+ 21 - 8
src/lib/asiolink/udp_endpoint.h

@@ -33,6 +33,16 @@ public:
     /// \name Constructors and Destructor.
     ///
     //@{
+
+    /// \brief Default Constructor
+    ///
+    /// Creates an internal endpoint.  This is expected to be set by some
+    /// external call.
+    UDPEndpoint() :
+        asio_endpoint_placeholder_(new asio::ip::udp::endpoint()),
+        asio_endpoint_(*asio_endpoint_placeholder_)
+    {}
+
     /// \brief Constructor from a pair of address and port.
     ///
     /// \param address The IP address of the endpoint.
@@ -50,27 +60,27 @@ public:
     /// corresponding ASIO class, \c udp::endpoint.
     ///
     /// \param asio_endpoint The ASIO representation of the UDP endpoint.
-    UDPEndpoint(const asio::ip::udp::endpoint& asio_endpoint) :
+    UDPEndpoint(asio::ip::udp::endpoint& asio_endpoint) :
         asio_endpoint_placeholder_(NULL), asio_endpoint_(asio_endpoint)
     {}
 
     /// \brief The destructor.
-    ~UDPEndpoint() { delete asio_endpoint_placeholder_; }
+    virtual ~UDPEndpoint() { delete asio_endpoint_placeholder_; }
     //@}
 
-    inline IOAddress getAddress() const {
+    virtual IOAddress getAddress() const {
         return (asio_endpoint_.address());
     }
 
-    inline uint16_t getPort() const {
+    virtual uint16_t getPort() const {
         return (asio_endpoint_.port());
     }
 
-    inline short getProtocol() const {
+    virtual short getProtocol() const {
         return (asio_endpoint_.protocol().protocol());
     }
 
-    inline short getFamily() const {
+    virtual short getFamily() const {
         return (asio_endpoint_.protocol().family());
     }
 
@@ -79,10 +89,13 @@ public:
     inline const asio::ip::udp::endpoint& getASIOEndpoint() const {
         return (asio_endpoint_);
     }
+    inline asio::ip::udp::endpoint& getASIOEndpoint() {
+        return (asio_endpoint_);
+    }
 
 private:
-    const asio::ip::udp::endpoint* asio_endpoint_placeholder_;
-    const asio::ip::udp::endpoint& asio_endpoint_;
+    asio::ip::udp::endpoint* asio_endpoint_placeholder_;
+    asio::ip::udp::endpoint& asio_endpoint_;
 };
 
 }      // namespace asiolink

+ 0 - 189
src/lib/asiolink/udp_query.cc

@@ -1,189 +0,0 @@
-// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
-//
-// Permission to use, copy, modify, and/or distribute this software for any
-// purpose with or without fee is hereby granted, provided that the above
-// copyright notice and this permission notice appear in all copies.
-//
-// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
-// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
-// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
-// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
-// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
-// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-// PERFORMANCE OF THIS SOFTWARE.
-
-#include <config.h>
-
-// unistd is needed for asio.hpp with SunStudio
-#include <unistd.h>
-
-#include <asio.hpp>
-
-#include <boost/bind.hpp>
-#include <boost/shared_array.hpp>
-
-#include <dns/messagerenderer.h>
-#include <dns/opcode.h>
-#include <dns/rcode.h>
-
-#include <log/dummylog.h>
-
-#include <asio.hpp>
-
-#include <asiolink.h>
-
-#include <coroutine.h>
-#include <asiolink/udp_endpoint.h>
-
-#include <asiolink/udp_query.h>
-
-using namespace asio;
-using asio::ip::udp;
-using asio::ip::tcp;
-using isc::log::dlog;
-
-using namespace std;
-using namespace isc::dns;
-
-namespace asiolink {
-
-
-// Private UDPQuery data (see internal/udpdns.h for reasons)
-struct UDPQuery::PrivateData {
-    // Socket we send query to and expect reply from there
-    udp::socket socket;
-    // Where was the query sent
-    udp::endpoint remote;
-    // What we ask the server
-    Question question;
-    // We will store the answer here
-    OutputBufferPtr buffer;
-    OutputBufferPtr msgbuf;
-    // Temporary buffer for answer
-    boost::shared_array<char> data;
-    // This will be called when the data arrive or timeouts
-    Callback* callback;
-    // Did we already stop operating (data arrived, we timed out, someone
-    // called stop). This can be so when we are cleaning up/there are
-    // still pointers to us.
-    bool stopped;
-    // Timer to measure timeouts.
-    deadline_timer timer;
-    // How many milliseconds are we willing to wait for answer?
-    int timeout;
-
-    PrivateData(io_service& service,
-        const udp::socket::protocol_type& protocol, const Question &q,
-        OutputBufferPtr b, Callback *c) :
-        socket(service, protocol),
-        question(q),
-        buffer(b),
-        msgbuf(new OutputBuffer(512)),
-        callback(c),
-        stopped(false),
-        timer(service)
-    { }
-};
-
-/// The following functions implement the \c UDPQuery class.
-///
-/// The constructor
-UDPQuery::UDPQuery(io_service& io_service,
-                   const Question& q, const IOAddress& addr, uint16_t port,
-                   OutputBufferPtr buffer, Callback *callback, int timeout) :
-    data_(new PrivateData(io_service,
-        addr.getFamily() == AF_INET ? udp::v4() : udp::v6(), q, buffer,
-        callback))
-{
-    data_->remote = UDPEndpoint(addr, port).getASIOEndpoint();
-    data_->timeout = timeout;
-}
-
-/// The function operator is implemented with the "stackless coroutine"
-/// pattern; see internal/coroutine.h for details.
-void
-UDPQuery::operator()(error_code ec, size_t length) {
-    if (ec || data_->stopped) {
-        return;
-    }
-
-    CORO_REENTER (this) {
-        /// Generate the upstream query and render it to wire format
-        /// This is done in a different scope to allow inline variable
-        /// declarations.
-        {
-            Message msg(Message::RENDER);
-            
-            // XXX: replace with boost::random or some other suitable PRNG
-            msg.setQid(0);
-            msg.setOpcode(Opcode::QUERY());
-            msg.setRcode(Rcode::NOERROR());
-            msg.setHeaderFlag(Message::HEADERFLAG_RD);
-            msg.addQuestion(data_->question);
-            MessageRenderer renderer(*data_->msgbuf);
-            msg.toWire(renderer);
-            dlog("Sending " + msg.toText() + " to " +
-                data_->remote.address().to_string());
-        }
-
-
-        // If we timeout, we stop, which will shutdown everything and
-        // cancel all other attempts to run inside the coroutine
-        if (data_->timeout != -1) {
-            data_->timer.expires_from_now(boost::posix_time::milliseconds(
-                data_->timeout));
-            data_->timer.async_wait(boost::bind(&UDPQuery::stop, *this,
-                TIME_OUT));
-        }
-
-        // Begin an asynchronous send, and then yield.  When the
-        // send completes, we will resume immediately after this point.
-        CORO_YIELD data_->socket.async_send_to(buffer(data_->msgbuf->getData(),
-            data_->msgbuf->getLength()), data_->remote, *this);
-
-        /// Allocate space for the response.  (XXX: This should be
-        /// optimized by maintaining a free list of pre-allocated blocks)
-        data_->data.reset(new char[MAX_LENGTH]);
-
-        /// Begin an asynchronous receive, and yield.  When the receive
-        /// completes, we will resume immediately after this point.
-        CORO_YIELD data_->socket.async_receive_from(buffer(data_->data.get(),
-            MAX_LENGTH), data_->remote, *this);
-        // The message is not rendered yet, so we can't print it easilly
-        dlog("Received response from " + data_->remote.address().to_string());
-
-        /// Copy the answer into the response buffer.  (XXX: If the
-        /// OutputBuffer object were made to meet the requirements of
-        /// a MutableBufferSequence, then it could be written to directly
-        /// by async_recieve_from() and this additional copy step would
-        /// be unnecessary.)
-        data_->buffer->writeData(data_->data.get(), length);
-
-        /// We are done
-        stop(SUCCESS);
-    }
-}
-
-void
-UDPQuery::stop(Result result) {
-    if (!data_->stopped) {
-        switch (result) {
-            case TIME_OUT:
-                dlog("Query timed out");
-                break;
-            case STOPPED:
-                dlog("Query stopped");
-                break;
-            default:;
-        }
-        data_->stopped = true;
-        data_->socket.cancel();
-        data_->socket.close();
-        data_->timer.cancel();
-        if (data_->callback) {
-            (*data_->callback)(result);
-        }
-    }
-}
-
-} // namespace asiolink

+ 17 - 9
src/lib/asiolink/udp_server.cc

@@ -12,25 +12,24 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
-#include <config.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
 
 #include <boost/shared_array.hpp>
 
-// unistd is needed for asio.hpp with SunStudio
-#include <unistd.h>
-
-#include <asio.hpp>
+#include <config.h>
 
 #include <log/dummylog.h>
 
+#include <asio.hpp>
+#include <asiolink/dummy_io_cb.h>
 #include <asiolink/udp_endpoint.h>
-#include <asiolink/udp_socket.h>
-
 #include <asiolink/udp_server.h>
+#include <asiolink/udp_socket.h>
 
 using namespace asio;
 using asio::ip::udp;
-using asio::ip::tcp;
 using isc::log::dlog;
 
 using namespace std;
@@ -206,7 +205,16 @@ UDPServer::operator()(error_code ec, size_t length) {
         // that would quickly generate an IOMessage object without
         // all these calls to "new".)
         data_->peer_.reset(new UDPEndpoint(*data_->sender_));
-        data_->iosock_.reset(new UDPSocket(*data_->socket_));
+
+        // The UDP socket class has been extended with asynchronous functions
+        // and takes as a template parameter a completion callback class.  As
+        // UDPServer does not use these extended functions (only those defined
+        // in the IOSocket base class) - but needs a UDPSocket to get hold of
+        // the underlying Boost UDP socket - DummyIOCallback is used.  This
+        // provides the appropriate operator() but is otherwise functionless.
+        data_->iosock_.reset(
+            new UDPSocket<DummyIOCallback>(*data_->socket_));
+
         data_->io_message_.reset(new IOMessage(data_->data_.get(),
             data_->bytes_, *data_->iosock_, *data_->peer_));
 

+ 239 - 11
src/lib/asiolink/udp_socket.h

@@ -19,30 +19,258 @@
 #error "asio.hpp must be included before including this, see asiolink.h as to why"
 #endif
 
-#include <asiolink/io_socket.h>
+#include <log/dummylog.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>             // for some IPC/network system calls
+
+#include <cstddef>
+
+#include <config.h>
+
+
+#include <asiolink/io_asio_socket.h>
+#include <asiolink/io_endpoint.h>
+#include <asiolink/io_service.h>
+#include <asiolink/udp_endpoint.h>
 
 namespace asiolink {
 
-/// \brief The \c UDPSocket class is a concrete derived class of
-/// \c IOSocket that represents a UDP socket.
+/// \brief The \c UDPSocket class is a concrete derived class of \c IOAsioSocket
+/// that represents a UDP socket.
 ///
-/// Other notes about \c TCPSocket applies to this class, too.
-class UDPSocket : public IOSocket {
+/// \param C Callback type
+template <typename C>
+class UDPSocket : public IOAsioSocket<C> {
 private:
-    UDPSocket(const UDPSocket& source);
-    UDPSocket& operator=(const UDPSocket& source);
+    /// \brief Class is non-copyable
+    UDPSocket(const UDPSocket&);
+    UDPSocket& operator=(const UDPSocket&);
+
 public:
+    enum {
+        MAX_SIZE = 4096         // Send and receive size
+    };
+    
     /// \brief Constructor from an ASIO UDP socket.
     ///
-    /// \param socket The ASIO representation of the UDP socket.
-    UDPSocket(asio::ip::udp::socket& socket) : socket_(socket) {}
+    /// \param socket The ASIO representation of the UDP socket.  It
+    /// is assumed that the caller will open and close the socket, so
+    /// these operations are a no-op for that socket.
+    UDPSocket(asio::ip::udp::socket& socket);
+
+    /// \brief Constructor
+    ///
+    /// Used when the UDPSocket is being asked to manage its own internal
+    /// socket.  It is assumed that open() and close() will not be used.
+    ///
+    /// \param service I/O Service object used to manage the socket.
+    UDPSocket(IOService& service);
+
+    /// \brief Destructor
+    virtual ~UDPSocket();
 
     virtual int getNative() const { return (socket_.native()); }
     virtual int getProtocol() const { return (IPPROTO_UDP); }
 
+    /// \brief Open Socket
+    ///
+    /// Opens the UDP socket.  In the model for transport-layer agnostic I/O,
+    /// an "open" operation includes a connection to the remote end (which
+    /// may take time).  This does not happen for UDP, so the method returns
+    /// "false" to indicate that the operation completed synchronously.
+    ///
+    /// \param endpoint Endpoint to which the socket will connect to.
+    /// \param callback Unused.
+    ///
+    /// \return false to indicate that the "operation" completed synchronously.
+    virtual bool open(const IOEndpoint* endpoint, C&);
+
+    /// \brief Send Asynchronously
+    ///
+    /// This corresponds to async_send_to() for UDP sockets and async_send()
+    /// for TCP.  In both cases an endpoint argument is supplied indicating the
+    /// target of the send - this is ignored for TCP.
+    ///
+    /// \param data Data to send
+    /// \param length Length of data to send
+    /// \param endpoint Target of the send
+    /// \param callback Callback object.
+    virtual void asyncSend(const void* data, size_t length,
+        const IOEndpoint* endpoint, C& callback);
+
+    /// \brief Receive Asynchronously
+    ///
+    /// This correstponds to async_receive_from() for UDP sockets and
+    /// async_receive() for TCP.  In both cases, an endpoint argument is
+    /// supplied to receive the source of the communication.  For TCP it will
+    /// be filled in with details of the connection.
+    ///
+    /// \param data Buffer to receive incoming message
+    /// \param length Length of the data buffer
+    /// \param cumulative Amount of data that should already be in the buffer.
+    /// (This is ignored - every UPD receive fills the buffer from the start.)
+    /// \param endpoint Source of the communication
+    /// \param callback Callback object
+    virtual void asyncReceive(void* data, size_t length, size_t cumulative,
+        IOEndpoint* endpoint, C& callback);
+
+    /// \brief Checks if the data received is complete.
+    ///
+    /// As all the data is received in one I/O, so this is, this is effectively
+    /// a no-op (although it does update the amount of data received).
+    ///
+    /// \param data Data buffer containing data to date.  (This is ignored
+    /// for UDP receives.)
+    /// \param length Amount of data received in last asynchronous I/O
+    /// \param cumulative On input, amount of data received before the last
+    /// I/O.  On output, the total amount of data received to date.
+    ///
+    /// \return true if the receive is complete, false if another receive is
+    /// needed.
+    virtual bool receiveComplete(void*, size_t length, size_t& cumulative) {
+        cumulative = length;
+        return (true);
+    }
+
+    /// \brief Cancel I/O On Socket
+    virtual void cancel();
+
+    /// \brief Close socket
+    virtual void close();
+
+
 private:
-    asio::ip::udp::socket& socket_;
+    // Two variables to hold the socket - a socket and a pointer to it.  This
+    // handles the case where a socket is passed to the UDPSocket on
+    // construction, or where it is asked to manage its own socket.
+    asio::ip::udp::socket*      socket_ptr_;    ///< Pointer to own socket
+    asio::ip::udp::socket&      socket_;        ///< Socket
+    bool                        isopen_;        ///< true when socket is open
 };
 
-}      // namespace asiolink
+// Constructor - caller manages socket
+
+template <typename C>
+UDPSocket<C>::UDPSocket(asio::ip::udp::socket& socket) :
+    socket_ptr_(NULL), socket_(socket), isopen_(true)
+{
+}
+
+// Constructor - create socket on the fly
+
+template <typename C>
+UDPSocket<C>::UDPSocket(IOService& service) :
+    socket_ptr_(new asio::ip::udp::socket(service.get_io_service())),
+    socket_(*socket_ptr_), isopen_(false)
+{
+}
+
+// Destructor.  Only delete the socket if we are managing it.
+
+template <typename C>
+UDPSocket<C>::~UDPSocket()
+{
+    delete socket_ptr_;
+}
+
+// Open the socket.  Throws an error on failure
+// TODO: Make the open more resilient
+
+template <typename C> bool
+UDPSocket<C>::open(const IOEndpoint* endpoint, C&) {
+
+    // Ignore opens on already-open socket.  Don't throw a failure because
+    // of uncertainties as to what precedes whan when using asynchronous I/O.
+    // At also allows us a treat a passed-in socket as a self-managed socket.
+
+    if (!isopen_) {
+        if (endpoint->getFamily() == AF_INET) {
+            socket_.open(asio::ip::udp::v4());
+        }
+        else {
+            socket_.open(asio::ip::udp::v6());
+        }
+        isopen_ = true;
+
+        // Ensure it can send and receive 4K buffers.
+        socket_.set_option(asio::socket_base::send_buffer_size(MAX_SIZE));
+        socket_.set_option(asio::socket_base::receive_buffer_size(MAX_SIZE));
+    ;
+        // Allow reuse of an existing port/address
+        socket_.set_option(asio::socket_base::reuse_address(true));
+    }
+    return (false);
+}
+
+// Send a message.  Should never do this if the socket is not open, so throw
+// an exception if this is the case.
+
+template <typename C> void
+UDPSocket<C>::asyncSend(const void* data, size_t length,
+    const IOEndpoint* endpoint, C& callback)
+{
+    if (isopen_) {
+
+        // Upconvert to a UDPEndpoint.  We need to do this because although
+        // IOEndpoint is the base class of UDPEndpoint and TCPEndpoint, it
+        // doing cont contain a method for getting at the underlying endpoint
+        // type - those are in the derived class and the two classes differ on
+        // return type.
+
+        assert(endpoint->getProtocol() == IPPROTO_UDP);
+        const UDPEndpoint* udp_endpoint =
+            static_cast<const UDPEndpoint*>(endpoint);
+        socket_.async_send_to(asio::buffer(data, length),
+            udp_endpoint->getASIOEndpoint(), callback);
+    } else {
+        isc_throw(SocketNotOpen,
+            "attempt to send on a UDP socket that is not open");
+    }
+}
+
+// Receive a message. Note that the "cumulative" argument is ignored - every UDP
+// receive is put into the buffer beginning at the start - there is no concept
+// receiving a subsequent part of a message.  Same critera as before concerning
+// the need for the socket to be open.
+
+template <typename C> void
+UDPSocket<C>::asyncReceive(void* data, size_t length, size_t,
+    IOEndpoint* endpoint, C& callback)
+{
+    if (isopen_) {
+
+        // Upconvert the endpoint again.
+        assert(endpoint->getProtocol() == IPPROTO_UDP);
+        UDPEndpoint* udp_endpoint = static_cast<UDPEndpoint*>(endpoint);
+
+        socket_.async_receive_from(asio::buffer(data, length),
+            udp_endpoint->getASIOEndpoint(), callback);
+    } else {
+        isc_throw(SocketNotOpen,
+            "attempt to receive from a UDP socket that is not open");
+    }
+}
+
+// Cancel I/O on the socket.  No-op if the socket is not open.
+template <typename C> void
+UDPSocket<C>::cancel() {
+    if (isopen_) {
+        socket_.cancel();
+    }
+}
+
+// Close the socket down.  Can only do this if the socket is open and we are
+// managing it ourself.
+
+template <typename C> void
+UDPSocket<C>::close() {
+    if (isopen_ && socket_ptr_) {
+        socket_.close();
+        isopen_ = false;
+    }
+}
+
+} // namespace asiolink
+
 #endif // __UDP_SOCKET_H