Browse Source

More refactoring; cut down number of "new" allocations needed per packet

git-svn-id: svn://bind10.isc.org/svn/bind10/branches/trac327@3083 e5f2f494-b856-4b98-b285-d166d9295462
Evan Hunt 14 years ago
parent
commit
0089203f35

+ 2 - 1
src/lib/asiolink/asiolink.cc

@@ -88,7 +88,8 @@ IOEndpoint::create(const int protocol, const IOAddress& address,
 }
 
 IOMessage::IOMessage(const void* data, const size_t data_size,
-                     IOSocket& io_socket, const IOEndpoint& remote_endpoint) :
+                     const IOSocket& io_socket,
+                     const IOEndpoint& remote_endpoint) :
     data_(data), data_size_(data_size), io_socket_(io_socket),
     remote_endpoint_(remote_endpoint)
 {}

+ 3 - 3
src/lib/asiolink/asiolink.h

@@ -364,8 +364,8 @@ public:
     /// \param io_socket The socket over which the data is given.
     /// \param remote_endpoint The other endpoint of the socket, that is,
     /// the sender of the message.
-    IOMessage(const void* data, const size_t data_size, IOSocket& io_socket,
-              const IOEndpoint& remote_endpoint);
+    IOMessage(const void* data, const size_t data_size,
+              const IOSocket& io_socket, const IOEndpoint& remote_endpoint);
     //@}
 
     /// \brief Returns a pointer to the received data.
@@ -382,7 +382,7 @@ public:
 private:
     const void* data_;
     const size_t data_size_;
-    IOSocket& io_socket_;
+    const IOSocket& io_socket_;
     const IOEndpoint& remote_endpoint_;
 };
 

+ 9 - 14
src/lib/asiolink/internal/tcpdns.h

@@ -88,25 +88,20 @@ private:
     static const size_t TCP_MESSAGE_LENGTHSIZE = 2;
 
     // Class member variables which are dynamic, and changes to which
-    // are expected to be accessible from both sides of a coroutine fork,
-    // should be declared here as shared pointers and allocated in the
-    // constructor or in the coroutine itself.  (Forking a new coroutine
-    // causes class members to be copied, not referenced, so without using
-    // this approach, when a variable is changed by a "parent" coroutine
-    // the change might not be visible to the "child".  Using shared_ptr<>
-    // ensures that when all coroutines using this data are deleted, the
-    // memory will be freed.)
+    // need to accessible from both sides of a coroutine fork or from
+    // outside of the coroutine (i.e., from an asynchronous I/O call),
+    // should be declared here as pointers and allocated in the
+    // constructor or in the coroutine.
     boost::shared_ptr<asio::ip::tcp::acceptor> acceptor_;
     boost::shared_ptr<asio::ip::tcp::socket> socket_;
-    boost::shared_ptr<isc::dns::OutputBuffer> response_;
-    boost::shared_ptr<isc::dns::OutputBuffer> lenbuf_;
     boost::shared_ptr<isc::dns::MessageRenderer> renderer_;
-    boost::shared_ptr<isc::dns::Message> dns_message_;
-    boost::shared_ptr<IOMessage> io_message_;
-    boost::shared_ptr<TCPSocket> io_socket_;
-    boost::shared_ptr<TCPEndpoint> io_endpoint_;
     boost::shared_ptr<char> data_;
 
+    // State information that is entirely internal to a given instance
+    // of the coroutine can be declared here.
+    isc::dns::OutputBuffer respbuf_;
+    isc::dns::OutputBuffer lenbuf_;
+
     // Callbacks
     const CheckinProvider* checkin_callback_;
     const DNSProvider* dns_callback_;

+ 10 - 10
src/lib/asiolink/internal/udpdns.h

@@ -57,6 +57,7 @@ private:
     const asio::ip::udp::endpoint& asio_endpoint_;
 };
 
+class UDPBuffers;
 class UDPSocket : public IOSocket {
 private:
     UDPSocket(const UDPSocket& source);
@@ -83,20 +84,19 @@ public:
 private:
     enum { MAX_LENGTH = 4096 };
 
-    // As mentioned in the comments to TCPServer, class member variables
-    // which are dynamic, and changes to which are expected to be
-    // accessible from both sides of a coroutine fork, should be
-    // declared here as shared pointers and allocated in the
+    // Class member variables which are dynamic, and changes to which
+    // need to accessible from both sides of a coroutine fork or from
+    // outside of the coroutine (i.e., from an asynchronous I/O call),
+    // should be declared here as pointers and allocated in the
     // constructor or in the coroutine.
     boost::shared_ptr<asio::ip::udp::socket> socket_;
+    boost::shared_ptr<char> data_;
     boost::shared_ptr<asio::ip::udp::endpoint> sender_;
-    boost::shared_ptr<isc::dns::OutputBuffer> response_;
     boost::shared_ptr<isc::dns::MessageRenderer> renderer_;
-    boost::shared_ptr<isc::dns::Message> dns_message_;
-    boost::shared_ptr<UDPEndpoint> io_endpoint_;
-    boost::shared_ptr<IOMessage> io_message_;
-    boost::shared_ptr<UDPSocket> io_socket_;
-    boost::shared_ptr<char> data_;
+
+    // State information that is entirely internal to a given instance
+    // of the coroutine can be declared here.
+    isc::dns::OutputBuffer respbuf_;
     size_t bytes_;
 
     // Callbacks

+ 33 - 28
src/lib/asiolink/tcpdns.cc

@@ -23,6 +23,7 @@
 #include <asio.hpp>
 #include <asio/ip/address.hpp>
 
+#include <boost/array.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/shared_ptr.hpp>
 
@@ -76,12 +77,13 @@ TCPSocket::getProtocol() const {
 TCPServer::TCPServer(io_service& io_service,
                      const ip::address& addr, const uint16_t port, 
                      CheckinProvider* checkin, DNSProvider* process) :
+    respbuf_(0), lenbuf_(TCP_MESSAGE_LENGTHSIZE),
     checkin_callback_(checkin), dns_callback_(process)
 {
     tcp::endpoint endpoint(addr, port);
     acceptor_.reset(new tcp::acceptor(io_service));
     acceptor_->open(endpoint.protocol());
-    // Set v6-only (we use a different instantiation for v4,
+    // Set v6-only (we use a separate instantiation for v4,
     // otherwise asio will bind to both v4 and v6
     if (addr.is_v6()) {
         acceptor_->set_option(ip::v6_only(true));
@@ -97,20 +99,20 @@ TCPServer::operator()(error_code ec, size_t length) {
         return;
     }
 
+    bool done = false;
     CORO_REENTER (this) {
         do {
             socket_.reset(new tcp::socket(acceptor_->get_io_service()));
             CORO_YIELD acceptor_->async_accept(*socket_, *this);
             CORO_FORK TCPServer(*this)();
-        } while (is_parent());
+        } while (is_child());
 
-        // Perform any necessary operations prior to processing
-        // an incoming packet (e.g., checking for queued
-        // configuration messages).
+        // Perform any necessary operations prior to processing an incoming
+        // packet (e.g., checking for queued configuration messages).
         //
-        // (XXX: it may be a performance issue to have this
-        // called for every single incoming packet; we may wish to
-        // throttle it somehow in the future.)
+        // (XXX: it may be a performance issue to have this called for
+        // every single incoming packet; we may wish to throttle it somehow
+        // in the future.)
         if (checkin_callback_ != NULL) {
             (*checkin_callback_)();
         }
@@ -118,7 +120,6 @@ TCPServer::operator()(error_code ec, size_t length) {
         // Instantiate the data buffer that will be used by the
         // asynchronous read call.
         data_ = boost::shared_ptr<char>(new char[MAX_LENGTH]);
-
         CORO_YIELD async_read(*socket_, asio::buffer(data_.get(),
                                                      TCP_MESSAGE_LENGTHSIZE),
                               *this);
@@ -136,28 +137,32 @@ TCPServer::operator()(error_code ec, size_t length) {
 
         // Instantiate the objects that will be needed by the
         // DNS callback and the asynchronous write calls.
-        dns_message_.reset(new Message(Message::PARSE));
-        response_.reset(new OutputBuffer(0));
-        lenbuf_.reset(new OutputBuffer(TCP_MESSAGE_LENGTHSIZE));
-        renderer_.reset(new MessageRenderer(*response_));
-        io_socket_.reset(new TCPSocket(*socket_));
-        io_endpoint_.reset(new TCPEndpoint(socket_->remote_endpoint()));
-        io_message_.reset(new IOMessage(data_.get(), length, *io_socket_,
-                                        *io_endpoint_));
-
-        // Process the DNS message
-        if (! (*dns_callback_)(*io_message_, *dns_message_, *renderer_)) {
+        respbuf_.clear();
+        renderer_.reset(new MessageRenderer(respbuf_));
+
+        // Process the DNS message.  (Must be done in a separate scope 
+        // because CORO_REENTER is implemented with a switch statement
+        // and inline variable declaration isn't allowed.)
+        {
+            TCPEndpoint peer(socket_->remote_endpoint());
+            TCPSocket iosock(*socket_);
+            IOMessage io_message(data_.get(), length, iosock, peer);
+            Message message(Message::PARSE);
+            done = (*dns_callback_)(io_message, message, *renderer_);
+        }
+
+        if (!done) {
             CORO_YIELD return;
         }
 
-        lenbuf_->writeUint16(response_->getLength());
-        CORO_YIELD async_write(*socket_,
-                               buffer(lenbuf_->getData(), lenbuf_->getLength()),
-                               *this);
-        CORO_YIELD async_write(*socket_,
-                               buffer(response_->getData(),
-                                      response_->getLength()),
-                               *this);
+        CORO_YIELD {
+            lenbuf_.clear();
+            lenbuf_.writeUint16(respbuf_.getLength());
+            boost::array<const_buffer,2> bufs;
+            bufs[0] = buffer(lenbuf_.getData(), lenbuf_.getLength());
+            bufs[1] = buffer(respbuf_.getData(), respbuf_.getLength());
+            async_write(*socket_, bufs, *this);
+        }
     }
 }
 

+ 46 - 46
src/lib/asiolink/udpdns.cc

@@ -16,6 +16,8 @@
 
 #include <config.h>
 
+#include <list>
+
 #include <unistd.h>             // for some IPC/network system calls
 #include <sys/socket.h>
 #include <netinet/in.h>
@@ -74,49 +76,44 @@ UDPSocket::getProtocol() const {
 UDPServer::UDPServer(io_service& io_service,
                      const ip::address& addr, const uint16_t port,
                      CheckinProvider* checkin, DNSProvider* process) :
-    checkin_callback_(checkin), dns_callback_(process)
+    respbuf_(0), checkin_callback_(checkin), dns_callback_(process)
 {
     // Wwe use a different instantiation for v4,
     // otherwise asio will bind to both v4 and v6
+    udp proto = addr.is_v4() ? udp::v4() : udp::v6();
+    socket_.reset(new udp::socket(io_service, proto));
+    socket_->set_option(socket_base::reuse_address(true));
     if (addr.is_v6()) {
-        socket_.reset(new udp::socket(io_service, udp::v6()));
-        socket_->set_option(socket_base::reuse_address(true));
         socket_->set_option(asio::ip::v6_only(true));
-        socket_->bind(udp::endpoint(udp::v6(), port));
-    } else {
-        socket_.reset(new udp::socket(io_service, udp::v4()));
-        socket_->set_option(socket_base::reuse_address(true));
-        socket_->bind(udp::endpoint(udp::v6(), port));
     }
+    socket_->bind(udp::endpoint(proto, port));
 }
 
 void
 UDPServer::operator()(error_code ec, size_t length) {
-    CORO_REENTER (this) for (;;) {
-        // Instantiate the data buffer that will be used by the
-        // asynchronous read calls.
-        data_ = boost::shared_ptr<char>(new char[MAX_LENGTH]);
-        sender_.reset(new udp::endpoint());
-
+    bool done = false;
+    CORO_REENTER (this) {
         do {
-            CORO_YIELD socket_->async_receive_from(asio::buffer(data_.get(),
-                                                                MAX_LENGTH),
-                                              *sender_, *this);
-        } while (ec || length == 0);
-
-        bytes_ = length;
-        CORO_FORK UDPServer(*this)();
-        if (is_parent()) {
-            continue;
-        }
-
-        // Perform any necessary operations prior to processing
-        // an incoming packet (e.g., checking for queued
-        // configuration messages).
+            // Instantiate the data buffer and endpoint that will
+            // be used by the asynchronous receive call.
+            data_.reset(new char[MAX_LENGTH]);
+            sender_.reset(new udp::endpoint());
+            do {
+                CORO_YIELD socket_->async_receive_from(buffer(data_.get(),
+                                                              MAX_LENGTH),
+                                                  *sender_, *this);
+            } while (ec || length == 0);
+
+            bytes_ = length;
+            CORO_FORK UDPServer(*this)();
+        } while (is_child());
+
+        // Perform any necessary operations prior to processing an incoming
+        // packet (e.g., checking for queued configuration messages).
         //
-        // (XXX: it may be a performance issue to have this
-        // called for every single incoming packet; we may wish to
-        // throttle it somehow in the future.)
+        // (XXX: it may be a performance issue to have this called for
+        // every single incoming packet; we may wish to throttle it somehow
+        // in the future.)
         if (checkin_callback_ != NULL) {
             (*checkin_callback_)();
         }
@@ -126,25 +123,28 @@ UDPServer::operator()(error_code ec, size_t length) {
             CORO_YIELD return;
         }
 
-        // Instantialize objects that will be needed by the
-        // DNS callback function and the async write.
-        dns_message_.reset(new Message(Message::PARSE));
-        response_.reset(new OutputBuffer(0));
-        renderer_.reset(new MessageRenderer(*response_));
-        io_socket_.reset(new UDPSocket(*socket_));
-        io_endpoint_.reset(new UDPEndpoint(*sender_));
-        io_message_.reset(new IOMessage(data_.get(), bytes_,
-                                        *io_socket_,
-                                        *io_endpoint_));
-
-        // Process the DNS message
-        if (! (*dns_callback_)(*io_message_, *dns_message_, *renderer_))
+        // Instantiate objects that will be needed by the
+        // asynchronous send call.
+        respbuf_.clear();
+        renderer_.reset(new MessageRenderer(respbuf_));
+
+        // Process the DNS message.  (Must be done in a separate scope 
+        // because CORO_REENTER is implemented with a switch statement,
+        // and thus normal inline variable declaration isn't allowed.)
         {
+            UDPEndpoint peer(*sender_);
+            UDPSocket iosock(*socket_);
+            IOMessage io_message(data_.get(), bytes_, iosock, peer);
+            Message message(Message::PARSE);
+            done = (*dns_callback_)(io_message, message, *renderer_);
+        }
+
+        if (!done) {
             CORO_YIELD return;
         }
 
-        CORO_YIELD socket_->async_send_to(asio::buffer(response_->getData(),
-                                                       response_->getLength()),
+        CORO_YIELD socket_->async_send_to(buffer(respbuf_.getData(),
+                                                 respbuf_.getLength()),
                                      *sender_, *this);
     }
 }