Browse Source

[1452b] Merge branch 'trac1452' into trac1452b

JINMEI Tatuya 13 years ago
parent
commit
b76b851a6a

+ 1 - 1
doc/Doxyfile

@@ -573,7 +573,7 @@ INPUT                  = ../src/lib/exceptions ../src/lib/cc \
     ../src/bin/auth ../src/bin/resolver ../src/lib/bench ../src/lib/log \
     ../src/lib/log/compiler ../src/lib/asiolink/ ../src/lib/nsas \
     ../src/lib/testutils ../src/lib/cache ../src/lib/server_common/ \
-    ../src/bin/sockcreator/ ../src/lib/util/ \
+    ../src/bin/sockcreator/ ../src/lib/util/ ../src/lib/util/io/ \
     ../src/lib/resolve ../src/lib/acl ../src/bin/dhcp6 ../src/lib/dhcp
 
 # This tag can be used to specify the character encoding of the source files

+ 1 - 1
src/bin/xfrout/xfrout.py.in

@@ -716,7 +716,7 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
             # This may happen when one xfrout process try to connect to
             # xfrout unix socket server, to check whether there is another
             # xfrout running.
-            if sock_fd == FD_COMM_ERROR:
+            if sock_fd == FD_SYSTEM_ERROR:
                 logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
             return
 

+ 4 - 1
src/lib/util/io/Makefile.am

@@ -1,8 +1,11 @@
 AM_CXXFLAGS = $(B10_CXXFLAGS)
 
+AM_CPPFLAGS = -I$(top_srcdir)/src/lib -I$(top_builddir)/src/lib
+AM_CPPFLAGS += $(BOOST_INCLUDES)
+
 lib_LTLIBRARIES = libutil_io.la
 libutil_io_la_SOURCES = fd.h fd.cc fd_share.h fd_share.cc
-libutil_io_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
+libutil_io_la_SOURCES += socketsession.h socketsession.cc sockaddr_util.h
 
 CLEANFILES = *.gcno *.gcda
 

+ 9 - 4
src/lib/util/io/fd_share.cc

@@ -18,6 +18,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/uio.h>
+#include <errno.h>
 #include <stdlib.h>             // for malloc and free
 #include "fd_share.h"
 
@@ -87,12 +88,16 @@ recv_fd(const int sock) {
     msghdr.msg_controllen = cmsg_space(sizeof(int));
     msghdr.msg_control = malloc(msghdr.msg_controllen);
     if (msghdr.msg_control == NULL) {
-        return (FD_OTHER_ERROR);
+        return (FD_SYSTEM_ERROR);
     }
 
-    if (recvmsg(sock, &msghdr, 0) < 0) {
+    const int cc = recvmsg(sock, &msghdr, 0);
+    if (cc <= 0) {
         free(msghdr.msg_control);
-        return (FD_COMM_ERROR);
+        if (cc == 0) {
+            errno = ECONNRESET;
+        }
+        return (FD_SYSTEM_ERROR);
     }
     const struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msghdr);
     int fd = FD_OTHER_ERROR;
@@ -131,7 +136,7 @@ send_fd(const int sock, const int fd) {
 
     const int ret = sendmsg(sock, &msghdr, 0);
     free(msghdr.msg_control);
-    return (ret >= 0 ? 0 : FD_COMM_ERROR);
+    return (ret >= 0 ? 0 : FD_SYSTEM_ERROR);
 }
 
 } // End for namespace io

+ 9 - 5
src/lib/util/io/fd_share.h

@@ -25,7 +25,7 @@ namespace isc {
 namespace util {
 namespace io {
 
-const int FD_COMM_ERROR = -2;
+const int FD_SYSTEM_ERROR = -2;
 const int FD_OTHER_ERROR = -1;
 
 /**
@@ -33,8 +33,11 @@ const int FD_OTHER_ERROR = -1;
  * This receives a file descriptor sent over an unix domain socket. This
  * is the counterpart of send_fd().
  *
- * \return FD_COMM_ERROR when there's error receiving the socket, FD_OTHER_ERROR
- *     when there's a different error.
+ * \return FD_SYSTEM_ERROR when there's an error at the operating system
+ * level (such as a system call failure).  The global 'errno' variable
+ * indicates the specific error.  FD_OTHER_ERROR when there's a different
+ * error.
+ *
  * \param sock The unix domain socket to read from. Tested and it does
  *     not work with a pipe.
  */
@@ -45,8 +48,9 @@ int recv_fd(const int sock);
  * This sends a file descriptor over an unix domain socket. This is the
  * counterpart of recv_fd().
  *
- * \return FD_COMM_ERROR when there's error sending the socket, FD_OTHER_ERROR
- *     for all other possible errors.
+ * \return FD_SYSTEM_ERROR when there's an error at the operating system
+ * level (such as a system call failure).  The global 'errno' variable
+ * indicates the specific error.
  * \param sock The unix domain socket to send to. Tested and it does not
  *     work with a pipe.
  * \param fd The file descriptor to send. It should work with any valid

+ 6 - 5
src/lib/util/io/fdshare_python.cc

@@ -67,14 +67,15 @@ PyInit_libutil_io_python(void) {
         return (NULL);
     }
 
-    PyObject* FD_COMM_ERROR = Py_BuildValue("i", isc::util::io::FD_COMM_ERROR);
-    if (FD_COMM_ERROR == NULL) {
+    PyObject* FD_SYSTEM_ERROR = Py_BuildValue("i",
+                                              isc::util::io::FD_SYSTEM_ERROR);
+    if (FD_SYSTEM_ERROR == NULL) {
         Py_XDECREF(mod);
         return (NULL);
     }
-    int ret = PyModule_AddObject(mod, "FD_COMM_ERROR", FD_COMM_ERROR);
-    if (-1 == ret) {
-        Py_XDECREF(FD_COMM_ERROR);
+    int ret = PyModule_AddObject(mod, "FD_SYSTEM_ERROR", FD_SYSTEM_ERROR);
+    if (ret == -1) {
+        Py_XDECREF(FD_SYSTEM_ERROR);
         Py_XDECREF(mod);
         return (NULL);
     }

+ 66 - 0
src/lib/util/io/sockaddr_util.h

@@ -0,0 +1,66 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#ifndef __SOCKADDR_UTIL_H_
+#define __SOCKADDR_UTIL_H_ 1
+
+#include <cassert>
+
+// This definitions in this file are for the convenience of internal
+// implementation and test code, and are not intended to be used publicly.
+// The namespace "internal" indicates the intent.
+
+namespace isc {
+namespace util {
+namespace io {
+namespace internal {
+
+inline socklen_t
+getSALength(const struct sockaddr& sa) {
+    if (sa.sa_family == AF_INET) {
+        return (sizeof(struct sockaddr_in));
+    } else {
+        assert(sa.sa_family == AF_INET6);
+        return (sizeof(struct sockaddr_in6));
+    }
+}
+
+// Lower level C-APIs require conversion between various variants of
+// sockaddr's, which is not friendly with C++.  The following templates
+// are a shortcut of common workaround conversion in such cases.
+
+template <typename SA_TYPE>
+const struct sockaddr*
+convertSockAddr(const SA_TYPE* sa) {
+    const void* p = sa;
+    return (static_cast<const struct sockaddr*>(p));
+}
+
+template <typename SA_TYPE>
+struct sockaddr*
+convertSockAddr(SA_TYPE* sa) {
+    void* p = sa;
+    return (static_cast<struct sockaddr*>(p));
+}
+
+}
+}
+}
+}
+
+#endif  // __SOCKADDR_UTIL_H_
+
+// Local Variables:
+// mode: c++
+// End:

+ 393 - 0
src/lib/util/io/socketsession.cc

@@ -0,0 +1,393 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+
+#include <netinet/in.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdint.h>
+#include <string.h>
+
+#include <cassert>
+#include <string>
+#include <vector>
+
+#include <exceptions/exceptions.h>
+
+#include <util/buffer.h>
+
+#include "fd_share.h"
+#include "socketsession.h"
+#include "sockaddr_util.h"
+
+using namespace std;
+
+namespace isc {
+namespace util {
+namespace io {
+
+using namespace internal;
+
+// The expected max size of the session header: 2-byte header length,
+// 6 32-bit fields, and 2 sockaddr structure. (see the SocketSessionUtility
+// overview description in the header file).  sizeof sockaddr_storage
+// should be the possible max of any sockaddr structure
+const size_t DEFAULT_HEADER_BUFLEN = 2 + sizeof(uint32_t) * 6 +
+    sizeof(struct sockaddr_storage) * 2;
+
+// The allowable maximum size of data passed with the socket FD.  For now
+// we use a fixed value of 65535, the largest possible size of valid DNS
+// messages.  We may enlarge it or make it configurable as we see the need
+// for more flexibility.
+const int MAX_DATASIZE = 65535;
+
+// The initial buffer size for receiving socket session data in the receptor.
+// This value is the maximum message size of DNS messages carried over UDP
+// (without EDNS).  In our expected usage (at the moment) this should be
+// sufficiently large (the expected data is AXFR/IXFR query or an UPDATE
+// requests.  The former should be generally quite small.  While the latter
+// could be large, it would often be small enough for a single UDP message).
+// If it turns out that there are many exceptions, we may want to extend
+// the class so that this value can be customized.  Note that the buffer
+// will be automatically extended for longer data and this is only about
+// efficiency.
+const size_t INITIAL_BUFSIZE = 512;
+
+// The (default) socket buffer size for the forwarder and receptor.  This is
+// chosen to be sufficiently large to store two full-size DNS messages.  We
+// may want to customize this value in future.
+const int SOCKSESSION_BUFSIZE = (DEFAULT_HEADER_BUFLEN + MAX_DATASIZE) * 2;
+
+struct SocketSessionForwarder::ForwarderImpl {
+    ForwarderImpl() : buf_(DEFAULT_HEADER_BUFLEN) {}
+    struct sockaddr_un sock_un_;
+    socklen_t sock_un_len_;
+    int fd_;
+    OutputBuffer buf_;
+};
+
+SocketSessionForwarder::SocketSessionForwarder(const std::string& unix_file) :
+    impl_(NULL)
+{
+    // We need to filter SIGPIPE for subsequent push().  See the class
+    // description.
+    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
+        isc_throw(Unexpected, "Failed to filter SIGPIPE: " << strerror(errno));
+    }
+
+    ForwarderImpl impl;
+    if (sizeof(impl.sock_un_.sun_path) - 1 < unix_file.length()) {
+        isc_throw(SocketSessionError,
+                  "File name for a UNIX domain socket is too long: " <<
+                  unix_file);
+    }
+    impl.sock_un_.sun_family = AF_UNIX;
+    strncpy(impl.sock_un_.sun_path, unix_file.c_str(),
+            sizeof(impl.sock_un_.sun_path));
+    assert(impl.sock_un_.sun_path[sizeof(impl.sock_un_.sun_path) - 1] == '\0');
+    impl.sock_un_len_ = 2 + unix_file.length();
+#ifdef HAVE_SA_LEN
+    impl.sock_un_.sun_len = sock_un_len_;
+#endif
+    impl.fd_ = -1;
+
+    impl_ = new ForwarderImpl;
+    *impl_ = impl;
+}
+
+SocketSessionForwarder::~SocketSessionForwarder() {
+    if (impl_->fd_ != -1) {
+        close();
+    }
+    delete impl_;
+}
+
+void
+SocketSessionForwarder::connectToReceptor() {
+    if (impl_->fd_ != -1) {
+        isc_throw(BadValue, "Duplicate connect to UNIX domain "
+                  "endpoint " << impl_->sock_un_.sun_path);
+    }
+
+    impl_->fd_ = socket(AF_UNIX, SOCK_STREAM, 0);
+    if (impl_->fd_ == -1) {
+        isc_throw(SocketSessionError, "Failed to create a UNIX domain socket: "
+                  << strerror(errno));
+    }
+    // Make the socket non blocking
+    int fcntl_flags = fcntl(impl_->fd_, F_GETFL, 0);
+    if (fcntl_flags != -1) {
+        fcntl_flags |= O_NONBLOCK;
+        fcntl_flags = fcntl(impl_->fd_, F_SETFL, fcntl_flags);
+    }
+    if (fcntl_flags == -1) {
+        close();   // note: this is the internal method, not ::close()
+        isc_throw(SocketSessionError,
+                  "Failed to make UNIX domain socket non blocking: " <<
+                  strerror(errno));
+    }
+    if (setsockopt(impl_->fd_, SOL_SOCKET, SO_SNDBUF, &SOCKSESSION_BUFSIZE,
+                   sizeof(SOCKSESSION_BUFSIZE)) == -1) {
+        close();
+        isc_throw(SocketSessionError, "Failed to set send buffer size");
+    }
+    if (connect(impl_->fd_, convertSockAddr(&impl_->sock_un_),
+                impl_->sock_un_len_) == -1) {
+        close();
+        isc_throw(SocketSessionError, "Failed to connect to UNIX domain "
+                  "endpoint " << impl_->sock_un_.sun_path << ": " <<
+                  strerror(errno));
+    }
+}
+
+void
+SocketSessionForwarder::close() {
+    if (impl_->fd_ == -1) {
+        isc_throw(BadValue, "Attempt of close before connect");
+    }
+    ::close(impl_->fd_);
+    impl_->fd_ = -1;
+}
+
+void
+SocketSessionForwarder::push(int sock, int family, int type, int protocol,
+                             const struct sockaddr& local_end,
+                             const struct sockaddr& remote_end,
+                             const void* data, size_t data_len)
+{
+    if (impl_->fd_ == -1) {
+        isc_throw(BadValue, "Attempt of push before connect");
+    }
+    if ((local_end.sa_family != AF_INET && local_end.sa_family != AF_INET6) ||
+        (remote_end.sa_family != AF_INET && remote_end.sa_family != AF_INET6))
+    {
+        isc_throw(BadValue, "Invalid address family: must be "
+                  "AF_INET or AF_INET6; " <<
+                  static_cast<int>(local_end.sa_family) << ", " <<
+                  static_cast<int>(remote_end.sa_family) << " given");
+    }
+    if (family != local_end.sa_family || family != remote_end.sa_family) {
+        isc_throw(BadValue, "Inconsistent address family: must be "
+                  << static_cast<int>(family) << "; "
+                  << static_cast<int>(local_end.sa_family) << ", "
+                  << static_cast<int>(remote_end.sa_family) << " given");
+    }
+    if (data_len == 0 || data == NULL) {
+        isc_throw(BadValue, "Data for a socket session must not be empty");
+    }
+    if (data_len > MAX_DATASIZE) {
+        isc_throw(BadValue, "Invalid socket session data size: " <<
+                  data_len << ", must not exceed " << MAX_DATASIZE);
+    }
+
+    if (send_fd(impl_->fd_, sock) != 0) {
+        isc_throw(SocketSessionError, "FD passing failed: " <<
+                  strerror(errno));
+    }
+
+    impl_->buf_.clear();
+    // Leave the space for the header length
+    impl_->buf_.skip(sizeof(uint16_t));
+    // Socket properties: family, type, protocol
+    impl_->buf_.writeUint32(static_cast<uint32_t>(family));
+    impl_->buf_.writeUint32(static_cast<uint32_t>(type));
+    impl_->buf_.writeUint32(static_cast<uint32_t>(protocol));
+    // Local endpoint
+    impl_->buf_.writeUint32(static_cast<uint32_t>(getSALength(local_end)));
+    impl_->buf_.writeData(&local_end, getSALength(local_end));
+    // Remote endpoint
+    impl_->buf_.writeUint32(static_cast<uint32_t>(getSALength(remote_end)));
+    impl_->buf_.writeData(&remote_end, getSALength(remote_end));
+    // Data length.  Must be fit uint32 due to the range check above.
+    const uint32_t data_len32 = static_cast<uint32_t>(data_len);
+    assert(data_len == data_len32); // shouldn't cause overflow.
+    impl_->buf_.writeUint32(data_len32);
+    // Write the resulting header length at the beginning of the buffer
+    impl_->buf_.writeUint16At(impl_->buf_.getLength() - sizeof(uint16_t), 0);
+
+    const struct iovec iov[2] = {
+        { const_cast<void*>(impl_->buf_.getData()), impl_->buf_.getLength() },
+        { const_cast<void*>(data), data_len }
+    };
+    const int cc = writev(impl_->fd_, iov, 2);
+    if (cc != impl_->buf_.getLength() + data_len) {
+        if (cc < 0) {
+            isc_throw(SocketSessionError,
+                      "Write failed in forwarding a socket session: " <<
+                      strerror(errno));
+        }
+        isc_throw(SocketSessionError,
+                  "Incomplete write in forwarding a socket session: " << cc <<
+                  "/" << (impl_->buf_.getLength() + data_len));
+    }
+}
+
+SocketSession::SocketSession(int sock, int family, int type, int protocol,
+                             const sockaddr* local_end,
+                             const sockaddr* remote_end,
+                             const void* data, size_t data_len) :
+    sock_(sock), family_(family), type_(type), protocol_(protocol),
+    local_end_(local_end), remote_end_(remote_end),
+    data_(data), data_len_(data_len)
+{
+    if (local_end == NULL || remote_end == NULL) {
+        isc_throw(BadValue, "sockaddr must be non NULL for SocketSession");
+    }
+    if (data_len == 0) {
+        isc_throw(BadValue, "data_len must be non 0 for SocketSession");
+    }
+    if (data == NULL) {
+        isc_throw(BadValue, "data must be non NULL for SocketSession");
+    }
+}
+
+struct SocketSessionReceptor::ReceptorImpl {
+    ReceptorImpl(int fd) : fd_(fd),
+                           sa_local_(convertSockAddr(&ss_local_)),
+                           sa_remote_(convertSockAddr(&ss_remote_)),
+                           header_buf_(DEFAULT_HEADER_BUFLEN),
+                           data_buf_(INITIAL_BUFSIZE)
+    {
+        if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &SOCKSESSION_BUFSIZE,
+                       sizeof(SOCKSESSION_BUFSIZE)) == -1) {
+            isc_throw(SocketSessionError,
+                      "Failed to set receive buffer size");
+        }
+    }
+
+    const int fd_;
+    struct sockaddr_storage ss_local_; // placeholder for local endpoint
+    struct sockaddr* const sa_local_;
+    struct sockaddr_storage ss_remote_; // placeholder for remote endpoint
+    struct sockaddr* const sa_remote_;
+
+    // placeholder for session header and data
+    vector<uint8_t> header_buf_;
+    vector<uint8_t> data_buf_;
+};
+
+SocketSessionReceptor::SocketSessionReceptor(int fd) :
+    impl_(new ReceptorImpl(fd))
+{
+}
+
+SocketSessionReceptor::~SocketSessionReceptor() {
+    delete impl_;
+}
+
+namespace {
+// A shortcut to throw common exception on failure of recv(2)
+void
+readFail(int actual_len, int expected_len) {
+    if (expected_len < 0) {
+        isc_throw(SocketSessionError, "Failed to receive data from "
+                  "SocketSessionForwarder: " << strerror(errno));
+    }
+    isc_throw(SocketSessionError, "Incomplete data from "
+              "SocketSessionForwarder: " << actual_len << "/" <<
+              expected_len);
+}
+}
+
+SocketSession
+SocketSessionReceptor::pop() {
+    const int passed_fd = recv_fd(impl_->fd_);
+    if (passed_fd == FD_SYSTEM_ERROR) {
+        isc_throw(SocketSessionError, "Receiving a forwarded FD failed: " <<
+                  strerror(errno));
+    } else if (passed_fd < 0) {
+        isc_throw(SocketSessionError, "No FD forwarded");
+    }
+
+    uint16_t header_len;
+    const int cc_hlen = recv(impl_->fd_, &header_len, sizeof(header_len),
+                        MSG_WAITALL);
+    if (cc_hlen < sizeof(header_len)) {
+        readFail(cc_hlen, sizeof(header_len));
+    }
+    header_len = InputBuffer(&header_len, sizeof(header_len)).readUint16();
+    if (header_len > DEFAULT_HEADER_BUFLEN) {
+        isc_throw(SocketSessionError, "Too large header length: " <<
+                  header_len);
+    }
+    impl_->header_buf_.clear();
+    impl_->header_buf_.resize(header_len);
+    const int cc_hdr = recv(impl_->fd_, &impl_->header_buf_[0], header_len,
+                            MSG_WAITALL);
+    if (cc_hdr < header_len) {
+        readFail(cc_hdr, header_len);
+    }
+
+    InputBuffer ibuffer(&impl_->header_buf_[0], header_len);
+    try {
+        const int family = static_cast<int>(ibuffer.readUint32());
+        if (family != AF_INET && family != AF_INET6) {
+            isc_throw(SocketSessionError,
+                      "Unsupported address family is passed: " << family);
+        }
+        const int type = static_cast<int>(ibuffer.readUint32());
+        const int protocol = static_cast<int>(ibuffer.readUint32());
+        const socklen_t local_end_len = ibuffer.readUint32();
+        if (local_end_len > sizeof(impl_->ss_local_)) {
+            isc_throw(SocketSessionError, "Local SA length too large: " <<
+                      local_end_len);
+        }
+        ibuffer.readData(&impl_->ss_local_, local_end_len);
+        const socklen_t remote_end_len = ibuffer.readUint32();
+        if (remote_end_len > sizeof(impl_->ss_remote_)) {
+            isc_throw(SocketSessionError, "Remote SA length too large: " <<
+                      remote_end_len);
+        }
+        ibuffer.readData(&impl_->ss_remote_, remote_end_len);
+        if (family != impl_->sa_local_->sa_family) {
+            isc_throw(SocketSessionError, "SA family inconsistent: " <<
+                      static_cast<int>(impl_->sa_local_->sa_family) << ", " <<
+                      static_cast<int>(impl_->sa_remote_->sa_family) <<
+                      " given, must be " << family);
+        }
+        const size_t data_len = ibuffer.readUint32();
+        if (data_len == 0 || data_len > MAX_DATASIZE) {
+            isc_throw(SocketSessionError,
+                      "Invalid socket session data size: " << data_len <<
+                      ", must be > 0 and <= " << MAX_DATASIZE);
+        }
+
+        impl_->data_buf_.clear();
+        impl_->data_buf_.resize(data_len);
+        const int cc_data = recv(impl_->fd_, &impl_->data_buf_[0], data_len,
+                                 MSG_WAITALL);
+        if (cc_data < data_len) {
+            readFail(cc_data, data_len);
+        }
+
+        return (SocketSession(passed_fd, family, type, protocol,
+                              impl_->sa_local_, impl_->sa_remote_,
+                              &impl_->data_buf_[0], data_len));
+    } catch (const InvalidBufferPosition& ex) {
+        // We catch the case where the given header is too short and convert
+        // the exception to SocketSessionError.
+        isc_throw(SocketSessionError, "bogus socket session header: " <<
+                  ex.what());
+    }
+}
+
+}
+}
+}

+ 458 - 0
src/lib/util/io/socketsession.h

@@ -0,0 +1,458 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#ifndef __SOCKETSESSION_H_
+#define __SOCKETSESSION_H_ 1
+
+#include <string>
+
+#include <boost/noncopyable.hpp>
+
+#include <exceptions/exceptions.h>
+
+namespace isc {
+namespace util {
+namespace io {
+
+/// \page SocketSessionUtility Socket session utility
+///
+/// This utility defines a set of classes that support forwarding a
+/// "socket session" from one process to another.  A socket session is a
+/// conceptual tuple of the following elements:
+/// - A network socket
+/// - The local and remote endpoints of a (IP) communication taking place on
+///   the socket.  In practice an endpoint is a pair of an IP address and
+///   TCP or UDP port number.
+/// - Some amount of data sent from the remote endpoint and received on the
+///   socket.  We call it (socket) session data in this documentation.
+///
+/// Note that this is a conceptual definition.  Depending on the underlying
+/// implementation and/or the network protocol, some of the elements could be
+/// part of others; for example, if it's an established TCP connection,
+/// the local and remote endpoints would be able to be retrieved from the
+/// socket using the standard \c getsockname() and \c getpeername() system
+/// calls.  But in this definition we separate these to be more generic.
+/// Also, as a matter of fact our intended usage includes non-connected UDP
+/// communications, in which case at least the remote endpoint should be
+/// provided separately from the socket.
+///
+/// In the actual implementation we represent a socket as a tuple of
+/// socket's file descriptor, address family (e.g. \c AF_INET6),
+/// socket type (e.g. \c SOCK_STREAM), and protocol (e.g. \c IPPROTO_TCP).
+/// The latter three are included in the representation of a socket in order
+/// to provide complete information of how the socket would be created
+/// by the \c socket(2) system call.  More specifically in practice, these
+/// parameters could be used to construct a Python socket object from the
+/// file descriptor.
+///
+/// We use the standard \c sockaddr structure to represent endpoints.
+///
+/// Socket session data is an opaque memory region of an arbitrary length
+/// (possibly with some reasonable upper limit).
+///
+/// To forward a socket session between processes, we use connected UNIX
+/// domain sockets established between the processes.  The file descriptor
+/// will be forwarded through the sockets as an ancillary data item of
+/// type \c SCM_RIGHTS.  Other elements of the session will be transferred
+/// as normal data over the connection.
+///
+/// We provide three classes to help applications forward socket sessions:
+/// \c SocketSessionForwarder is the sender of the UNIX domain connection,
+/// while \c SocketSessionReceptor is the receiver (this interface assumes
+/// one direction of forwarding); \c SocketSession represents a single
+/// socket session.
+///
+/// \c SocketSessionForwarder and \c SocketSessionReceptor objects use a
+/// straightforward protocol to pass elements of socket sessions.
+/// Once the connection is established, the forwarder object first forwards
+/// the file descriptor with 1-byte dummy data.  It then forwards a
+/// "(socket) session header", which contains all other elements of the session
+/// except the file descriptor (already forwarded) and session data.
+/// The wire format of the header is as follows:
+/// - The length of the header (16-bit unsigned integer)
+/// - Address family
+/// - Socket type
+/// - Protocol
+/// - Size of the local endpoint in bytes
+/// - Local endpoint (a copy of the memory image of the corresponding
+///   \c sockaddr)
+/// - Size of the remote endpoint in bytes
+/// - Remote endpoint (same as local endpoint)
+/// - Size of session data in bytes
+///
+/// The type of the fields is 32-bit unsigned integer unless explicitly
+/// noted, and all fields are formatted in the network byte order.
+///
+/// The socket session data immediately follows the session header.
+///
+/// Note that the fields do not necessarily be in the network byte order
+/// because they are expected to be exchanged on the same machine.  Likewise,
+/// integer elements such as address family do not necessarily be represented
+/// as an fixed-size value (i.e., 32-bit).  But fixed size fields are used
+/// in order to ensure maximum portability in such a (rare) case where the
+/// forwarder and the receptor are built with different compilers that have
+/// different definitions of \c int.  Also, since \c sockaddr fields are
+/// generally formatted in the network byte order, other fields are defined
+/// so to be consistent.
+///
+/// One basic assumption in the API of this utility is socket sessions should
+/// be forwarded without blocking, thus eliminating the need for incremental
+/// read/write or blocking other important services such as responding to
+/// requests from the application's clients.  This assumption should be held
+/// as long as both the forwarder and receptor have sufficient resources
+/// to handle the forwarding process since the communication is local.
+/// But a forward attempt could still block if the receptor is busy (or even
+/// hang up) and cannot keep up with the volume of incoming sessions.
+///
+/// So, in this implementation, the forwarder uses non blocking writes to
+/// forward sessions.  If a write attempt could block, it immediately gives
+/// up the operation with an exception.  The corresponding application is
+/// expected to catch it, close the connection, and perform any necessary
+/// recovery for that application (that would normally be re-establish the
+/// connection with a new receptor, possibly after confirming the receiving
+/// side is still alive).  On the other hand, the receptor implementation
+/// assumes it's possible that it only receive incomplete elements of a
+/// session (such as in the case where the forwarder writes part of the
+/// entire session and gives up the connection).  The receptor implementation
+/// throws an exception when it encounters an incomplete session.  Like the
+/// case of the forwarder application, the receptor application is expected
+/// to catch it, close the connection, and perform any necessary recovery
+/// steps.
+///
+/// Note that the receptor implementation uses blocking read.  So it's
+/// application's responsibility to ensure that there's at least some data
+/// in the connection when the receptor object is requested to receive a
+/// session (unless this operation can be blocking, e.g., by the use of
+/// a separate thread).  Also, if the forwarder implementation or application
+/// is malicious or extremely buggy and intentionally sends partial session
+/// and keeps the connection, the receptor could block in receiving a session.
+/// In general, we assume the forwarder doesn't do intentional blocking
+/// as it's a local node and is generally a module of the same (BIND 10)
+/// system.  The minimum requirement for the forwarder implementation (and
+/// application) is to make sure the connection is closed once it detects
+/// an error on it.  Even a naive implementation that simply dies due to
+/// the exception will meet this requirement.
+
+/// An exception indicating general errors that takes place in the
+/// socket session related class objects.
+///
+/// In general the errors are unusual but possible failures such as unexpected
+/// connection reset, and suggest the application to close the connection and
+/// (if necessary) reestablish it.
+class SocketSessionError: public Exception {
+public:
+    SocketSessionError(const char *file, size_t line, const char *what):
+        isc::Exception(file, line, what) {}
+};
+
+/// The forwarder of socket sessions
+///
+/// An object of this class maintains a UNIX domain socket (normally expected
+/// to be connected to a \c SocketSessionReceptor object) and forwards
+/// socket sessions to the receptor.
+///
+/// See the description of \ref SocketSessionUtility for other details of how
+/// the session forwarding works.
+class SocketSessionForwarder : boost::noncopyable {
+public:
+    /// The constructor.
+    ///
+    /// It's constructed with path information of the intended receptor,
+    /// but does not immediately establish a connection to the receptor;
+    /// \c connectToReceptor() must be called to establish it.  These are
+    /// separated so that an object of class can be initialized (possibly
+    /// as an attribute of a higher level application class object) without
+    /// knowing the receptor is ready for accepting new forwarders.  The
+    /// separate connect interface allows the object to be reused when it
+    /// detects connection failure and tries to re-establish it after closing
+    /// the failed one.
+    ///
+    /// On construction, it also installs a signal filter for SIGPIPE to
+    /// ignore it.  Since this class uses a stream-type connected UNIX domain
+    /// socket, if the receptor (abruptly) closes the connection a subsequent
+    /// write operation on the socket would trigger a SIGPIPE signal, which
+    /// kills the caller process by default.   This behavior would be
+    /// undesirable in many cases, so this implementation always disables
+    /// the signal.
+    ///
+    /// This approach has some drawbacks, however; first, since signal handling
+    /// is process (or thread) wide, ignoring it may not what the application
+    /// wants.  On the other hand, if the application changes how the signal is
+    /// handled after instantiating this class, the new behavior affects the
+    /// class operation.  Secondly, even if ignoring the signal is the desired
+    /// operation, it's a waste to set the filter every time this class object
+    /// is constructed.  It's sufficient to do it once.  We still adopt this
+    /// behavior based on the observation that in most cases applications would
+    /// like to ignore SIGPIPE (or simply doesn't care about it) and that this
+    /// class is not instantiated so often (so the wasteful setting overhead
+    /// should be marginal).  On the other hand, doing it every time is
+    /// beneficial if the application is threaded and different threads
+    /// create different forwarder objects (and if signals work per thread).
+    ///
+    /// \exception SocketSessionError \c unix_file is invalid as a path name
+    /// of a UNIX domain socket.
+    /// \exception Unexpected Error in setting a filter for SIGPIPE (see above)
+    /// \exception std::bad_alloc resource allocation failure
+    ///
+    /// \param unix_file Path name of the receptor.
+    explicit SocketSessionForwarder(const std::string& unix_file);
+
+    /// The destructor.
+    ///
+    /// If a connection has been established, it's automatically closed in
+    /// the destructor.
+    ~SocketSessionForwarder();
+
+    /// Establish a connection to the receptor.
+    ///
+    /// This method establishes a connection to the receptor at the path
+    /// given on construction.  It makes the underlying UNIX domain socket
+    /// non blocking, so this method (or subsequent \c push() calls) does not
+    /// block.
+    ///
+    /// \exception BadValue The method is called while an already
+    /// established connection is still active.
+    /// \exception SocketSessionError A system error in socket operation.
+    void connectToReceptor();
+
+    /// Close the connection to the receptor.
+    ///
+    /// The connection must have been established by \c connectToReceptor().
+    /// As long as it's met this method is exception free.
+    ///
+    /// \exception BadValue The connection hasn't been established.
+    void close();
+
+    /// Forward a socket session to the receptor.
+    ///
+    /// This method takes a set of parameters that represent a single socket
+    /// session, renders them in the "wire" format according to the internal
+    /// protocol (see \ref SocketSessionUtility) and forwards them to
+    /// the receptor through the UNIX domain connection.
+    ///
+    /// The connection must have been established by \c connectToReceptor().
+    ///
+    /// For simplicity and for the convenience of detecting application
+    /// errors, this method imposes some restrictions on the parameters:
+    /// - Socket family must be either \c AF_INET or \c AF_INET6
+    /// - The address family (\c sa_family) member of the local and remote
+    ///   end points must be equal to the \c family parameter
+    /// - Socket session data must not be empty (\c data_len must not be 0
+    ///   and \c data must not be NULL)
+    /// - Data length must not exceed 65535
+    /// These are not architectural limitation, and might be loosened in
+    /// future versions as we see the need for flexibility.
+    ///
+    /// Since the underlying UNIX domain socket is non blocking
+    /// (see the description for the constructor), a call to this method
+    /// should either return immediately or result in exception (in case of
+    /// "would block").
+    ///
+    /// \exception BadValue The method is called before establishing a
+    /// connection or given parameters are invalid.
+    /// \exception SocketSessionError A system error in socket operation,
+    /// including the case where the write operation would block.
+    ///
+    /// \param sock The socket file descriptor
+    /// \param family The address family (such as AF_INET6) of the socket
+    /// \param type The socket type (such as SOCK_DGRAM) of the socket
+    /// \param protocol The transport protocol (such as IPPROTO_UDP) of the
+    ///        socket
+    /// \param local_end The local end point of the session in the form of
+    ///        \c sockaddr.
+    /// \param remote_end The remote end point of the session in the form of
+    ///        \c sockaddr.
+    /// \param data A pointer to the beginning of the memory region for the
+    ///             session data
+    /// \param data_len The size of the session data in bytes.
+    void push(int sock, int family, int type, int protocol,
+              const struct sockaddr& local_end,
+              const struct sockaddr& remote_end,
+              const void* data, size_t data_len);
+
+private:
+    struct ForwarderImpl;
+    ForwarderImpl* impl_;
+};
+
+/// Socket session object.
+///
+/// The \c SocketSession class provides a convenient encapsulation
+/// for the notion of a socket session.  It's instantiated with straightforward
+/// parameters corresponding to a socket session, and provides read only
+/// accessors to the parameters to ensure data integrity.
+///
+/// In the initial design and implementation it's only used as a return type
+/// of \c SocketSessionReceptor::pop(), but it could also be used by
+/// the \c SocketSessionForwarder class or for other purposes.
+///
+/// It is assumed that the original owner of a \c SocketSession object
+/// (e.g. a class or a function that constructs it) is responsible for validity
+/// of the data passed to the object.  See the description of
+/// \c SocketSessionReceptor::pop() for the specific case of that usage.
+class SocketSession {
+public:
+    /// The constructor.
+    ///
+    /// This is a trivial constructor, taking a straightforward representation
+    /// of session parameters and storing them internally to ensure integrity.
+    ///
+    /// As long as the given parameters are valid it never throws an exception.
+    ///
+    /// \exception BadValue Given parameters don't meet the requirement
+    /// (see the parameter descriptions).
+    ///
+    /// \param sock The socket file descriptor
+    /// \param family The address family (such as AF_INET6) of the socket
+    /// \param type The socket type (such as SOCK_DGRAM) of the socket
+    /// \param protocol The transport protocol (such as IPPROTO_UDP) of the
+    ///        socket.
+    /// \param local_end The local end point of the session in the form of
+    ///        \c sockaddr.  Must not be NULL.
+    /// \param remote_end The remote end point of the session in the form of
+    ///        \c sockaddr.  Must not be NULL.
+    /// \param data A pointer to the beginning of the memory region for the
+    /// session data.  Must not be NULL, and the subsequent \c data_len bytes
+    /// must be valid.
+    /// \param data_len The size of the session data in bytes.  Must not be 0.
+    SocketSession(int sock, int family, int type, int protocol,
+                  const sockaddr* local_end, const sockaddr* remote_end,
+                  const void* data, size_t data_len);
+
+    /// Return the socket file descriptor.
+    int getSocket() const { return (sock_); }
+
+    /// Return the address family (such as AF_INET6) of the socket.
+    int getFamily() const { return (family_); }
+
+    /// Return the socket type (such as SOCK_DGRAM) of the socket.
+    int getType() const { return (type_); }
+
+    /// Return the transport protocol (such as IPPROTO_UDP) of the socket.
+    int getProtocol() const { return (protocol_); }
+
+    /// Return the local end point of the session in the form of \c sockaddr.
+    const sockaddr& getLocalEndpoint() const { return (*local_end_); }
+
+    /// Return the remote end point of the session in the form of \c sockaddr.
+    const sockaddr& getRemoteEndpoint() const { return (*remote_end_); }
+
+    /// Return a pointer to the beginning of the memory region for the session
+    /// data.
+    ///
+    /// In the current implementation it should never be NULL, and the region
+    /// of the size returned by \c getDataLength() is expected to be valid.
+    const void* getData() const { return (data_); }
+
+    /// Return the size of the session data in bytes.
+    ///
+    /// In the current implementation it should be always larger than 0.
+    size_t getDataLength() const { return (data_len_); }
+
+private:
+    const int sock_;
+    const int family_;
+    const int type_;
+    const int protocol_;
+    const sockaddr* local_end_;
+    const sockaddr* remote_end_;
+    const void* const data_;
+    const size_t data_len_;
+};
+
+/// The receiver of socket sessions
+///
+/// An object of this class holds a UNIX domain socket for an
+/// <em>established connection</em>, receives socket sessions from
+/// the remote forwarder, and provides the session to the application
+/// in the form of a \c SocketSession object.
+///
+/// Note that this class is instantiated with an already connected socket;
+/// it's not a listening socket that is accepting connection requests from
+/// forwarders.  It's application's responsibility to create the listening
+/// socket, listen on it and accept connections.  Once the connection is
+/// established, the application would construct a \c SocketSessionReceptor
+/// object with the socket for the newly established connection.
+/// This behavior is based on the design decision that the application should
+/// decide when it performs (possibly) blocking operations (see \ref
+/// SocketSessionUtility for more details).
+///
+/// See the description of \ref SocketSessionUtility for other details of how
+/// the session forwarding works.
+class SocketSessionReceptor : boost::noncopyable {
+public:
+    /// The constructor.
+    ///
+    /// \exception SocketSessionError Any error on an operation that is
+    /// performed on the given socket as part of initialization.
+    /// \exception std::bad_alloc Resource allocation failure
+    ///
+    /// \param fd A UNIX domain socket for an established connection with
+    /// a forwarder.
+    explicit SocketSessionReceptor(int fd);
+
+    /// The destructor.
+    ///
+    /// The destructor does \c not close the socket given on construction.
+    /// It's up to the application what to do with it (note that the
+    /// application would have to maintain the socket itself for detecting
+    /// the existence of a new socket session asynchronously).
+    ~SocketSessionReceptor();
+
+    /// Receive a socket session from the forwarder.
+    ///
+    /// This method receives wire-format data (see \ref SocketSessionUtility)
+    /// for a socket session on the UNIX domain socket, performs some
+    /// validation on the data, and returns the session information in the
+    /// form of a \c SocketSession object.
+    ///
+    /// The returned SocketSession object is valid only until the next time
+    /// this method is called or until the \c SocketSessionReceptor object is
+    /// destructed.
+    ///
+    /// It ensures the following:
+    /// - The address family is either \c AF_INET or \c AF_INET6
+    /// - The address family (\c sa_family) member of the local and remote
+    ///   end points must be equal to the \c family parameter
+    /// - The socket session data is not empty and does not exceed 65535
+    ///   bytes.
+    /// If the validation fails or an unexpected system error happens
+    /// (including a connection close in the meddle of reception), it throws
+    /// an SocketSessionError exception.  When this happens, it's very
+    /// unlikely that a subsequent call to this method succeeds, so in reality
+    /// the application is expected to destruct it and close the socket in
+    /// such a case.
+    ///
+    /// \exception SocketSessionError Invalid data is received or a system
+    /// error on socket operation happens.
+    /// \exception std::bad_alloc Resource allocation failure
+    ///
+    /// \return A \c SocketSession object corresponding to the extracted
+    /// socket session.
+    SocketSession pop();
+
+private:
+    struct ReceptorImpl;
+    ReceptorImpl* impl_;
+};
+
+}
+}
+}
+
+#endif  // __SOCKETSESSION_H_
+
+// Local Variables:
+// mode: c++
+// End:

+ 2 - 0
src/lib/util/tests/Makefile.am

@@ -2,6 +2,7 @@ SUBDIRS = .
 
 AM_CPPFLAGS = -I$(top_builddir)/src/lib -I$(top_srcdir)/src/lib
 AM_CPPFLAGS += $(BOOST_INCLUDES)
+AM_CPPFLAGS += -DTEST_DATA_BUILDDIR=\"$(abs_builddir)\"
 AM_CXXFLAGS = $(B10_CXXFLAGS)
 
 if USE_STATIC_LINK
@@ -26,6 +27,7 @@ run_unittests_SOURCES += lru_list_unittest.cc
 run_unittests_SOURCES += qid_gen_unittest.cc
 run_unittests_SOURCES += random_number_generator_unittest.cc
 run_unittests_SOURCES += sha1_unittest.cc
+run_unittests_SOURCES += socketsession_unittest.cc
 run_unittests_SOURCES += strutil_unittest.cc
 run_unittests_SOURCES += time_utilities_unittest.cc
 

+ 803 - 0
src/lib/util/tests/socketsession_unittest.cc

@@ -0,0 +1,803 @@
+// Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <string.h>
+#include <netdb.h>
+#include <unistd.h>
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <gtest/gtest.h>
+
+#include <exceptions/exceptions.h>
+
+#include <util/buffer.h>
+#include <util/io/fd_share.h>
+#include <util/io/socketsession.h>
+#include <util/io/sockaddr_util.h>
+
+using namespace std;
+using namespace isc;
+using boost::scoped_ptr;
+using namespace isc::util::io;
+using namespace isc::util::io::internal;
+
+namespace {
+
+const char* const TEST_UNIX_FILE = TEST_DATA_BUILDDIR "/test.unix";
+const char* const TEST_PORT = "53535";
+const char TEST_DATA[] = "BIND10 test";
+
+// A simple helper structure to automatically close test sockets on return
+// or exception in a RAII manner.  non copyable to prevent duplicate close.
+struct ScopedSocket : boost::noncopyable {
+    ScopedSocket() : fd(-1) {}
+    ScopedSocket(int sock) : fd(sock) {}
+    ~ScopedSocket() {
+        closeSocket();
+    }
+    void reset(int sock) {
+        closeSocket();
+        fd = sock;
+    }
+    int fd;
+private:
+    void closeSocket() {
+        if (fd >= 0) {
+            close(fd);
+        }
+    }
+};
+
+// A helper function that makes a test socket non block so that a certain
+// kind of test failure (such as missing send) won't cause hangup.
+void
+setNonBlock(int s, bool on) {
+    int fcntl_flags = fcntl(s, F_GETFL, 0);
+    if (on) {
+        fcntl_flags |= O_NONBLOCK;
+    } else {
+        fcntl_flags &= ~O_NONBLOCK;
+    }
+    if (fcntl(s, F_SETFL, fcntl_flags) == -1) {
+        isc_throw(isc::Unexpected, "fcntl(O_NONBLOCK) failed: " <<
+                  strerror(errno));
+    }
+}
+
+// A helper to impose some reasonable amount of wait on recv(from)
+// if possible.  It returns an option flag to be set for the system call
+// (when necessary).
+int
+setRecvDelay(int s) {
+    const struct timeval timeo = { 10, 0 };
+    if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)) == -1) {
+        if (errno == ENOPROTOOPT) {
+            // Workaround for Solaris: see recursive_query_unittest
+            return (MSG_DONTWAIT);
+        } else {
+            isc_throw(isc::Unexpected, "set RCVTIMEO failed: " <<
+                      strerror(errno));
+        }
+    }
+    return (0);
+}
+
+// A shortcut type that is convenient to be used for socket related
+// system calls, which generally require this pair
+typedef pair<const struct sockaddr*, socklen_t> SockAddrInfo;
+
+// A helper class to convert textual representation of IP address and port
+// to a pair of sockaddr and its length (in the form of a SockAddrInfo
+// pair).  Its get method uses getaddrinfo(3) for the conversion and stores
+// the result in the addrinfo_list_ vector until the object is destructed.
+// The allocated resources will be automatically freed in an RAII manner.
+class SockAddrCreator {
+public:
+    ~SockAddrCreator() {
+        vector<struct addrinfo*>::const_iterator it;
+        for (it = addrinfo_list_.begin(); it != addrinfo_list_.end(); ++it) {
+            freeaddrinfo(*it);
+        }
+    }
+    SockAddrInfo get(const string& addr_str, const string& port_str) {
+        struct addrinfo hints, *res;
+        memset(&hints, 0, sizeof(hints));
+        hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
+        hints.ai_family = AF_UNSPEC;
+        hints.ai_socktype = SOCK_DGRAM; // could be either DGRAM or STREAM here
+        const int error = getaddrinfo(addr_str.c_str(), port_str.c_str(),
+                                      &hints, &res);
+        if (error != 0) {
+            isc_throw(isc::Unexpected, "getaddrinfo failed for " <<
+                      addr_str << ", " << port_str << ": " <<
+                      gai_strerror(error));
+        }
+
+        // Technically, this is not entirely exception safe; if push_back
+        // throws, the resources allocated for 'res' will leak.  We prefer
+        // brevity here and ignore the minor failure mode.
+        addrinfo_list_.push_back(res);
+
+        return (SockAddrInfo(res->ai_addr, res->ai_addrlen));
+    }
+private:
+    vector<struct addrinfo*> addrinfo_list_;
+};
+
+class ForwardTest : public ::testing::Test {
+protected:
+    ForwardTest() : listen_fd_(-1), forwarder_(TEST_UNIX_FILE),
+                    large_text_(65535, 'a'),
+                    test_un_len_(2 + strlen(TEST_UNIX_FILE))
+    {
+        unlink(TEST_UNIX_FILE);
+        test_un_.sun_family = AF_UNIX;
+        strncpy(test_un_.sun_path, TEST_UNIX_FILE, sizeof(test_un_.sun_path));
+#ifdef HAVE_SA_LEN
+        test_un_.sun_len = test_un_len_;
+#endif
+    }
+
+    ~ForwardTest() {
+        if (listen_fd_ != -1) {
+            close(listen_fd_);
+        }
+        unlink(TEST_UNIX_FILE);
+    }
+
+    // Start an internal "socket session server".
+    void startListen() {
+        if (listen_fd_ != -1) {
+            isc_throw(isc::Unexpected, "duplicate call to startListen()");
+        }
+        listen_fd_ = socket(AF_UNIX, SOCK_STREAM, 0);
+        if (listen_fd_ == -1) {
+            isc_throw(isc::Unexpected, "failed to create UNIX domain socket" <<
+                      strerror(errno));
+        }
+        if (bind(listen_fd_, convertSockAddr(&test_un_), test_un_len_) == -1) {
+            isc_throw(isc::Unexpected, "failed to bind UNIX domain socket" <<
+                      strerror(errno));
+        }
+        // 10 is an arbitrary choice, should be sufficient for a single test
+        if (listen(listen_fd_, 10) == -1) {
+            isc_throw(isc::Unexpected, "failed to listen on UNIX domain socket"
+                      << strerror(errno));
+        }
+    }
+
+    int dummyConnect() const {
+        const int s = socket(AF_UNIX, SOCK_STREAM, 0);
+        if (s == -1) {
+            isc_throw(isc::Unexpected,
+                      "failed to create a test UNIX domain socket");
+        }
+        setNonBlock(s, true);
+        if (connect(s, convertSockAddr(&test_un_), sizeof(test_un_)) == -1) {
+            isc_throw(isc::Unexpected,
+                      "failed to connect to the test SocketSessionForwarder");
+        }
+        return (s);
+    }
+
+    // Accept a new connection from a SocketSessionForwarder and return
+    // the socket FD of the new connection.  This assumes startListen()
+    // has been called.
+    int acceptForwarder() {
+        setNonBlock(listen_fd_, true); // prevent the test from hanging up
+        struct sockaddr_un from;
+        socklen_t from_len = sizeof(from);
+        const int s = accept(listen_fd_, convertSockAddr(&from), &from_len);
+        if (s == -1) {
+            isc_throw(isc::Unexpected, "accept failed: " << strerror(errno));
+        }
+        // Make sure the socket is *blocking*.  We may pass large data, through
+        // it, and apparently non blocking read could cause some unexpected
+        // partial read on some systems.
+        setNonBlock(s, false);
+        return (s);
+    }
+
+    // A convenient shortcut for the namespace-scope version of getSockAddr
+    SockAddrInfo getSockAddr(const string& addr_str, const string& port_str) {
+        return (addr_creator_.get(addr_str, port_str));
+    }
+
+    // A helper method that creates a specified type of socket that is
+    // supposed to be passed via a SocketSessionForwarder.  It will bound
+    // to the specified address and port in sainfo.  If do_listen is true
+    // and it's a TCP socket, it will also start listening to new connection
+    // requests.
+    int createSocket(int family, int type, int protocol,
+                     const SockAddrInfo& sainfo, bool do_listen) 
+    {
+        int s = socket(family, type, protocol);
+        if (s < 0) {
+            isc_throw(isc::Unexpected, "socket(2) failed: " <<
+                      strerror(errno));
+        }
+        const int on = 1;
+        if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
+            isc_throw(isc::Unexpected, "setsockopt(SO_REUSEADDR) failed: " <<
+                      strerror(errno));
+        }
+        if (bind(s, sainfo.first, sainfo.second) < 0) {
+            close(s);
+            isc_throw(isc::Unexpected, "bind(2) failed: " <<
+                      strerror(errno));
+        }
+        if (do_listen && protocol == IPPROTO_TCP) {
+            if (listen(s, 1) == -1) {
+                isc_throw(isc::Unexpected, "listen(2) failed: " <<
+                          strerror(errno));
+            }
+        }
+        return (s);
+    }
+
+    // A helper method to push some (normally bogus) socket session header
+    // via a Unix domain socket that pretends to be a valid
+    // SocketSessionForwarder.  It first opens the Unix domain socket,
+    // and connect to the test receptor server (startListen() is expected to
+    // be called beforehand), forwards a valid file descriptor ("stdin" is
+    // used for simplicity), the pushed a 2-byte header length field of the
+    // session header.  The internal receptor_ pointer will be set to a
+    // newly created receptor object for the connection.
+    //
+    // \param hdrlen: The header length to be pushed.  It may or may not be
+    //                valid.
+    // \param hdrlen_len: The length of the actually pushed data as "header
+    //                    length".  Normally it should be 2 (the default), but
+    //                    could be a bogus value for testing.
+    // \param push_fd: Whether to forward the FD.  Normally it should be true,
+    //                 but can be false for testing.
+    void pushSessionHeader(uint16_t hdrlen,
+                           size_t hdrlen_len = sizeof(uint16_t),
+                           bool push_fd = true)
+    {
+        isc::util::OutputBuffer obuffer(0);
+        obuffer.clear();
+
+        dummy_forwarder_.reset(dummyConnect());
+        if (push_fd && send_fd(dummy_forwarder_.fd, 0) != 0) {
+            isc_throw(isc::Unexpected, "Failed to pass FD");
+        }
+        obuffer.writeUint16(hdrlen);
+        if (hdrlen_len > 0) {
+            if (send(dummy_forwarder_.fd, obuffer.getData(), hdrlen_len, 0) !=
+                hdrlen_len) {
+                isc_throw(isc::Unexpected,
+                          "Failed to pass session header len");
+            }
+        }
+        accept_sock_.reset(acceptForwarder());
+        receptor_.reset(new SocketSessionReceptor(accept_sock_.fd));
+    }
+
+    // A helper method to push some (normally bogus) socket session via a
+    // Unix domain socket pretending to be a valid SocketSessionForwarder.
+    // It internally calls pushSessionHeader() for setup and pushing the
+    // header, and pass (often bogus) header data and session data based
+    // on the function parameters.  The parameters are generally compatible
+    // to those for SocketSessionForwarder::push, but could be invalid for
+    // testing purposes.  For session data, we use TEST_DATA and its size
+    // by default for simplicity, but the size can be tweaked for testing.
+    void pushSession(int family, int type, int protocol, socklen_t local_len,
+                     const sockaddr& local, socklen_t remote_len,
+                     const sockaddr& remote,
+                     size_t data_len = sizeof(TEST_DATA))
+    {
+        isc::util::OutputBuffer obuffer(0);
+        obuffer.writeUint32(static_cast<uint32_t>(family));
+        obuffer.writeUint32(static_cast<uint32_t>(type));
+        obuffer.writeUint32(static_cast<uint32_t>(protocol));
+        obuffer.writeUint32(static_cast<uint32_t>(local_len));
+        obuffer.writeData(&local, getSALength(local));
+        obuffer.writeUint32(static_cast<uint32_t>(remote_len));
+        obuffer.writeData(&remote, getSALength(remote));
+        obuffer.writeUint32(static_cast<uint32_t>(data_len));
+        pushSessionHeader(obuffer.getLength());
+        if (send(dummy_forwarder_.fd, obuffer.getData(), obuffer.getLength(),
+                 0) != obuffer.getLength()) {
+            isc_throw(isc::Unexpected, "Failed to pass session header");
+        }
+        if (send(dummy_forwarder_.fd, TEST_DATA, sizeof(TEST_DATA), 0) !=
+            sizeof(TEST_DATA)) {
+            isc_throw(isc::Unexpected, "Failed to pass session data");
+        }
+    }
+
+    // See below
+    void checkPushAndPop(int family, int type, int protocoal,
+                         const SockAddrInfo& local,
+                         const SockAddrInfo& remote, const void* const data,
+                         size_t data_len, bool new_connection);
+
+protected:
+    int listen_fd_;
+    SocketSessionForwarder forwarder_;
+    ScopedSocket dummy_forwarder_; // forwarder "like" socket to pass bad data
+    scoped_ptr<SocketSessionReceptor> receptor_;
+    ScopedSocket accept_sock_;
+    const string large_text_;
+
+private:
+    struct sockaddr_un test_un_;
+    const socklen_t test_un_len_;
+    SockAddrCreator addr_creator_;
+};
+
+TEST_F(ForwardTest, construct) {
+    // On construction the existence of the file doesn't matter.
+    SocketSessionForwarder("some_file");
+
+    // But too long a path should be rejected
+    struct sockaddr_un s;     // can't be const; some compiler complains
+    EXPECT_THROW(SocketSessionForwarder(string(sizeof(s.sun_path), 'x')),
+                 SocketSessionError);
+    // If it's one byte shorter it should be okay
+    SocketSessionForwarder(string(sizeof(s.sun_path) - 1, 'x'));
+}
+
+TEST_F(ForwardTest, connect) {
+    // File doesn't exist (we assume the file "no_such_file" doesn't exist)
+    SocketSessionForwarder forwarder("no_such_file");
+    EXPECT_THROW(forwarder.connectToReceptor(), SocketSessionError);
+    // The socket should be closed internally, so close() should result in
+    // error.
+    EXPECT_THROW(forwarder.close(), BadValue);
+
+    // Set up the receptor and connect.  It should succeed.
+    SocketSessionForwarder forwarder2(TEST_UNIX_FILE);
+    startListen();
+    forwarder2.connectToReceptor();
+    // And it can be closed successfully.
+    forwarder2.close();
+    // Duplicate close should fail
+    EXPECT_THROW(forwarder2.close(), BadValue);
+    // Once closed, reconnect is okay.
+    forwarder2.connectToReceptor();
+    forwarder2.close();
+
+    // Duplicate connect should be rejected
+    forwarder2.connectToReceptor();
+    EXPECT_THROW(forwarder2.connectToReceptor(), BadValue);
+
+    // Connect then destroy.  Should be internally closed, but unfortunately
+    // it's not easy to test it directly.  We only check no disruption happens.
+    SocketSessionForwarder* forwarderp =
+        new SocketSessionForwarder(TEST_UNIX_FILE);
+    forwarderp->connectToReceptor();
+    delete forwarderp;
+}
+
+TEST_F(ForwardTest, close) {
+    // can't close before connect
+    EXPECT_THROW(SocketSessionForwarder(TEST_UNIX_FILE).close(), BadValue);
+}
+
+void
+checkSockAddrs(const sockaddr& expected, const sockaddr& actual) {
+    char hbuf_expected[NI_MAXHOST], sbuf_expected[NI_MAXSERV],
+        hbuf_actual[NI_MAXHOST], sbuf_actual[NI_MAXSERV];
+    EXPECT_EQ(0, getnameinfo(&expected, getSALength(expected),
+                             hbuf_expected, sizeof(hbuf_expected),
+                             sbuf_expected, sizeof(sbuf_expected),
+                             NI_NUMERICHOST | NI_NUMERICSERV));
+    EXPECT_EQ(0, getnameinfo(&actual, getSALength(actual),
+                             hbuf_actual, sizeof(hbuf_actual),
+                             sbuf_actual, sizeof(sbuf_actual),
+                             NI_NUMERICHOST | NI_NUMERICSERV));
+    EXPECT_EQ(string(hbuf_expected), string(hbuf_actual));
+    EXPECT_EQ(string(sbuf_expected), string(sbuf_actual));
+}
+
+// This is a commonly used test case that confirms normal behavior of
+// session passing.  It first creates a "local" socket (which is supposed
+// to act as a "server") bound to the 'local' parameter.  It then forwards
+// the descriptor of the FD of the local socket along with given data.
+// Next, it creates an Receptor object to receive the forwarded FD itself,
+// receives the FD, and sends test data from the received FD.  The
+// test finally checks if it can receive the test data from the local socket
+// at the Forwarder side.  In the case of TCP it's a bit complicated because
+// it first needs to establish a new connection, but essentially the test
+// scenario is the same.  See the diagram below for more details.
+//
+// UDP:
+//   Forwarder          Receptor
+//   sock -- (pass) --> passed_sock
+//   (check)  <-------- send TEST_DATA
+//
+// TCP:
+//   Forwarder               Receptor
+//   server_sock---(pass)--->passed_sock
+//     ^                        |
+//     |(connect)               |
+//   client_sock                |
+//      (check)<---------send TEST_DATA
+void
+ForwardTest::checkPushAndPop(int family, int type, int protocol,
+                             const SockAddrInfo& local,
+                             const SockAddrInfo& remote,
+                             const void* const data,
+                             size_t data_len, bool new_connection)
+{
+    // Create an original socket to be passed
+    const ScopedSocket sock(createSocket(family, type, protocol, local, true));
+    int fwd_fd = sock.fd;       // default FD to be forwarded
+    ScopedSocket client_sock;   // for TCP test we need a separate "client"..
+    ScopedSocket server_sock;   // ..and a separate socket for the connection
+    if (protocol == IPPROTO_TCP) {
+        // Use unspecified port for the "client" to avoid bind(2) failure
+        const SockAddrInfo client_addr = getSockAddr(family == AF_INET6 ?
+                                                     "::1" : "127.0.0.1", "0");
+        client_sock.reset(createSocket(family, type, protocol, client_addr,
+                                       false));
+        setNonBlock(client_sock.fd, true);
+        // This connect would "fail" due to EINPROGRESS.  Ignore it for now.
+        connect(client_sock.fd, local.first, local.second);
+        sockaddr_storage ss;
+        socklen_t salen = sizeof(ss);
+        server_sock.reset(accept(sock.fd, convertSockAddr(&ss), &salen));
+        if (server_sock.fd == -1) {
+            isc_throw(isc::Unexpected, "internal accept failed: " <<
+                      strerror(errno));
+        }
+        fwd_fd = server_sock.fd;
+    }
+
+    // If a new connection is required, start the "server", have the
+    // internal forwarder connect to it, and then internally accept it.
+    if (new_connection) {
+        startListen();
+        forwarder_.connectToReceptor();
+        accept_sock_.reset(acceptForwarder());
+    }
+
+    // Then push one socket session via the forwarder.
+    forwarder_.push(fwd_fd, family, type, protocol, *local.first,
+                    *remote.first, data, data_len);
+
+    // Pop the socket session we just pushed from a local receptor, and
+    // check the content.  Since we do blocking read on the receptor's socket,
+    // we set up an alarm to prevent hangup in case there's a bug that really
+    // makes the blocking happen.
+    SocketSessionReceptor receptor(accept_sock_.fd);
+    alarm(1);                   // set up 1-sec timer, an arbitrary choice.
+    const SocketSession sock_session = receptor.pop();
+    alarm(0);                   // then cancel it.
+    const ScopedSocket passed_sock(sock_session.getSocket());
+    EXPECT_LE(0, passed_sock.fd);
+    // The passed FD should be different from the original FD
+    EXPECT_NE(fwd_fd, passed_sock.fd);
+    EXPECT_EQ(family, sock_session.getFamily());
+    EXPECT_EQ(type, sock_session.getType());
+    EXPECT_EQ(protocol, sock_session.getProtocol());
+    checkSockAddrs(*local.first, sock_session.getLocalEndpoint());
+    checkSockAddrs(*remote.first, sock_session.getRemoteEndpoint());
+    ASSERT_EQ(data_len, sock_session.getDataLength());
+    EXPECT_EQ(0, memcmp(data, sock_session.getData(), data_len));
+
+    // Check if the passed FD is usable by sending some data from it.
+    setNonBlock(passed_sock.fd, false);
+    if (protocol == IPPROTO_UDP) {
+        EXPECT_EQ(sizeof(TEST_DATA),
+                  sendto(passed_sock.fd, TEST_DATA, sizeof(TEST_DATA), 0,
+                         convertSockAddr(local.first), local.second));
+    } else {
+        server_sock.reset(-1);
+        EXPECT_EQ(sizeof(TEST_DATA),
+                  send(passed_sock.fd, TEST_DATA, sizeof(TEST_DATA), 0));
+    }
+    // We don't use non blocking read below as it doesn't seem to be always
+    // reliable.  Instead we impose some reasonably large upper time limit of
+    // blocking (normally it shouldn't even block at all; the limit is to
+    // force the test to stop even if there's some bug and recv fails).
+    char recvbuf[sizeof(TEST_DATA)];
+    sockaddr_storage ss;
+    socklen_t sa_len = sizeof(ss);
+    if (protocol == IPPROTO_UDP) {
+        EXPECT_EQ(sizeof(recvbuf),
+                  recvfrom(fwd_fd, recvbuf, sizeof(recvbuf),
+                           setRecvDelay(fwd_fd), convertSockAddr(&ss),
+                           &sa_len));
+    } else {
+        setNonBlock(client_sock.fd, false);
+        EXPECT_EQ(sizeof(recvbuf),
+                  recv(client_sock.fd, recvbuf, sizeof(recvbuf),
+                       setRecvDelay(client_sock.fd)));
+    }
+    EXPECT_EQ(string(TEST_DATA), string(recvbuf));
+}
+
+TEST_F(ForwardTest, pushAndPop) {
+    // Pass a UDP/IPv6 session.
+    const SockAddrInfo sai_local6(getSockAddr("::1", TEST_PORT));
+    const SockAddrInfo sai_remote6(getSockAddr("2001:db8::1", "5300"));
+    {
+        SCOPED_TRACE("Passing UDP/IPv6 session");
+        checkPushAndPop(AF_INET6, SOCK_DGRAM, IPPROTO_UDP, sai_local6,
+                        sai_remote6, TEST_DATA, sizeof(TEST_DATA), true);
+    }
+    // Pass a TCP/IPv6 session.
+    {
+        SCOPED_TRACE("Passing TCP/IPv6 session");
+        checkPushAndPop(AF_INET6, SOCK_STREAM, IPPROTO_TCP, sai_local6,
+                        sai_remote6, TEST_DATA, sizeof(TEST_DATA), false);
+    }
+
+    // Pass a UDP/IPv4 session.  This reuses the same pair of forwarder and
+    // receptor, which should be usable for multiple attempts of passing,
+    // regardless of family of the passed session
+    const SockAddrInfo sai_local4(getSockAddr("127.0.0.1", TEST_PORT));
+    const SockAddrInfo sai_remote4(getSockAddr("192.0.2.2", "5300"));
+    {
+        SCOPED_TRACE("Passing UDP/IPv4 session");
+        checkPushAndPop(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sai_local4,
+                        sai_remote4, TEST_DATA, sizeof(TEST_DATA), false);
+    }
+    // Pass a TCP/IPv4 session.
+    {
+        SCOPED_TRACE("Passing TCP/IPv4 session");
+        checkPushAndPop(AF_INET, SOCK_STREAM, IPPROTO_TCP, sai_local4,
+                        sai_remote4, TEST_DATA, sizeof(TEST_DATA), false);
+    }
+
+    // Also try large data
+    {
+        SCOPED_TRACE("Passing UDP/IPv6 session with large data");
+        checkPushAndPop(AF_INET6, SOCK_DGRAM, IPPROTO_UDP, sai_local6,
+                        sai_remote6, large_text_.c_str(), large_text_.length(),
+                        false);
+    }
+    {
+        SCOPED_TRACE("Passing TCP/IPv6 session with large data");
+        checkPushAndPop(AF_INET6, SOCK_STREAM, IPPROTO_TCP, sai_local6,
+                        sai_remote6, large_text_.c_str(), large_text_.length(),
+                        false);
+    }
+    {
+        SCOPED_TRACE("Passing UDP/IPv4 session with large data");
+        checkPushAndPop(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sai_local4,
+                        sai_remote4, large_text_.c_str(), large_text_.length(),
+                        false);
+    }
+    {
+        SCOPED_TRACE("Passing TCP/IPv4 session with large data");
+        checkPushAndPop(AF_INET, SOCK_STREAM, IPPROTO_TCP, sai_local4,
+                        sai_remote4, large_text_.c_str(), large_text_.length(),
+                        false);
+    }
+}
+
+TEST_F(ForwardTest, badPush) {
+    // push before connect
+    EXPECT_THROW(forwarder_.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                                 *getSockAddr("192.0.2.1", "53").first,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 TEST_DATA, sizeof(TEST_DATA)),
+                 BadValue);
+
+    // Now connect the forwarder for the rest of tests
+    startListen();
+    forwarder_.connectToReceptor();
+
+    // Invalid address family
+    struct sockaddr sockaddr_unspec;
+    sockaddr_unspec.sa_family = AF_UNSPEC;
+    EXPECT_THROW(forwarder_.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                                 sockaddr_unspec,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 TEST_DATA, sizeof(TEST_DATA)),
+                 BadValue);
+    EXPECT_THROW(forwarder_.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 sockaddr_unspec, TEST_DATA,
+                                 sizeof(TEST_DATA)),
+                 BadValue);
+
+    // Inconsistent address family
+    EXPECT_THROW(forwarder_.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                                 *getSockAddr("2001:db8::1", "53").first,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 TEST_DATA, sizeof(TEST_DATA)),
+                 BadValue);
+    EXPECT_THROW(forwarder_.push(1, AF_INET6, SOCK_DGRAM, IPPROTO_UDP,
+                                 *getSockAddr("2001:db8::1", "53").first,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 TEST_DATA, sizeof(TEST_DATA)),
+                 BadValue);
+
+    // Empty data: we reject them at least for now
+    EXPECT_THROW(forwarder_.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                                 *getSockAddr("192.0.2.1", "53").first,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 TEST_DATA, 0),
+                 BadValue);
+    EXPECT_THROW(forwarder_.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                                 *getSockAddr("192.0.2.1", "53").first,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 NULL, sizeof(TEST_DATA)),
+                 BadValue);
+
+    // Too big data: we reject them at least for now
+    EXPECT_THROW(forwarder_.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                                 *getSockAddr("192.0.2.1", "53").first,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 string(65536, 'd').c_str(), 65536),
+                 BadValue);
+
+    // Close the receptor before push.  It will result in SIGPIPE (should be
+    // ignored) and EPIPE, which will be converted to SocketSessionError.
+    const int receptor_fd = acceptForwarder();
+    close(receptor_fd);
+    EXPECT_THROW(forwarder_.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                                 *getSockAddr("192.0.2.1", "53").first,
+                                 *getSockAddr("192.0.2.2", "53").first,
+                                 TEST_DATA, sizeof(TEST_DATA)),
+                 SocketSessionError);
+}
+
+// A subroutine for pushTooFast.  Due to the fixed configuration of the
+// send buffer size, we shouldn't be able to forward 3 full-size DNS messages
+// without receiving them.  Exactly how many we can forward depends on the
+// internal system implementation, so we'll at least confirm we can't do for 3.
+void
+multiPush(SocketSessionForwarder& forwarder, const struct sockaddr& sa,
+          const void* data, size_t data_len)
+{
+    for (int i = 0; i < 3; ++i) {
+        forwarder.push(1, AF_INET, SOCK_DGRAM, IPPROTO_UDP, sa, sa,
+                       data, data_len);
+    }
+}
+
+TEST_F(ForwardTest, pushTooFast) {
+    // Emulate the situation where the forwarder is pushing sessions too fast.
+    // It should eventually fail without blocking.
+    startListen();
+    forwarder_.connectToReceptor();
+    EXPECT_THROW(multiPush(forwarder_, *getSockAddr("192.0.2.1", "53").first,
+                           large_text_.c_str(), large_text_.length()),
+                 SocketSessionError);
+}
+
+TEST_F(ForwardTest, badPop) {
+    startListen();
+
+    // Close the forwarder socket before pop() without sending anything.
+    pushSessionHeader(0, 0, false);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Pretending to be a forwarder but don't actually pass FD.
+    pushSessionHeader(0, 1, false);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Pass a valid FD (stdin), but provide short data for the hdrlen
+    pushSessionHeader(0, 1);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Pass a valid FD, but provides too large hdrlen
+    pushSessionHeader(0xffff);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Don't provide full header
+    pushSessionHeader(sizeof(uint32_t));
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Pushed header is too short
+    const uint8_t dummy_data = 0;
+    pushSessionHeader(1);
+    send(dummy_forwarder_.fd, &dummy_data, 1, 0);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // socket addresses commonly used below (the values don't matter).
+    const SockAddrInfo sai_local(getSockAddr("192.0.2.1", "53535"));
+    const SockAddrInfo sai_remote(getSockAddr("192.0.2.2", "53536"));
+    const SockAddrInfo sai6(getSockAddr("2001:db8::1", "53537"));
+
+    // Pass invalid address family (AF_UNSPEC)
+    pushSession(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP, sai_local.second,
+                *sai_local.first, sai_remote.second, *sai_remote.first);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Pass inconsistent address family for local
+    pushSession(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sai6.second,
+                *sai6.first, sai_remote.second, *sai_remote.first);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Same for remote
+    pushSession(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sai_local.second,
+                *sai_local.first, sai6.second, *sai6.first);
+    dummy_forwarder_.reset(-1);
+
+    // Pass too big sa length for local
+    pushSession(AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                sizeof(struct sockaddr_storage) + 1, *sai_local.first,
+                sai_remote.second, *sai_remote.first);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Same for remote
+    pushSession(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sai_local.second,
+                *sai_local.first, sizeof(struct sockaddr_storage) + 1,
+                *sai_remote.first);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Data length is too large
+    pushSession(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sai_local.second,
+                *sai_local.first, sai_remote.second,
+                *sai_remote.first, 65536);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Empty data
+    pushSession(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sai_local.second,
+                *sai_local.first, sai_remote.second,
+                *sai_remote.first, 0);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+
+    // Not full data are passed
+    pushSession(AF_INET, SOCK_DGRAM, IPPROTO_UDP, sai_local.second,
+                *sai_local.first, sai_remote.second,
+                *sai_remote.first, sizeof(TEST_DATA) + 1);
+    dummy_forwarder_.reset(-1);
+    EXPECT_THROW(receptor_->pop(), SocketSessionError);
+}
+
+TEST(SocketSessionTest, badValue) {
+    // normal cases are confirmed in ForwardTest.  We only check some
+    // abnormal cases here.
+
+    SockAddrCreator addr_creator;
+
+    EXPECT_THROW(SocketSession(42, AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL,
+                               addr_creator.get("192.0.2.1", "53").first,
+                               TEST_DATA, sizeof(TEST_DATA)),
+                 BadValue);
+    EXPECT_THROW(SocketSession(42, AF_INET6, SOCK_STREAM, IPPROTO_TCP,
+                               addr_creator.get("2001:db8::1", "53").first,
+                               NULL, TEST_DATA , sizeof(TEST_DATA)), BadValue);
+    EXPECT_THROW(SocketSession(42, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                               addr_creator.get("192.0.2.1", "53").first,
+                               addr_creator.get("192.0.2.2", "5300").first,
+                               TEST_DATA, 0), BadValue);
+    EXPECT_THROW(SocketSession(42, AF_INET, SOCK_DGRAM, IPPROTO_UDP,
+                               addr_creator.get("192.0.2.1", "53").first,
+                               addr_creator.get("192.0.2.2", "5300").first,
+                               NULL, sizeof(TEST_DATA)), BadValue);
+}
+}