Browse Source

[master] Merge branch 'trac2916'

JINMEI Tatuya 12 years ago
parent
commit
994dfcd88a

+ 1 - 0
src/lib/asiolink/Makefile.am

@@ -32,6 +32,7 @@ libb10_asiolink_la_SOURCES += tcp_endpoint.h
 libb10_asiolink_la_SOURCES += tcp_socket.h
 libb10_asiolink_la_SOURCES += udp_endpoint.h
 libb10_asiolink_la_SOURCES += udp_socket.h
+libb10_asiolink_la_SOURCES += local_socket.h local_socket.cc
 
 # Note: the ordering matters: -Wno-... must follow -Wextra (defined in
 # B10_CXXFLAGS)

+ 102 - 0
src/lib/asiolink/local_socket.cc

@@ -0,0 +1,102 @@
+// Copyright (C) 2013  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <asiolink/local_socket.h>
+#include <asiolink/io_service.h>
+#include <asiolink/io_error.h>
+
+#include <asio.hpp>
+
+#include <boost/bind.hpp>
+
+#include <string>
+#include <sys/socket.h>
+
+namespace isc {
+namespace asiolink {
+class LocalSocket::Impl {
+public:
+    Impl(IOService& io_service, int fd) :
+        asio_sock_(io_service.get_io_service(),
+                   asio::local::stream_protocol(), fd)
+    {
+        // Depending on the underlying demultiplex API, the constructor may or
+        // may not throw in case fd is invalid.  To catch such cases sooner,
+        // we try to get the local endpoint (we don't need it in the rest of
+        // this implementation).
+        asio_sock_.local_endpoint(ec_);
+        if (ec_) {
+            isc_throw(IOError, "failed to open local socket with FD " << fd
+                      << " (local endpoint unknown): " << ec_.message());
+        }
+    }
+
+    asio::local::stream_protocol::socket asio_sock_;
+    asio::error_code ec_;
+};
+
+LocalSocket::LocalSocket(IOService& io_service, int fd) :
+    impl_(NULL)
+{
+    try {
+        impl_ = new Impl(io_service, fd);
+    } catch (const asio::system_error& error) {
+        // Catch and convert any exception from asio's constructor
+        isc_throw(IOError, "failed to open local socket with FD " << fd
+                  << ": " << error.what());
+    }
+}
+
+LocalSocket::~LocalSocket() {
+    delete impl_;
+}
+
+int
+LocalSocket::getNative() const {
+    return (impl_->asio_sock_.native());
+}
+
+int
+LocalSocket::getProtocol() const {
+    return (AF_UNIX);
+}
+
+namespace {
+// Wrapper callback for async_read that simply adjusts asio-native parameters
+// for the LocalSocket interface.  Note that this is a free function and
+// doesn't rely on internal member variables of LocalSocket.
+// So it can be called safely even after the LocalSocket object on which
+// asyncRead() was called is destroyed.
+void
+readCompleted(const asio::error_code& ec,
+              LocalSocket::ReadCallback user_callback)
+{
+    // assumption check: we pass non empty string iff ec indicates an error.
+    const std::string err_msg = ec ? ec.message() : std::string();
+    assert(ec || err_msg.empty());
+
+    user_callback(err_msg);
+}
+}
+
+void
+LocalSocket::asyncRead(const ReadCallback& callback, void* buf,
+                       size_t buflen)
+{
+    asio::async_read(impl_->asio_sock_, asio::buffer(buf, buflen),
+                     boost::bind(readCompleted, _1, callback));
+}
+
+} // namespace asiolink
+} // namespace isc

+ 132 - 0
src/lib/asiolink/local_socket.h

@@ -0,0 +1,132 @@
+// Copyright (C) 2013  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#ifndef LOCAL_SOCKET_H
+#define LOCAL_SOCKET_H 1
+
+#include <asiolink/io_socket.h>
+#include <asiolink/io_service.h>
+
+#include <boost/noncopyable.hpp>
+
+namespace isc {
+namespace asiolink {
+
+/// \brief A wrapper for ASIO stream socket in the local (AF_UNIX) domain.
+///
+/// This class provides a simple, limited set of wrapper interfaces to an
+/// ASIO stream socket object in the local domain.  Unlike other concrete
+/// derived classes of \c IOSocket, this class is intended to be instantiated
+/// directly.  Right now it only provides read interface due to the limited
+/// expected usage, but it can be extended as we see need for other operations
+/// on this socket.
+///
+/// Note that in the initial implementation there's even no stop() or cancel()
+/// method; for these cases users are expected to just destroy the socket
+/// object (this may be extended in future, too).
+class LocalSocket : boost::noncopyable, public IOSocket {
+public:
+    /// \brief Constructor from a native file descriptor of AF_UNIX stream
+    /// socket.
+    ///
+    /// Parameter \c fd must be an open stream-type socket of the AF_UNIX
+    /// domain.  The constructor tries to detect some invalid cases, but
+    /// it may not reject all invalid cases.  It's generally the
+    /// responsibility of the caller.
+    ///
+    /// \throw IOError Failed to create the socket object, most likely because
+    /// the given file descriptor is invalid.
+    ///
+    /// \param io_service The IO service object to handle events on this
+    /// socket.
+    /// \param fd File descriptor of an AF_UNIX socket.
+    LocalSocket(IOService& io_service, int fd);
+
+    /// \brief Destructor.
+    ///
+    /// \throw None.
+    virtual ~LocalSocket();
+
+    /// \brief Local socket version of getNative().
+    ///
+    /// \throw None.
+    virtual int getNative() const;
+
+    /// \brief Local socket version of getProtocol().
+    ///
+    /// It always returns \c AF_UNIX.
+    ///
+    /// \throw None.
+    virtual int getProtocol() const;
+
+    /// \brief The callback functor for the \c asyncRead method.
+    ///
+    /// The callback takes one parameter, \c error.  It will be set to
+    /// non empty string if read operation fails and the string explains
+    /// the reason for the failure.  On success \c error will be empty.
+    typedef boost::function<void(const std::string& error)> ReadCallback;
+
+    /// \brief Start asynchronous read on the socket.
+    ///
+    /// This method registers an interest on a new read event on the local
+    /// socket for the specified length of data (\c buflen bytes).  This
+    /// method returns immediately.   When the specified amount of data
+    /// are available for read from the socket or an error happens, the
+    /// specified callback will be called.  In the former case the data are
+    /// copied into the given buffer (pointed to by \c buf); in the latter
+    /// case, the \c error parameter of the callback will be set to a non
+    /// empty string.
+    ///
+    /// In the case of error, this socket should be considered
+    /// unusable anymore, because this class doesn't provide a feasible way
+    /// to identify where in the input stream to restart reading.  So,
+    /// in practice, the user of this socket should destroy this socket,
+    /// and, if necessary to continue, create a new one.
+    ///
+    /// \c buf must point to a memory region that has at least \c buflen
+    /// bytes of valid space.  That region must be kept valid until the
+    /// callback is called or the \c IOService passed to the constructor
+    /// is stopped.  This method and class do not check these conditions;
+    /// it's the caller's responsibility to guarantee them.
+    ///
+    /// \note If asyncRead() has been called and hasn't been completed (with
+    /// the callback being called), it's possible that the callback is called
+    /// even after the \c LocalSocket object is destroyed.  So the caller
+    /// has to make sure that either \c LocalSocket is valid until the
+    /// callback is called or the callback does not depend on \c LocalSocket;
+    /// alternatively, the caller can stop the \c IOService.  This will make
+    /// sure the callback will not be called regardless of when and how
+    /// the \c LocalSocket is destroyed.
+    ///
+    /// \throw None.
+    ///
+    /// \brief callback The callback functor to be called on the completion
+    /// of read.
+    /// \brief buf Buffer to read in data from the socket.
+    /// \brief buflen Length of data to read.
+    void asyncRead(const ReadCallback& callback, void* buf, size_t buflen);
+
+private:
+    class Impl;
+    Impl* impl_;
+};
+
+} // namespace asiolink
+} // namespace isc
+
+#endif // LOCAL_SOCKET_H
+
+// Local Variables:
+// mode: c++
+// End:

+ 1 - 0
src/lib/asiolink/tests/Makefile.am

@@ -34,6 +34,7 @@ run_unittests_SOURCES += tcp_socket_unittest.cc
 run_unittests_SOURCES += udp_endpoint_unittest.cc
 run_unittests_SOURCES += udp_socket_unittest.cc
 run_unittests_SOURCES += io_service_unittest.cc
+run_unittests_SOURCES += local_socket_unittest.cc
 
 run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
 

+ 250 - 0
src/lib/asiolink/tests/local_socket_unittest.cc

@@ -0,0 +1,250 @@
+// Copyright (C) 2013  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <asiolink/local_socket.h>
+#include <asiolink/io_error.h>
+
+#include <gtest/gtest.h>
+
+#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include <csignal>
+#include <vector>
+
+#include <sys/socket.h>
+#include <stdint.h>
+#include <unistd.h>             // for alarm(3)
+
+using namespace isc::asiolink;
+
+namespace {
+
+// duration (in seconds) until we break possible hangup; value is an
+// arbitrary choice.
+const unsigned IO_TIMEOUT = 10;
+
+// A simple RAII wrapper for a file descriptor so test sockets are safely
+// closed in each test.
+class ScopedSocket : boost::noncopyable {
+public:
+    ScopedSocket() : fd_(-1) {}
+    ~ScopedSocket() {
+        if (fd_ >= 0) {
+            EXPECT_EQ(0, ::close(fd_));
+        }
+    }
+    void set(int fd) {
+        assert(fd_ == -1);
+        fd_ = fd;
+    }
+    int get() { return (fd_); }
+    int release() {
+        const int ret = fd_;
+        fd_ = -1;
+        return (ret);
+    }
+private:
+    int fd_;
+};
+
+class LocalSocketTest : public ::testing::Test {
+protected:
+    LocalSocketTest() {
+        int sock_pair[2];
+        EXPECT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, sock_pair));
+        sock_pair_[0].set(sock_pair[0]);
+        sock_pair_[1].set(sock_pair[1]);
+
+        // For tests using actual I/O we use a timer to prevent hangup
+        // due to a bug.  Set up the signal handler for the timer here.
+        g_io_service_ = &io_service_;
+        prev_handler_ = std::signal(SIGALRM, stopIOService);
+    }
+
+    ~LocalSocketTest() {
+        alarm(0);
+        // reset the global to NULL to detect any invalid access to freed
+        // io_service (this shouldn't happen, so we don't change stopIOService
+        // itself)
+        g_io_service_ = NULL;
+        std::signal(SIGALRM, prev_handler_);
+    }
+
+    // Common set of tests for async read
+    void checkAsyncRead(size_t data_len);
+
+    IOService io_service_;
+    ScopedSocket sock_pair_[2];
+    std::vector<uint8_t> read_buf_;
+private:
+    static IOService* g_io_service_; // will be set to &io_service_
+    void (*prev_handler_)(int);
+
+    // SIGALRM handler to prevent hangup.  This must be a static method
+    // so it can be passed to std::signal().
+    static void stopIOService(int) {
+        g_io_service_->stop();
+    }
+};
+
+IOService* LocalSocketTest::g_io_service_ = NULL;
+
+TEST_F(LocalSocketTest, construct) {
+    const int fd = sock_pair_[0].release();
+    LocalSocket sock(io_service_, fd);
+    EXPECT_EQ(fd, sock.getNative());
+    EXPECT_EQ(AF_UNIX, sock.getProtocol());
+}
+
+TEST_F(LocalSocketTest, constructError) {
+    // try to construct a LocalSocket object with a closed socket.  It should
+    // fail.
+    const int fd = sock_pair_[0].release();
+    EXPECT_EQ(0, close(fd));
+    EXPECT_THROW(LocalSocket(io_service_, fd), IOError);
+}
+
+TEST_F(LocalSocketTest, autoClose) {
+    // Confirm that passed FD will be closed on destruction of LocalSocket
+    const int fd = sock_pair_[0].release();
+    {
+        LocalSocket sock(io_service_, fd);
+    }
+    // fd should have been closed, so close() should fail (we assume there's
+    // no other open() call since then)
+    EXPECT_EQ(-1, ::close(fd));
+}
+
+void
+callback(const std::string& error, IOService* io_service, bool* called,
+         bool expect_error)
+{
+    if (expect_error) {
+        EXPECT_NE("", error);
+    } else {
+        EXPECT_EQ("", error);
+    }
+    *called = true;
+    io_service->stop();
+}
+
+void
+LocalSocketTest::checkAsyncRead(size_t data_len) {
+    LocalSocket sock(io_service_, sock_pair_[0].release());
+    bool callback_called = false;
+    read_buf_.resize(data_len);
+    sock.asyncRead(boost::bind(&callback, _1, &io_service_, &callback_called,
+                               false), &read_buf_[0], data_len);
+
+    std::vector<uint8_t> expected_data(data_len);
+    for (size_t i = 0; i < data_len; ++i) {
+        expected_data[i] = i & 0xff;
+    }
+    alarm(IO_TIMEOUT);
+    // If write blocks, it will eventually fail due to signal interruption.
+    // Since io_service has been stopped already, run() would immediately
+    // return and test should complete (with failure).  But to make very sure
+    // it never cause hangup we rather return from the test at the point of
+    // failure of write.  In either case it signals a failure and need for
+    // a fix.
+    ASSERT_EQ(data_len, write(sock_pair_[1].get(), &expected_data[0],
+                              data_len));
+    io_service_.run();
+    EXPECT_TRUE(callback_called);
+    EXPECT_EQ(0, std::memcmp(&expected_data[0], &read_buf_[0], data_len));
+    
+}
+
+TEST_F(LocalSocketTest, asyncRead) {
+    // A simple case of asynchronous read: wait for 1 byte and successfully
+    // read it in the run() loop.
+    checkAsyncRead(1);
+}
+
+TEST_F(LocalSocketTest, asyncLargeRead) {
+    // Similar to the previous case, but for moderately larger data.
+    // (for the moment) we don't expect to use this interface with much
+    // larger data that could cause blocking write.
+    checkAsyncRead(1024);
+}
+
+TEST_F(LocalSocketTest, asyncPartialRead) {
+    alarm(IO_TIMEOUT);
+
+    // specify reading 4 bytes of data, and send 3 bytes.  It shouldn't cause
+    // callback.
+    char recv_buf[4];
+    bool callback_called = false;
+    LocalSocket sock(io_service_, sock_pair_[0].release());
+    sock.asyncRead(boost::bind(&callback, _1, &io_service_, &callback_called,
+                               false), recv_buf, sizeof(recv_buf));
+    EXPECT_EQ(3, write(sock_pair_[1].get(), recv_buf, 3));
+
+    // open another pair of sockets so we can stop the IO service after run.
+    int socks[2];
+    char ch = 0;
+    EXPECT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, socks));
+    ScopedSocket aux_sockpair[2];
+    aux_sockpair[0].set(socks[0]);
+    aux_sockpair[1].set(socks[1]);
+    LocalSocket aux_sock(io_service_, aux_sockpair[0].get());
+    aux_sockpair[0].release();  // on successful construction we should release
+    bool aux_callback_called = false;
+    aux_sock.asyncRead(boost::bind(&callback, _1, &io_service_,
+                                   &aux_callback_called, false), &ch, 1);
+    EXPECT_EQ(1, write(aux_sockpair[1].get(), &ch, 1));
+
+    // run the IO service, it will soon be stopped via the auxiliary callback.
+    // the main callback shouldn't be called.
+    io_service_.run();
+    EXPECT_FALSE(callback_called);
+    EXPECT_TRUE(aux_callback_called);
+}
+
+TEST_F(LocalSocketTest, asyncReadError) {
+    const int sock_fd = sock_pair_[0].release();
+    LocalSocket sock(io_service_, sock_fd);
+    bool callback_called = false;
+    read_buf_.resize(1);
+    read_buf_.at(0) = 53;       // dummy data to check it later
+    const char ch = 35; // send different data to the read socket with data
+    EXPECT_EQ(1, write(sock_pair_[1].get(), &ch, 1));
+    close(sock_fd);             // invalidate the read socket
+    // we'll get callback with an error (e.g. 'bad file descriptor)
+    sock.asyncRead(boost::bind(&callback, _1, &io_service_, &callback_called,
+                               true), &read_buf_[0], 1);
+
+    io_service_.run();
+    EXPECT_TRUE(callback_called);
+    EXPECT_EQ(53, read_buf_.at(0));
+}
+
+TEST_F(LocalSocketTest, asyncReadThenDestroy) {
+    // destroy the socket before running the IO service.  we'll still get
+    // callback with an error.
+    boost::scoped_ptr<LocalSocket> sock(
+        new LocalSocket(io_service_, sock_pair_[0].release()));
+    read_buf_.resize(1);
+    bool callback_called = false;
+    sock->asyncRead(boost::bind(&callback, _1, &io_service_, &callback_called,
+                                true), &read_buf_[0], 1);
+    sock.reset();
+
+    io_service_.run();
+    EXPECT_TRUE(callback_called);
+}
+
+}