Browse Source

[5189] Added asynchronous operations to the UnixDomainSocket class.

Marcin Siodelski 8 years ago
parent
commit
a61bff09a6

+ 138 - 0
src/lib/asiolink/tests/unix_domain_socket_unittest.cc

@@ -109,6 +109,101 @@ TEST_F(UnixDomainSocketTest, sendReceive) {
     EXPECT_EQ("received foo", response);
 }
 
+// This test verifies that the client can send the data over the unix
+// domain socket and receive a response asynchronously.
+TEST_F(UnixDomainSocketTest, asyncSendReceive) {
+    // Start the server.
+    test_socket_->bindServerSocket();
+
+    // Setup client side.
+    UnixDomainSocket socket(io_service_);
+
+    // We're going to asynchronously connect to the server. The boolean value
+    // below will be modified by the connect handler function (lambda) invoked
+    // when the connection is established or if an error occurs.
+    bool connect_handler_invoked = false;
+    ASSERT_NO_THROW(socket.asyncConnect(unixSocketFilePath(),
+        [this, &connect_handler_invoked](const boost::system::error_code& ec) {
+            // Indicate that the handler has been called so as the loop below gets
+            // interrupted.
+            connect_handler_invoked = true;
+            // Operation aborted indicates that IO service has been stopped. This
+            // shouldn't happen here.
+            if (ec && ec.value() != boost::asio::error::operation_aborted) {
+                ADD_FAILURE() << "error occurred while asynchronously connecting"
+                    " via unix domain socket: " << ec.message();
+            }
+        }
+    ));
+    // Run IO service until connect handler is invoked.
+    while (!connect_handler_invoked) {
+        io_service_.run_one();
+    }
+
+    // We are going to asynchronously send the 'foo' over the unix socket.
+    const std::string outbound_data = "foo";
+    size_t sent_size = 0;
+    ASSERT_NO_THROW(socket.asyncSend(outbound_data.c_str(), outbound_data.size(),
+        [this, &sent_size](const boost::system::error_code& ec, size_t length) {
+        // If we have been successful sending the data, record the number of
+        // bytes we have sent.
+        if (!ec) {
+            sent_size = length;
+
+        } else if (ec.value() != boost::asio::error::operation_aborted) {
+            ADD_FAILURE() << "error occurred while asynchronously sending the"
+            " data over unix domain socket: " << ec.message();
+        }
+    }
+    ));
+
+    // Run IO service to generate server's response.
+    while (test_socket_->getResponseNum() < 1) {
+        io_service_.run_one();
+    }
+
+    // There is no guarantee that all data have been sent so we only check that
+    // some data have been sent.
+    ASSERT_GT(sent_size, 0);
+
+    // Receive response from the socket.
+    std::array<char, 1024> read_buf;
+    size_t bytes_read = 0;
+    bool receive_handler_invoked = false;
+    ASSERT_NO_THROW(socket.asyncReceive(&read_buf[0], read_buf.size(),
+        [this, &receive_handler_invoked, &bytes_read]
+            (const boost::system::error_code& ec, size_t length) mutable {
+        // Indicate that the handler has been called to interrupt the
+        // loop below.
+        receive_handler_invoked = true;
+
+        // If we have been successful receiving the data, record the number of
+        // bytes received.
+        if (!ec) {
+            bytes_read = length;
+
+        } else if (ec.value() != boost::asio::error::operation_aborted) {
+            ADD_FAILURE() << "error occurred while asynchronously receiving"
+                " data via unix domain socket: " << ec.message();
+        }
+
+    }));
+    // Run IO service until we get some response from the server.
+    while (!receive_handler_invoked) {
+        io_service_.run_one();
+    }
+
+    // Make sure we have received something.
+    ASSERT_GT(bytes_read, 0);
+
+    std::string response(&read_buf[0], bytes_read);
+
+    // What we have received should be a substring of the sent data prepended
+    // with 'received'. For such short chunks of data it is usually 'received foo'
+    // that we receive but there is no guarantee.
+    EXPECT_EQ(0, std::string("received foo").find(response));
+}
+
 // This test verifies that UnixDomainSocketError exception is thrown
 // on attempt to connect, write or receive when the server socket
 // is not available.
@@ -123,6 +218,49 @@ TEST_F(UnixDomainSocketTest, clientErrors) {
                  UnixDomainSocketError);
 }
 
+// This test verifies that an error is returned on attempt to asynchronously
+// connect, write or receive when the server socket is not available.
+TEST_F(UnixDomainSocketTest, asyncClientErrors) {
+    UnixDomainSocket socket(io_service_);
+
+    // Connect
+    bool connect_handler_invoked = false;
+    socket.asyncConnect(unixSocketFilePath(),
+        [this, &connect_handler_invoked](const boost::system::error_code& ec) {
+        connect_handler_invoked = true;
+        EXPECT_TRUE(ec);
+    });
+    while (!connect_handler_invoked) {
+        io_service_.run_one();
+    }
+
+    // Send
+    const std::string outbound_data = "foo";
+    bool send_handler_invoked = false;
+    socket.asyncSend(outbound_data.c_str(), outbound_data.size(),
+        [this, &send_handler_invoked]
+        (const boost::system::error_code& ec, size_t length) {
+        send_handler_invoked = true;
+        EXPECT_TRUE(ec);
+    });
+    while (!send_handler_invoked) {
+        io_service_.run_one();
+    }
+
+    // Receive
+    bool receive_handler_invoked = false;
+    std::array<char, 1024> read_buf;
+    socket.asyncReceive(&read_buf[0], read_buf.size(),
+        [this, &receive_handler_invoked]
+        (const boost::system::error_code& ec, size_t length) {
+        receive_handler_invoked = true;
+        EXPECT_TRUE(ec);
+    });
+    while (!receive_handler_invoked) {
+        io_service_.run_one();
+    }
+}
+
 // Check that native socket descriptor is returned correctly when
 // the socket is connected.
 TEST_F(UnixDomainSocketTest, getNative) {

+ 227 - 3
src/lib/asiolink/unix_domain_socket.cc

@@ -6,6 +6,7 @@
 
 #include <asiolink/asio_wrapper.h>
 #include <asiolink/unix_domain_socket.h>
+#include <boost/enable_shared_from_this.hpp>
 #include <iostream>
 using namespace boost::asio::local;
 
@@ -13,7 +14,7 @@ namespace isc {
 namespace asiolink {
 
 /// @brief Implementation of the unix domain socket.
-class UnixDomainSocketImpl {
+class UnixDomainSocketImpl : public boost::enable_shared_from_this<UnixDomainSocketImpl> {
 public:
 
     /// @brief Constructor.
@@ -30,6 +31,120 @@ public:
         close();
     }
 
+    /// @brief Asynchronously connects to an endpoint.
+    ///
+    /// This method schedules asynchronous connect and installs the
+    /// @ref UnixDomainSocketImpl::connectHandler as a callback.
+    ///
+    /// @param endpoint Reference to an endpoint to connect to.
+    /// @param handler User supplied handler to be invoked when the connection
+    /// is established or when error is signalled.
+    void asyncConnect(const stream_protocol::endpoint& endpoint,
+                      const UnixDomainSocket::ConnectHandler& handler);
+
+    /// @brief Local handler invoked as a result of asynchronous connection.
+    ///
+    /// This is a wrapper around the user supplied callback. It ignores
+    /// EINPROGRESS errors which are observed on some operating systems as
+    /// a result of trying to connect asynchronously. This error code doesn't
+    /// necessarily indicate a problem and the subsequent attempts to read
+    /// and write to the socket will succeed. Therefore, the handler simply
+    /// overrides this error code with success status. The user supplied
+    /// handler don't need to deal with the EINPROGRESS error codes.
+    ///
+    /// @param remote_handler User supplied callback.
+    /// @param ec Error code returned as a result of connection.
+    void connectHandler(const UnixDomainSocket::ConnectHandler& remote_handler,
+                        const boost::system::error_code& ec);
+
+    /// @brief Asynchronously sends data over the socket.
+    ///
+    /// This method schedules an asynchronous send and installs the
+    /// @ref UnixDomainSocketImpl::sendHandler as a callback.
+    ///
+    /// @param data Pointer to data to be sent.
+    /// @param length Number of bytes to be sent.
+    /// @param handler Callback to be invoked when data have been sent or an
+    /// sending error is signalled.
+    void asyncSend(const void* data, const size_t length,
+                   const UnixDomainSocket::Handler& handler);
+
+    /// @brief Asynchronously sends the data over the socket.
+    ///
+    /// This method is called by the @ref asyncSend and the @ref sendHandler
+    /// if the asynchronous send has to be repeated as a result of receiving
+    /// EAGAIN or EWOULDBLOCK.
+    ///
+    /// @param buffer Buffers holding the data to be sent.
+    /// @param handler User supplied callback to be invoked when data have
+    /// been sent or sending error is signalled.
+    void doSend(const boost::asio::const_buffers_1& buffer,
+                const UnixDomainSocket::Handler& handler);
+
+
+    /// @brief Local handler invoked as a result of asynchronous send.
+    ///
+    /// This handler is invoked as a result of asynchronous send. It is a
+    /// wrapper callback around the user supplied callback. It handles
+    /// EWOULDBLOCK and EAGAIN errors by retrying an asynchronous send.
+    /// These errors are often returned on some operating systems, even
+    /// though one would expect that asynchronous operation would not
+    /// return such errors. Because these errors are handled by the
+    /// wrapper callback, the user supplied callback never receives
+    /// these errors.
+    ///
+    /// @param remote_handler User supplied callback.
+    /// @param buffer Buffers holding the data to be sent.
+    /// @param ec Error code returned as a result of sending the data.
+    /// @param length Length of the data sent.
+    void sendHandler(const UnixDomainSocket::Handler& remote_handler,
+                     const boost::asio::const_buffers_1& buffer,
+                     const boost::system::error_code& ec,
+                     size_t length);
+
+    /// @brief Asynchronously receive data over the socket.
+    ///
+    /// This method schedules asynchronous receive and installs the
+    /// @ref UnixDomainSocket::receiveHandler is a callback.
+    ///
+    /// @param data Pointer to a buffer into which the data should be read.
+    /// @param length Length of the buffer.
+    /// @param handler User supplied callback invoked when data have been
+    /// received or an error is signalled.
+    void asyncReceive(void* data, const size_t length,
+                      const UnixDomainSocket::Handler& handler);
+
+    /// @brief Asynchronously receives the data over the socket.
+    ///
+    /// This method is called @ref asyncReceive and @ref receiveHandler when
+    /// EWOULDBLOCK or EAGAIN is returned.
+    ///
+    /// @param buffer A buffer into which the data should be received.
+    /// @param handler User supplied callback invoked when data have been
+    /// received on an error is signalled.
+    void doReceive(const boost::asio::mutable_buffers_1& buffer,
+                   const UnixDomainSocket::Handler& handler);
+
+    /// @brief Local handler invoked as a result of asynchronous receive.
+    ///
+    /// This handler is invoked as a result of asynchronous receive. It is a
+    /// wrapper callback around the user supplied callback. It handles
+    /// EWOULDBLOCK and EAGAIN by retrying to asynchronously receive the
+    /// data. These errors are often returned on some operating systems, even
+    /// though one would expect that asynchronous operation would not
+    /// return such errors. Because these errors are handled by the
+    /// wrapper callback, the user supplied callback never receives
+    /// these errors.
+    ///
+    /// @param remote_handler User supplied callback.
+    /// @param buffer Buffer into which the data are received.
+    /// @param ec Error code returned as a result of asynchronous receive.
+    /// @param length Size of the received data.
+    void receiveHandler(const UnixDomainSocket::Handler& remote_handler,
+                        const boost::asio::mutable_buffers_1& buffer,
+                        const boost::system::error_code& ec,
+                        size_t length);
+
     /// @brief Closes the socket.
     void close();
 
@@ -38,6 +153,98 @@ public:
 };
 
 void
+UnixDomainSocketImpl::asyncConnect(const stream_protocol::endpoint& endpoint,
+                                   const UnixDomainSocket::ConnectHandler& handler) {
+    using namespace std::placeholders;
+
+    UnixDomainSocket::ConnectHandler local_handler =
+        std::bind(&UnixDomainSocketImpl::connectHandler, shared_from_this(),
+                  handler, _1);
+    socket_.async_connect(endpoint, local_handler);
+}
+
+void
+UnixDomainSocketImpl::connectHandler(const UnixDomainSocket::ConnectHandler& remote_handler,
+                                     const boost::system::error_code& ec) {
+    // It was observed on Debian and Fedora that asynchronous connect may result
+    // in EINPROGRESS error. This doesn't really indicate a problem with a
+    // connection. If we continue transmitting data over the socket it will
+    // succeed. So we suppress this error and return 'success' to the user's
+    // handler.
+    if (ec.value() == boost::asio::error::in_progress) {
+        remote_handler(boost::system::error_code());
+    } else {
+        remote_handler(ec);
+    }
+}
+
+void
+UnixDomainSocketImpl::asyncSend(const void* data, const size_t length,
+                                const UnixDomainSocket::Handler& handler) {
+    doSend(boost::asio::buffer(data, length), handler);
+}
+
+void
+UnixDomainSocketImpl::doSend(const boost::asio::const_buffers_1& buffer,
+                             const UnixDomainSocket::Handler& handler) {
+    using namespace std::placeholders;
+
+    UnixDomainSocket::Handler local_handler =
+        std::bind(&UnixDomainSocketImpl::sendHandler, shared_from_this(),
+                  handler, buffer, _1, _2);
+    socket_.async_send(buffer, local_handler);
+}
+
+void
+UnixDomainSocketImpl::sendHandler(const UnixDomainSocket::Handler& remote_handler,
+                                  const boost::asio::const_buffers_1& buffer,
+                                  const boost::system::error_code& ec,
+                                  size_t length) {
+    // The asynchronous send may return EWOULDBLOCK or EAGAIN on some
+    // operating systems. In this case, we simply retry hoping that it
+    // will succeed next time. The user's callback never sees these
+    // errors.
+    if ((ec.value() == boost::asio::error::would_block) ||
+        (ec.value() == boost::asio::error::try_again)) {
+        doSend(buffer, remote_handler);
+    }
+    remote_handler(ec, length);
+}
+
+void
+UnixDomainSocketImpl::asyncReceive(void* data, const size_t length,
+                                   const UnixDomainSocket::Handler& handler) {
+    doReceive(boost::asio::buffer(data, length), handler);
+}
+
+void
+UnixDomainSocketImpl::doReceive(const boost::asio::mutable_buffers_1& buffer,
+                                const UnixDomainSocket::Handler& handler) {
+    using namespace std::placeholders;
+
+    UnixDomainSocket::Handler local_handler =
+        std::bind(&UnixDomainSocketImpl::receiveHandler, shared_from_this(),
+                  handler, buffer, _1, _2);
+    socket_.async_receive(buffer, 0, local_handler);
+}
+
+void
+UnixDomainSocketImpl::receiveHandler(const UnixDomainSocket::Handler& remote_handler,
+                                     const boost::asio::mutable_buffers_1& buffer,
+                                     const boost::system::error_code& ec,
+                                     size_t length) {
+    // The asynchronous receive may return EWOULDBLOCK or EAGAIN on some
+    // operating systems. In this case, we simply retry hoping that it
+    // will succeed next time. The user's callback never sees these
+    // errors.
+    if ((ec.value() == boost::asio::error::would_block) ||
+        (ec.value() == boost::asio::error::try_again)) {
+        doReceive(buffer, remote_handler);
+    }
+    remote_handler(ec, length);
+}
+
+void
 UnixDomainSocketImpl::close() {
     static_cast<void>(socket_.close());
 }
@@ -65,6 +272,11 @@ UnixDomainSocket::connect(const std::string& path) {
     }
 }
 
+void
+UnixDomainSocket::asyncConnect(const std::string& path, const ConnectHandler& handler) {
+    impl_->asyncConnect(stream_protocol::endpoint(path.c_str()), handler);
+}
+
 size_t
 UnixDomainSocket::write(const void* data, size_t length) {
     boost::system::error_code ec;
@@ -78,6 +290,12 @@ UnixDomainSocket::write(const void* data, size_t length) {
     return (res);
 }
 
+void
+UnixDomainSocket::asyncSend(const void* data, const size_t length,
+                            const Handler& handler) {
+    impl_->asyncSend(data, length, handler);
+}
+
 size_t
 UnixDomainSocket::receive(void* data, size_t length) {
     boost::system::error_code ec;
@@ -89,9 +307,15 @@ UnixDomainSocket::receive(void* data, size_t length) {
 }
 
 void
+UnixDomainSocket::asyncReceive(void* data, const size_t length,
+                               const Handler& handler) {
+    impl_->asyncReceive(data, length, handler);
+}
+
+void
 UnixDomainSocket::close() {
     impl_->close();
 }
 
-}
-}
+} // end of namespace asiolink
+} // end of namespace isc

+ 37 - 0
src/lib/asiolink/unix_domain_socket.h

@@ -10,6 +10,7 @@
 #include <asiolink/io_service.h>
 #include <asiolink/io_socket.h>
 #include <boost/shared_ptr.hpp>
+#include <functional>
 #include <string>
 
 namespace isc {
@@ -29,6 +30,13 @@ class UnixDomainSocketImpl;
 class UnixDomainSocket : public IOSocket {
 public:
 
+    /// @brief Callback type used in call to @ref UnixDomainSocket::asyncConnect.
+    typedef std::function<void(const boost::system::error_code&)> ConnectHandler;
+
+    /// @brief Callback type used in calls to @ref UnixDomainSocket::asyncSend
+    /// and @ref UnixDomainSocket::asyncReceive.
+    typedef std::function<void(const boost::system::error_code&, size_t)> Handler;
+
     /// @brief Constructor.
     ///
     /// @param io_service Reference to IOService to be used by this
@@ -48,6 +56,15 @@ public:
     /// @throw UnixDomainSocketError if error occurs.
     void connect(const std::string& path);
 
+    /// @brief Asynchronously connects the socket to the specified endpoint.
+    ///
+    /// Always returns immediatelly.
+    ///
+    /// @param path Path to the unix socket to which we should connect.
+    /// @param handler Callback to be invoked when connection is established or
+    /// a connection error is signalled.
+    void asyncConnect(const std::string& path, const ConnectHandler& handler);
+
     /// @brief Writes specified amount of data to a socket.
     ///
     /// @param data Pointer to data to be written.
@@ -57,6 +74,16 @@ public:
     /// @throw UnixDomainSocketError if error occurs.
     size_t write(const void* data, size_t length);
 
+    /// @brief Asynchronously sends data over the socket.
+    ///
+    /// Always returns immediatelly.
+    ///
+    /// @param data Pointer to data to be sent.
+    /// @param length Number of bytes to be sent.
+    /// @param handler Callback to be invoked when data have been sent or an
+    /// sending error is signalled.
+    void asyncSend(const void* data, const size_t length, const Handler& handler);
+
     /// @brief Receives data from a socket.
     ///
     /// @param [out] data Pointer to a location into which the read data should
@@ -67,6 +94,16 @@ public:
     /// @throw UnixDomainSocketError if error occurs.
     size_t receive(void* data, size_t length);
 
+    /// @brief Asynchronously receives data over the socket.
+    ///
+    /// Always returns immediatelly.
+    /// @param [out] data Pointer to a location into which the read data should
+    /// be stored.
+    /// @param length Length of the buffer.
+    /// @param handler Callback to be invoked when data have been received or an
+    /// error is signalled.
+    void asyncReceive(void* data, const size_t length, const Handler& handler);
+
     /// @brief Closes the socket.
     void close();