Browse Source

Merge branch 'review/sockreq' into scfinal

Conflicts:
	src/lib/server_common/socket_request.cc
	src/lib/util/io/fd.cc
Michal 'vorner' Vaner 13 years ago
parent
commit
f68758a5f5

+ 35 - 1
src/lib/config/ccsession.h

@@ -315,7 +315,41 @@ public:
     isc::data::ConstElementPtr getRemoteConfigValue(
         const std::string& module_name,
         const std::string& identifier) const;
-    
+
+    /**
+     * Send a message to the underlying CC session.
+     * This has the same interface as isc::cc::Session::group_sendmsg()
+     *
+     * \param msg see isc::cc::Session::group_sendmsg()
+     * \param group see isc::cc::Session::group_sendmsg()
+     * \param instance see isc::cc::Session::group_sendmsg()
+     * \param to see isc::cc::Session::group_sendmsg()
+     * \return see isc::cc::Session::group_sendmsg()
+     */
+    int groupSendMsg(isc::data::ConstElementPtr msg,
+                     std::string group,
+                     std::string instance = "*",
+                     std::string to = "*") {
+        return (session_.group_sendmsg(msg, group, instance, to));
+    };
+
+    /**
+     * Receive a message from the underlying CC session.
+     * This has the same interface as isc::cc::Session::group_recvmsg()
+     *
+     * \param envelope see isc::cc::Session::group_recvmsg()
+     * \param msg see isc::cc::Session::group_recvmsg()
+     * \param nonblock see isc::cc::Session::group_recvmsg()
+     * \param seq see isc::cc::Session::group_recvmsg()
+     * \return see isc::cc::Session::group_recvmsg()
+     */
+    bool groupRecvMsg(isc::data::ConstElementPtr& envelope,
+                      isc::data::ConstElementPtr& msg,
+                      bool nonblock = true,
+                      int seq = -1) {
+        return (session_.group_recvmsg(envelope, msg, nonblock, seq));
+    };
+
 private:
     ModuleSpec readModuleSpecification(const std::string& filename);
     void startCheck();

+ 25 - 0
src/lib/server_common/server_common_messages.mes

@@ -16,6 +16,31 @@ $NAMESPACE isc::server_common
 
 # \brief Messages for the server_common library
 
+% SOCKETREQUESTOR_CREATED Socket requestor created
+Debug message.  A socket requesor (client of the socket creator) is created
+for the corresponding application.  Normally this should happen at most
+one time throughout the lifetime of the application.
+
+% SOCKETREQUESTOR_DESTROYED Socket requestor destoryed
+Debug message.  The socket requestor created at SOCKETREQUESTOR_CREATED
+has been destroyed.  This event is generally unexpected other than in
+test cases.
+
+% SOCKETREQUESTOR_GETSOCKET Received a %1 socket for [%2]:%3, FD=%4, token=%5, path=%6
+Debug message. The socket requestor for the corresponding application
+has requested a socket for a set of address, port and protocol (shown
+in the log message) and successfully got it from the creator.  The
+corresponding file descriptor and the associated "token" (an internal
+ID used between the creator and requestor) are shown in the log
+message.
+
+% SOCKETREQUESTOR_RELEASESOCKET Released a socket of token %1
+Debug message.  The socket requestor has released a socket passed by
+the creator.  The associated token of the socket is shown in the
+log message.  If the corresponding SOCKETREQUESTOR_GETSOCKET was logged
+more detailed information of the socket can be identified by matching
+the token.
+
 % SRVCOMM_ADDRESSES_NOT_LIST the address and port specification is not a list in %1
 This points to an error in configuration. What was supposed to be a list of
 IP address - port pairs isn't a list at all but something else.

+ 366 - 4
src/lib/server_common/socket_request.cc

@@ -11,14 +11,362 @@
 // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
+#include <config.h>
 
 #include "socket_request.h"
+#include <server_common/logger.h>
+
+#include <config/ccsession.h>
+#include <cc/data.h>
+#include <util/io/fd.h>
+#include <util/io/fd_share.h>
+
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <cerrno>
+#include <csignal>
+#include <cstddef>
 
 namespace isc {
 namespace server_common {
 
 namespace {
 SocketRequestor* requestor(NULL);
+
+// Before the boss process calls send_fd, it first sends this
+// string to indicate success, followed by the file descriptor
+const std::string& CREATOR_SOCKET_OK() {
+    static const std::string str("1\n");
+    return (str);
+}
+
+// Before the boss process calls send_fd, it sends this
+// string to indicate failure. It will not send a file descriptor.
+const std::string& CREATOR_SOCKET_UNAVAILABLE() {
+    static const std::string str("0\n");
+    return (str);
+}
+
+// The name of the ccsession command to request a socket from boss
+// (the actual format of command and response are hardcoded in their
+// respective methods)
+const std::string& REQUEST_SOCKET_COMMAND() {
+    static const std::string str("get_socket");
+    return (str);
+}
+
+// The name of the ccsession command to tell boss we no longer need
+// a socket (the actual format of command and response are hardcoded
+// in their respective methods)
+const std::string& RELEASE_SOCKET_COMMAND() {
+    static const std::string str("drop_socket");
+    return (str);
+}
+
+// A helper converter from numeric protocol ID to the corresponding string.
+// used both for generating a message for the boss process and for logging.
+inline const char*
+protocolString(SocketRequestor::Protocol protocol) {
+    switch (protocol) {
+    case SocketRequestor::TCP:
+        return ("TCP");
+    case SocketRequestor::UDP:
+        return ("UDP");
+    default:
+        return ("unknown protocol");
+    }
+}
+
+// Creates the cc session message to request a socket.
+// The actual command format is hardcoded, and should match
+// the format as read in bind10_src.py.in
+isc::data::ConstElementPtr
+createRequestSocketMessage(SocketRequestor::Protocol protocol,
+                           const std::string& address, uint16_t port,
+                           SocketRequestor::ShareMode share_mode,
+                           const std::string& share_name)
+{
+    const isc::data::ElementPtr request = isc::data::Element::createMap();
+    request->set("address", isc::data::Element::create(address));
+    request->set("port", isc::data::Element::create(port));
+    if (protocol != SocketRequestor::TCP && protocol != SocketRequestor::UDP) {
+        isc_throw(InvalidParameter, "invalid protocol: " << protocol);
+    }
+    request->set("protocol",
+                 isc::data::Element::create(protocolString(protocol)));
+    switch (share_mode) {
+    case SocketRequestor::DONT_SHARE:
+        request->set("share_mode", isc::data::Element::create("NO"));
+        break;
+    case SocketRequestor::SHARE_SAME:
+        request->set("share_mode", isc::data::Element::create("SAMEAPP"));
+        break;
+    case SocketRequestor::SHARE_ANY:
+        request->set("share_mode", isc::data::Element::create("ANY"));
+        break;
+    default:
+        isc_throw(InvalidParameter, "invalid share mode: " << share_mode);
+    }
+    request->set("share_name", isc::data::Element::create(share_name));
+
+    return (isc::config::createCommand(REQUEST_SOCKET_COMMAND(), request));
+}
+
+isc::data::ConstElementPtr
+createReleaseSocketMessage(const std::string& token) {
+    const isc::data::ElementPtr release = isc::data::Element::createMap();
+    release->set("token", isc::data::Element::create(token));
+
+    return (isc::config::createCommand(RELEASE_SOCKET_COMMAND(), release));
+}
+
+// Checks and parses the response receive from Boss
+// If successful, token and path will be set to the values found in the
+// answer.
+// If the response was an error response, or does not contain the
+// expected elements, a CCSessionError is raised.
+void
+readRequestSocketAnswer(isc::data::ConstElementPtr recv_msg,
+                        std::string& token, std::string& path)
+{
+    int rcode;
+    isc::data::ConstElementPtr answer = isc::config::parseAnswer(rcode,
+                                                                 recv_msg);
+    if (rcode != 0) {
+        isc_throw(isc::config::CCSessionError,
+                  "Error response when requesting socket: " << answer->str());
+    }
+
+    if (!answer || !answer->contains("token") || !answer->contains("path")) {
+        isc_throw(isc::config::CCSessionError,
+                  "Malformed answer when requesting socket");
+    }
+    token = answer->get("token")->stringValue();
+    path = answer->get("path")->stringValue();
+}
+
+// Connect to the domain socket that has been received from Boss.
+// (i.e. the one that is used to pass created sockets over).
+//
+// This should only be called if the socket had not been connected to
+// already. To get the socket and reuse existing ones, use
+// getFdShareSocket()
+//
+// \param path The domain socket to connect to
+// \exception SocketError if the socket cannot be connected to
+// \return the socket file descriptor
+int
+createFdShareSocket(const std::string& path) {
+    // TODO: Current master has socketsession code and better way
+    // of handling errors without potential leaks for this. It is
+    // not public there at this moment, but when this is merged
+    // we should make a ticket to move this functionality to the
+    // SocketSessionReceiver and use that.
+    const int sock_pass_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+    if (sock_pass_fd == -1) {
+        isc_throw(SocketRequestor::SocketError,
+                  "Unable to open domain socket " << path <<
+                  ": " << strerror(errno));
+    }
+    struct sockaddr_un sock_pass_addr;
+    sock_pass_addr.sun_family = AF_UNIX;
+    if (path.size() >= sizeof(sock_pass_addr.sun_path)) {
+        close(sock_pass_fd);
+        isc_throw(SocketRequestor::SocketError,
+                  "Unable to open domain socket " << path <<
+                  ": path too long");
+    }
+#ifdef HAVE_SA_LEN
+    sock_pass_addr.sun_len = path.size();
+#endif
+    strcpy(sock_pass_addr.sun_path, path.c_str());
+    const socklen_t len = path.size() + offsetof(struct sockaddr_un, sun_path);
+    // Yes, C-style cast bad. See previous comment about SocketSessionReceiver.
+    if (connect(sock_pass_fd, (const struct sockaddr*)&sock_pass_addr,
+                len) == -1) {
+        close(sock_pass_fd);
+        isc_throw(SocketRequestor::SocketError,
+                  "Unable to open domain socket " << path <<
+                  ": " << strerror(errno));
+    }
+    return (sock_pass_fd);
+}
+
+// Reads a socket fd over the given socket (using recv_fd()).
+//
+// \exception SocketError if the socket cannot be read
+// \return the socket fd that has been read
+int
+getSocketFd(const std::string& token, int sock_pass_fd) {
+    // Tell the boss the socket token.
+    const std::string token_data = token + "\n";
+    if (!isc::util::io::write_data(sock_pass_fd, token_data.c_str(),
+                                   token_data.size())) {
+        isc_throw(SocketRequestor::SocketError, "Error writing socket token");
+    }
+
+    // Boss first sends some data to signal that getting the socket
+    // from its cache succeeded
+    char status[3];        // We need a space for trailing \0, hence 3
+    memset(status, 0, 3);
+    if (isc::util::io::read_data(sock_pass_fd, status, 2) < 2) {
+        isc_throw(SocketRequestor::SocketError,
+                  "Error reading status code while requesting socket");
+    }
+    // Actual status value hardcoded by boss atm.
+    if (CREATOR_SOCKET_UNAVAILABLE() == status) {
+        isc_throw(SocketRequestor::SocketError,
+                  "CREATOR_SOCKET_UNAVAILABLE returned");
+    } else if (CREATOR_SOCKET_OK() != status) {
+        isc_throw(SocketRequestor::SocketError,
+                  "Unknown status code returned before recv_fd '" << status <<
+                  "'");
+    }
+
+    const int passed_sock_fd = isc::util::io::recv_fd(sock_pass_fd);
+
+    // check for error values of passed_sock_fd (see fd_share.h)
+    if (passed_sock_fd < 0) {
+        switch (passed_sock_fd) {
+        case isc::util::io::FD_COMM_ERROR:
+            isc_throw(SocketRequestor::SocketError,
+                      "FD_COMM_ERROR while requesting socket");
+            break;
+        case isc::util::io::FD_OTHER_ERROR:
+            isc_throw(SocketRequestor::SocketError,
+                      "FD_OTHER_ERROR while requesting socket");
+            break;
+        default:
+            isc_throw(SocketRequestor::SocketError,
+                      "Unknown error while requesting socket");
+        }
+    }
+    return (passed_sock_fd);
+}
+
+// This implementation class for SocketRequestor uses
+// a ModuleCCSession for communication with the boss process,
+// and fd_share to read out the socket(s).
+// Since we only use a reference to the session, it must never
+// be closed during the lifetime of this class
+class SocketRequestorCCSession : public SocketRequestor {
+public:
+    explicit SocketRequestorCCSession(config::ModuleCCSession& session) :
+        session_(session)
+    {
+        // We need to filter SIGPIPE to prevent it from happening in
+        // getSocketFd() while writing to the UNIX domain socket after the
+        // remote end closed it.  See lib/util/io/socketsession for more
+        // background details.
+        // Note: we should eventually unify this level of details into a single
+        // module.  Setting a single filter here should be considered a short
+        // term workaround.
+        if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
+            isc_throw(Unexpected, "Failed to filter SIGPIPE: " <<
+                      strerror(errno));
+        }
+        LOG_DEBUG(logger, DBGLVL_TRACE_BASIC, SOCKETREQUESTOR_CREATED);
+    }
+
+    ~SocketRequestorCCSession() {
+        closeFdShareSockets();
+        LOG_DEBUG(logger, DBGLVL_TRACE_BASIC, SOCKETREQUESTOR_DESTROYED);
+    }
+
+    virtual SocketID requestSocket(Protocol protocol,
+                                   const std::string& address,
+                                   uint16_t port, ShareMode share_mode,
+                                   const std::string& share_name)
+    {
+        const isc::data::ConstElementPtr request_msg =
+            createRequestSocketMessage(protocol, address, port,
+                                       share_mode, share_name);
+
+        // Send it to boss
+        const int seq = session_.groupSendMsg(request_msg, "Boss");
+
+        // Get the answer from the boss.
+        // Just do a blocking read, we can't really do much anyway
+        isc::data::ConstElementPtr env, recv_msg;
+        if (!session_.groupRecvMsg(env, recv_msg, false, seq)) {
+            isc_throw(isc::config::CCSessionError,
+                      "Incomplete response when requesting socket");
+        }
+
+        // Read the socket file from the answer
+        std::string token, path;
+        readRequestSocketAnswer(recv_msg, token, path);
+        // get the domain socket over which we will receive the
+        // real socket
+        const int sock_pass_fd = getFdShareSocket(path);
+
+        // and finally get the socket itself
+        const int passed_sock_fd = getSocketFd(token, sock_pass_fd);
+        LOG_DEBUG(logger, DBGLVL_TRACE_DETAIL, SOCKETREQUESTOR_GETSOCKET).
+            arg(protocolString(protocol)).arg(address).arg(port).
+            arg(passed_sock_fd).arg(token).arg(path);
+        return (SocketID(passed_sock_fd, token));
+    }
+
+    virtual void releaseSocket(const std::string& token) {
+        const isc::data::ConstElementPtr release_msg =
+            createReleaseSocketMessage(token);
+
+        // Send it to boss
+        const int seq = session_.groupSendMsg(release_msg, "Boss");
+        LOG_DEBUG(logger, DBGLVL_TRACE_DETAIL, SOCKETREQUESTOR_RELEASESOCKET).
+            arg(token);
+
+        // Get the answer from the boss.
+        // Just do a blocking read, we can't really do much anyway
+        isc::data::ConstElementPtr env, recv_msg;
+        if (!session_.groupRecvMsg(env, recv_msg, false, seq)) {
+            isc_throw(isc::config::CCSessionError,
+                      "Incomplete response when sending drop socket command");
+        }
+
+        // Answer should just be success
+        int rcode;
+        isc::data::ConstElementPtr error = isc::config::parseAnswer(rcode,
+                                                                    recv_msg);
+        if (rcode != 0) {
+            isc_throw(SocketError,
+                      "Error requesting release of socket: " << error->str());
+        }
+    }
+
+private:
+    // Returns the domain socket file descriptor
+    // If we had not opened it yet, opens it now
+    int
+    getFdShareSocket(const std::string& path) {
+        if (fd_share_sockets_.find(path) == fd_share_sockets_.end()) {
+            const int new_fd = createFdShareSocket(path);
+            // Technically, the (creation and) assignment of the new map entry
+            // could thrown an exception and lead to FD leak.  This should be
+            // cleaned up later (see comment about SocketSessionReceiver above)
+            fd_share_sockets_[path] = new_fd;
+            return (new_fd);
+        } else {
+            return (fd_share_sockets_[path]);
+        }
+    }
+
+    // Closes the sockets that has been used for fd_share
+    void
+    closeFdShareSockets() {
+        for (std::map<std::string, int>::const_iterator it =
+                fd_share_sockets_.begin();
+             it != fd_share_sockets_.end();
+             ++it) {
+            close((*it).second);
+        }
+    }
+
+    config::ModuleCCSession& session_;
+    std::map<std::string, int> fd_share_sockets_;
+};
+
 }
 
 SocketRequestor&
@@ -31,14 +379,28 @@ socketRequestor() {
 }
 
 void
-SocketRequestor::initTest(SocketRequestor* new_requestor) {
+initSocketReqeustor(config::ModuleCCSession& session) {
+    if (requestor != NULL) {
+        isc_throw(InvalidOperation,
+                  "The socket requestor was already initialized");
+    } else {
+        requestor = new SocketRequestorCCSession(session);
+    }
+}
+
+void
+initTestSocketRequestor(SocketRequestor* new_requestor) {
     requestor = new_requestor;
 }
 
 void
-SocketRequestor::init(config::ModuleCCSession&) {
-    isc_throw(NotImplemented,
-              "The socket requestor will be implemented in #1522");
+cleanupSocketRequestor() {
+    if (requestor != NULL) {
+        delete requestor;
+        requestor = NULL;
+    } else {
+        isc_throw(InvalidOperation, "The socket requestor is not initialized");
+    }
 }
 
 }

+ 50 - 36
src/lib/server_common/socket_request.h

@@ -39,7 +39,7 @@ namespace server_common {
 /// sense to have two of them.
 ///
 /// This is actually an abstract base class. There'll be one with
-/// hidden implementation and we expect the tests to create it's own
+/// hidden implementation and we expect the tests to create its own
 /// subclass when needed.
 ///
 /// \see socketRequestor function to access the object of this class.
@@ -51,20 +51,21 @@ protected:
     /// (which it can't anyway, as it has pure virtual methods, but just to
     /// be sure).
     SocketRequestor() {}
+
 public:
     /// \brief virtual destructor
     ///
     /// A virtual destructor, as we have virtual methods, to make sure it is
     /// destroyed by the destructor of the subclass. This shouldn't matter, as
     /// a singleton class wouldn't get destroyed, but just to be sure.
-
     virtual ~ SocketRequestor() {}
+
     /// \brief A representation of received socket
     ///
     /// The pair holds two parts. The OS-level file descriptor acting as the
     /// socket (you might want to use it directly with functions like recv,
-    /// or fill it into an asio socket). The other part is the token representing
-    /// the socket, which allows it to be given up again.
+    /// or fill it into an asio socket). The other part is the token
+    /// representing the socket, which allows it to be given up again.
     typedef std::pair<int, std::string> SocketID;
 
     /// \brief The protocol of requested socket
@@ -98,7 +99,7 @@ public:
     /// else or ask for nonsense (releasing a socket we don't own).
     class SocketError : public Exception {
     public:
-        SocketError(const char* file, size_t line, const char *what) :
+        SocketError(const char* file, size_t line, const char* what) :
             Exception(file, line, what)
         { }
     };
@@ -108,15 +109,19 @@ public:
     /// Asks the socket creator to give us a socket. The socket will be bound
     /// to the given address and port.
     ///
-    /// \param protocol specifies the protocol of the socket.
+    /// \param protocol specifies the protocol of the socket.  This must be
+    /// either UDP or TCP.
     /// \param address to which the socket should be bound.
     /// \param port the port to which the socket should be bound (native endian,
     ///     not network byte order).
     /// \param share_mode how the socket can be shared with other requests.
+    /// This must be one of the defined values of ShareMode.
     /// \param share_name the name of sharing group, relevant for SHARE_SAME
     ///     (specified by us or someone else).
     /// \return the socket, as a file descriptor and token representing it on
     ///     the socket creator side.
+    ///
+    /// \throw InvalidParameter protocol or share_mode is invalid
     /// \throw CCSessionError when we have a problem talking over the CC
     ///     session.
     /// \throw SocketError in case the other side doesn't want to give us
@@ -144,33 +149,6 @@ public:
     ///     release (like we're trying to release a socket that doesn't
     ///     belong to us or exist at all).
     virtual void releaseSocket(const std::string& token) = 0;
-
-    /// \brief Initialize the singleton object
-    ///
-    /// This creates the object that will be used to request sockets.
-    /// It can be called only once per the life of application.
-    ///
-    /// \param session the CC session that'll be used to talk to the
-    ///     socket creator.
-    /// \throw InvalidOperation when it is called more than once.
-    static void init(config::ModuleCCSession& session);
-
-    /// \brief Initialization for tests
-    ///
-    /// This is to support different subclasses in tests. It replaces
-    /// the object used by socketRequestor() function by this one provided
-    /// as parameter. The ownership is not taken, eg. it's up to the caller
-    /// to delete it when necessary.
-    ///
-    /// This is not to be used in production applications. It is meant as
-    /// an replacement of init.
-    ///
-    /// This never throws.
-    ///
-    /// \param requestor the object to be used. It can be NULL to reset to
-    ///     an "virgin" state (which acts as if initTest or init was never
-    ///     called before).
-    static void initTest(SocketRequestor* requestor);
 };
 
 /// \brief Access the requestor object.
@@ -180,10 +158,46 @@ public:
 /// \return the active socket requestor object.
 /// \throw InvalidOperation if the object was not yet initialized.
 /// \see SocketRequestor::init to initialize the object.
-SocketRequestor&
-socketRequestor();
+SocketRequestor& socketRequestor();
+
+/// \brief Initialize the singleton object
+///
+/// This creates the object that will be used to request sockets.
+/// It can be called only once per the life of application.
+///
+/// \param session the CC session that'll be used to talk to the
+///                socket creator.
+/// \throw InvalidOperation when it is called more than once
+void initSocketReqeustor(config::ModuleCCSession& session);
+
+/// \brief Initialization for tests
+///
+/// This is to support different subclasses in tests. It replaces
+/// the object used by socketRequestor() function by this one provided
+/// as parameter. The ownership is not taken, eg. it's up to the caller
+/// to delete it when necessary.
+///
+/// This is not to be used in production applications. It is meant as
+/// an replacement of init.
+///
+/// This never throws.
+///
+/// \param requestor the object to be used. It can be NULL to reset to
+///     an "virgin" state (which acts as if initTest or init was never
+///     called before).
+void initTestSocketRequestor(SocketRequestor* requestor);
+
+/// \brief Destroy the singleton instance
+///
+/// Calling this function is not strictly necessary; the socket
+/// requestor is a singleton anyway. However, for some tests it
+/// is useful to destroy and recreate it, as well as for programs
+/// that want to be completely clean on exit.
+/// After this function has been called, all operations except init
+/// will fail.
+void cleanupSocketRequestor();
 
 }
 }
 
-#endif
+#endif  // __SOCKET_REQUEST_H

+ 522 - 3
src/lib/server_common/tests/socket_requestor_test.cc

@@ -12,10 +12,32 @@
 // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 // PERFORMANCE OF THIS SOFTWARE.
 
+#include <config.h>
+
 #include <server_common/socket_request.h>
 
 #include <gtest/gtest.h>
 
+#include <config/tests/fake_session.h>
+#include <config/ccsession.h>
+#include <exceptions/exceptions.h>
+
+#include <server_common/tests/data_path.h>
+
+#include <cstdlib>
+#include <cstddef>
+#include <cerrno>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <boost/foreach.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <util/io/fd.h>
+#include <util/io/fd_share.h>
+
+using namespace isc::data;
+using namespace isc::config;
 using namespace isc::server_common;
 using namespace isc;
 
@@ -24,7 +46,7 @@ namespace {
 // Check it throws an exception when it is not initialized
 TEST(SocketRequestorAccess, unitialized) {
     // Make sure it is not initialized
-    SocketRequestor::initTest(NULL);
+    initTestSocketRequestor(NULL);
     EXPECT_THROW(socketRequestor(), InvalidOperation);
 }
 
@@ -43,7 +65,7 @@ TEST(SocketRequestorAccess, initialized) {
     };
     DummyRequestor requestor;
     // Make sure it is initialized (the test way, of course)
-    SocketRequestor::initTest(&requestor);
+    initTestSocketRequestor(&requestor);
     // It returs the same "pointer" as inserted
     // The casts are there as the template system seemed to get confused
     // without them, the types should be correct even without them, but
@@ -51,7 +73,504 @@ TEST(SocketRequestorAccess, initialized) {
     EXPECT_EQ(static_cast<const SocketRequestor*>(&requestor),
               static_cast<const SocketRequestor*>(&socketRequestor()));
     // Just that we don't have an invalid pointer anyway
-    SocketRequestor::initTest(NULL);
+    initTestSocketRequestor(NULL);
+}
+
+// This class contains a fake (module)ccsession to emulate answers from Boss
+class SocketRequestorTest : public ::testing::Test {
+public:
+    SocketRequestorTest() : session(ElementPtr(new ListElement),
+                                    ElementPtr(new ListElement),
+                                    ElementPtr(new ListElement)),
+                            specfile(std::string(TEST_DATA_PATH) +
+                                     "/spec.spec")
+    {
+        session.getMessages()->add(createAnswer());
+        cc_session.reset(new ModuleCCSession(specfile, session, NULL, NULL,
+                                             false, false));
+        initSocketReqeustor(*cc_session);
+    }
+
+    ~SocketRequestorTest() {
+        cleanupSocketRequestor();
+    }
+
+    // Do a standard request with some default values
+    SocketRequestor::SocketID
+    doRequest() {
+        return (socketRequestor().requestSocket(SocketRequestor::UDP,
+                                                "192.0.2.1", 12345,
+                                                SocketRequestor::DONT_SHARE,
+                                                "test"));
+    }
+
+    // Creates a valid socket request answer, as it would be sent by
+    // Boss. 'valid' in terms of format, not values
+    void
+    addAnswer(const std::string& token, const std::string& path) {
+        ElementPtr answer_part = Element::createMap();
+        answer_part->set("token", Element::create(token));
+        answer_part->set("path", Element::create(path));
+        session.getMessages()->add(createAnswer(0, answer_part));
+    }
+
+    // Clears the messages the client sent so far on the fake msgq
+    // (for easier access to new messages later)
+    void
+    clearMsgQueue() {
+        while (session.getMsgQueue()->size() > 0) {
+            session.getMsgQueue()->remove(0);
+        }
+    }
+
+    isc::cc::FakeSession session;
+    boost::scoped_ptr<ModuleCCSession> cc_session;
+    const std::string specfile;
+};
+
+// helper function to create the request packet as we expect the
+// socket requestor to send
+ConstElementPtr
+createExpectedRequest(const std::string& address,
+                      int port,
+                      const std::string& protocol,
+                      const std::string& share_mode,
+                      const std::string& share_name)
+{
+    // create command arguments
+    const ElementPtr command_args = Element::createMap();
+    command_args->set("address", Element::create(address));
+    command_args->set("port", Element::create(port));
+    command_args->set("protocol", Element::create(protocol));
+    command_args->set("share_mode", Element::create(share_mode));
+    command_args->set("share_name", Element::create(share_name));
+
+    // create the envelope
+    const ElementPtr packet = Element::createList();
+    packet->add(Element::create("Boss"));
+    packet->add(Element::create("*"));
+    packet->add(createCommand("get_socket", command_args));
+
+    return (packet);
+}
+
+TEST_F(SocketRequestorTest, testSocketRequestMessages) {
+    // For each request, it will raise CCSessionError, since we don't
+    // answer here.
+    // We are only testing the request messages that are sent,
+    // so for this test that is no problem
+    clearMsgQueue();
+    ConstElementPtr expected_request;
+
+    expected_request = createExpectedRequest("192.0.2.1", 12345, "UDP",
+                                             "NO", "test");
+    ASSERT_THROW(socketRequestor().requestSocket(SocketRequestor::UDP,
+                                   "192.0.2.1", 12345,
+                                   SocketRequestor::DONT_SHARE,
+                                   "test"),
+                 CCSessionError);
+    ASSERT_EQ(1, session.getMsgQueue()->size());
+    ASSERT_EQ(*expected_request, *(session.getMsgQueue()->get(0)));
+
+    clearMsgQueue();
+    expected_request = createExpectedRequest("192.0.2.2", 1, "TCP",
+                                             "ANY", "test2");
+    ASSERT_THROW(socketRequestor().requestSocket(SocketRequestor::TCP,
+                                   "192.0.2.2", 1,
+                                   SocketRequestor::SHARE_ANY,
+                                   "test2"),
+                 CCSessionError);
+    ASSERT_EQ(1, session.getMsgQueue()->size());
+    ASSERT_EQ(*expected_request, *(session.getMsgQueue()->get(0)));
+
+    clearMsgQueue();
+    expected_request = createExpectedRequest("::1", 2, "UDP",
+                                             "SAMEAPP", "test3");
+    ASSERT_THROW(socketRequestor().requestSocket(SocketRequestor::UDP,
+                                   "::1", 2,
+                                   SocketRequestor::SHARE_SAME,
+                                   "test3"),
+                 CCSessionError);
+    ASSERT_EQ(1, session.getMsgQueue()->size());
+    ASSERT_EQ(*expected_request, *(session.getMsgQueue()->get(0)));
+}
+
+TEST_F(SocketRequestorTest, invalidParameterForSocketRequest) {
+    // Bad protocol
+    EXPECT_THROW(socketRequestor().
+                 requestSocket(static_cast<SocketRequestor::Protocol>(2),
+                               "192.0.2.1", 12345,
+                               SocketRequestor::DONT_SHARE,
+                               "test"),
+                 InvalidParameter);
+
+    // Bad share mode
+    EXPECT_THROW(socketRequestor().
+                 requestSocket(SocketRequestor::UDP,
+                               "192.0.2.1", 12345,
+                               static_cast<SocketRequestor::ShareMode>(3),
+                               "test"),
+                 InvalidParameter);
+}
+
+TEST_F(SocketRequestorTest, testBadRequestAnswers) {
+    // Test various scenarios where the requestor gets back bad answers
+
+    // Should raise CCSessionError if there is no answer
+    ASSERT_THROW(doRequest(), CCSessionError);
+
+    // Also if the answer does not match the format
+    session.getMessages()->add(createAnswer());
+    ASSERT_THROW(doRequest(), CCSessionError);
+
+    // Now a 'real' answer, should fail on socket connect (no such file)
+    addAnswer("foo", "/does/not/exist");
+    ASSERT_THROW(doRequest(), SocketRequestor::SocketError);
+
+    // Another failure (domain socket path too long)
+    addAnswer("foo", std::string(1000, 'x'));
+    ASSERT_THROW(doRequest(), SocketRequestor::SocketError);
+
+    // Test values around path boundary
+    struct sockaddr_un sock_un;
+    const std::string max_len(sizeof(sock_un.sun_path) - 1, 'x');
+    addAnswer("foo", max_len);
+    // The failure should NOT contain 'too long'
+    // (explicitly checking for existance of nonexistence of 'too long',
+    // as opposed to the actual error, since 'too long' is a value we set).
+    try {
+        doRequest();
+        FAIL() << "doRequest did not throw an exception";
+    } catch (const SocketRequestor::SocketError& se) {
+        ASSERT_EQ(std::string::npos, std::string(se.what()).find("too long"));
+    }
+
+    const std::string too_long(sizeof(sock_un.sun_path), 'x');
+    addAnswer("foo", too_long);
+    // The failure SHOULD contain 'too long'
+    try {
+        doRequest();
+        FAIL() << "doRequest did not throw an exception";
+    } catch (const SocketRequestor::SocketError& se) {
+        ASSERT_NE(std::string::npos, std::string(se.what()).find("too long"));
+    }
+
+    // Send back an error response
+    session.getMessages()->add(createAnswer(1, "error"));
+    ASSERT_THROW(doRequest(), CCSessionError);
+}
+
+// Helper function to create the release commands as we expect
+// them to be sent by the SocketRequestor class
+ConstElementPtr
+createExpectedRelease(const std::string& token) {
+    // create command arguments
+    const ElementPtr command_args = Element::createMap();
+    command_args->set("token", Element::create(token));
+
+    // create the envelope
+    const ElementPtr packet = Element::createList();
+    packet->add(Element::create("Boss"));
+    packet->add(Element::create("*"));
+    packet->add(createCommand("drop_socket", command_args));
+
+    return (packet);
+}
+
+TEST_F(SocketRequestorTest, testSocketReleaseMessages) {
+    ConstElementPtr expected_release;
+
+    session.getMessages()->add(createAnswer());
+
+    clearMsgQueue();
+    expected_release = createExpectedRelease("foo");
+    socketRequestor().releaseSocket("foo");
+    ASSERT_EQ(1, session.getMsgQueue()->size());
+    ASSERT_EQ(*expected_release, *(session.getMsgQueue()->get(0)));
+
+    session.getMessages()->add(createAnswer());
+    clearMsgQueue();
+    expected_release = createExpectedRelease("bar");
+    socketRequestor().releaseSocket("bar");
+    ASSERT_EQ(1, session.getMsgQueue()->size());
+    ASSERT_EQ(*expected_release, *(session.getMsgQueue()->get(0)));
+}
+
+TEST_F(SocketRequestorTest, testBadSocketReleaseAnswers) {
+    // Should fail if there is no answer at all
+    ASSERT_THROW(socketRequestor().releaseSocket("bar"),
+                 CCSessionError);
+
+    // Should also fail if the answer is an error
+    session.getMessages()->add(createAnswer(1, "error"));
+    ASSERT_THROW(socketRequestor().releaseSocket("bar"),
+                 SocketRequestor::SocketError);
+}
+
+// A helper function to impose a read timeout for the server socket
+// in order to avoid deadlock when the client side has a bug and doesn't
+// send expected data.
+// It returns true when the timeout is set successfully; otherwise false.
+bool
+setRecvTimo(int s) {
+    const struct timeval timeo = { 10, 0 }; // 10sec, arbitrary choice
+    if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)) == 0) {
+        return (true);
+    }
+    if (errno == ENOPROTOOPT) { // deviant OS, give up using it.
+        return (false);
+    }
+    isc_throw(isc::Unexpected, "set RCVTIMEO failed: " << strerror(errno));
+}
+
+// Helper test class that creates a randomly named domain socket
+// Upon init, it will only reserve the name (and place an empty file in its
+// place).
+// When run() is called, it creates the socket, forks, and the child will
+// listen for a connection, then send all the data passed to run to that
+// connection, and then close the socket
+class TestSocket {
+public:
+    TestSocket() : fd_(-1) {
+        path_ = strdup("test_socket.XXXXXX");
+        // Misuse mkstemp to generate a file name.
+        const int f = mkstemp(path_);
+        if (f == -1) {
+            isc_throw(Unexpected, "mkstemp failed: " << strerror(errno));
+        }
+        // Just need the name, so immediately close
+        close(f);
+    }
+
+    ~TestSocket() {
+        cleanup();
+    }
+
+    void
+    cleanup() {
+        unlink(path_);
+        if (path_ != NULL) {
+            free(path_);
+            path_ = NULL;
+        }
+        if (fd_ != -1) {
+            close(fd_);
+            fd_ = -1;
+        }
+    }
+
+    // Returns the path used for the socket
+    const char* getPath() const {
+        return (path_);
+    }
+
+    // create socket, fork, and serve if child (child will exit when done).
+    // If the underlying system doesn't allow to set read timeout, tell the
+    // caller that via a false return value so that the caller can avoid
+    // performing tests that could result in a dead lock.
+    bool run(const std::vector<std::pair<std::string, int> >& data) {
+        create();
+        const bool timo_ok = setRecvTimo(fd_);
+        const int child_pid = fork();
+        if (child_pid == 0) {
+            serve(data);
+            exit(0);
+        } else {
+            // parent does not need fd anymore
+            close(fd_);
+            fd_ = -1;
+        }
+        return (timo_ok);
+    }
+private:
+    // Actually create the socket and listen on it
+    void
+    create() {
+        fd_ = socket(AF_UNIX, SOCK_STREAM, 0);
+        if (fd_ == -1) {
+            isc_throw(Unexpected, "Unable to create socket");
+        }
+        struct sockaddr_un socket_address;
+        socket_address.sun_family = AF_UNIX;
+        socklen_t len = strlen(path_);
+        if (len > sizeof(socket_address.sun_path)) {
+            isc_throw(Unexpected,
+                      "mkstemp() created a filename too long for sun_path");
+        }
+        strncpy(socket_address.sun_path, path_, len);
+#ifdef HAVE_SA_LEN
+        socket_address.sun_len = len;
+#endif
+
+        len += offsetof(struct sockaddr_un, sun_path);
+        // Remove the random file we created so we can reuse it for
+        // a domain socket connection. This contains a minor race condition
+        // but for the purposes of this test it should be small enough
+        unlink(path_);
+        if (bind(fd_, (const struct sockaddr*)&socket_address, len) == -1) {
+            isc_throw(Unexpected,
+                      "unable to bind to test domain socket " << path_ <<
+                      ": " << strerror(errno));
+        }
+
+        if (listen(fd_, 1) == -1) {
+            isc_throw(Unexpected,
+                      "unable to listen on test domain socket " << path_ <<
+                      ": " << strerror(errno));
+        }
+    }
+
+    // Accept one connection, then for each value of the vector,
+    // read the socket token from the connection and match the string
+    // part of the vector element, and send the integer part of the element
+    // using send_fd() (prepended by a status code 'ok').  For simplicity
+    // we assume the tokens are 4 bytes long; if the test case uses a
+    // different size of token the test will fail.
+    //
+    // There are a few specific exceptions;
+    // when the value is -1, it will send back an error value (signaling
+    // CREATOR_SOCKET_UNAVAILABLE)
+    // when the value is -2, it will send a byte signaling CREATOR_SOCKET_OK
+    // first, and then one byte from some string (i.e. bad data, not using
+    // send_fd())
+    //
+    // NOTE: client_fd could leak on exception.  This should be cleaned up.
+    // See the note about SocketSessionReceiver in socket_request.cc.
+    void
+    serve(const std::vector<std::pair<std::string, int> > data) {
+        const int client_fd = accept(fd_, NULL, NULL);
+        if (client_fd == -1) {
+            isc_throw(Unexpected, "Error in accept(): " << strerror(errno));
+        }
+        if (!setRecvTimo(client_fd)) {
+            // In the loop below we do blocking read.  To avoid deadlock
+            // when the parent is buggy we'll skip it unless we can
+            // set a read timeout on the socket.
+            return;
+        }
+        typedef std::pair<std::string, int> DataPair;
+        BOOST_FOREACH(DataPair cur_data, data) {
+            char buf[5];
+            memset(buf, 0, 5);
+            if (isc::util::io::read_data(client_fd, buf, 4) != 4) {
+                isc_throw(Unexpected, "unable to receive socket token");
+            }
+            if (cur_data.first != buf) {
+                isc_throw(Unexpected, "socket token mismatch: expected="
+                          << cur_data.first << ", actual=" << buf);
+            }
+
+            bool result;
+            if (cur_data.second == -1) {
+                // send 'CREATOR_SOCKET_UNAVAILABLE'
+                result = isc::util::io::write_data(client_fd, "0\n", 2);
+            } else if (cur_data.second == -2) {
+                // send 'CREATOR_SOCKET_OK' first
+                result = isc::util::io::write_data(client_fd, "1\n", 2);
+                if (result) {
+                    if (send(client_fd, "a", 1, 0) != 1) {
+                        result = false;
+                    }
+                }
+            } else {
+                // send 'CREATOR_SOCKET_OK' first
+                result = isc::util::io::write_data(client_fd, "1\n", 2);
+                if (result) {
+                    if (isc::util::io::send_fd(client_fd,
+                                               cur_data.second) != 0) {
+                        result = false;
+                    }
+                }
+            }
+            if (!result) {
+                isc_throw(Exception, "Error in send_fd(): " <<
+                          strerror(errno));
+            }
+        }
+        close(client_fd);
+    }
+
+    int fd_;
+    char* path_;
+};
+
+TEST_F(SocketRequestorTest, testSocketPassing) {
+    TestSocket ts;
+    std::vector<std::pair<std::string, int> > data;
+    data.push_back(std::pair<std::string, int>("foo\n", 1));
+    data.push_back(std::pair<std::string, int>("bar\n", 2));
+    data.push_back(std::pair<std::string, int>("foo\n", 3));
+    data.push_back(std::pair<std::string, int>("foo\n", 1));
+    data.push_back(std::pair<std::string, int>("foo\n", -1));
+    data.push_back(std::pair<std::string, int>("foo\n", -2));
+
+    // run() returns true iff we can specify read timeout so we avoid a
+    // deadlock.  Unless there's a bug the test should succeed even without the
+    // timeout, but we don't want to make the test hang up in case with an
+    // unexpected bug, so we'd rather skip most of the tests in that case.
+    const bool timo_ok = ts.run(data);
+    SocketRequestor::SocketID socket_id;
+    if (timo_ok) {
+        // 1 should be ok
+        addAnswer("foo", ts.getPath());
+        socket_id = doRequest();
+        ASSERT_EQ("foo", socket_id.second);
+        ASSERT_EQ(0, close(socket_id.first));
+
+        // 2 should be ok too
+        addAnswer("bar", ts.getPath());
+        socket_id = doRequest();
+        ASSERT_EQ("bar", socket_id.second);
+        ASSERT_EQ(0, close(socket_id.first));
+
+        // 3 should be ok too (reuse earlier token)
+        addAnswer("foo", ts.getPath());
+        socket_id = doRequest();
+        ASSERT_EQ("foo", socket_id.second);
+        ASSERT_EQ(0, close(socket_id.first));
+    }
+
+    // Create a second socket server, to test that multiple different
+    // domains sockets would work as well (even though we don't actually
+    // use that feature)
+    TestSocket ts2;
+    std::vector<std::pair<std::string, int> > data2;
+    data2.push_back(std::pair<std::string, int>("foo\n", 1));
+    const bool timo_ok2 = ts2.run(data2);
+
+    if (timo_ok2) {
+        // 1 should be ok
+        addAnswer("foo", ts2.getPath());
+        socket_id = doRequest();
+        ASSERT_EQ("foo", socket_id.second);
+        ASSERT_EQ(0, close(socket_id.first));
+    }
+
+    if (timo_ok) {
+        // Now use first socket again
+        addAnswer("foo", ts.getPath());
+        socket_id = doRequest();
+        ASSERT_EQ("foo", socket_id.second);
+        ASSERT_EQ(0, close(socket_id.first));
+
+        // -1 is a "normal" error
+        addAnswer("foo", ts.getPath());
+        ASSERT_THROW(doRequest(), SocketRequestor::SocketError);
+
+        // -2 is an unexpected error.  After this point it's not guaranteed the
+        // connection works as intended.
+        addAnswer("foo", ts.getPath());
+        ASSERT_THROW(doRequest(), SocketRequestor::SocketError);
+    }
+
+    // Vector is of first socket is now empty, so the socket should be gone
+    addAnswer("foo", ts.getPath());
+    ASSERT_THROW(doRequest(), SocketRequestor::SocketError);
+
+    // Vector is of second socket is now empty too, so the socket should be
+    // gone
+    addAnswer("foo", ts2.getPath());
+    ASSERT_THROW(doRequest(), SocketRequestor::SocketError);
 }
 
 }

+ 16 - 19
src/lib/util/io/fd.cc

@@ -23,23 +23,23 @@ namespace io {
 
 bool
 write_data(const int fd, const void *buffer_v, const size_t length) {
-
     const unsigned char* buffer(static_cast<const unsigned char*>(buffer_v));
     size_t remaining = length;  // Amount remaining to be written
 
+    // Just keep writing until all is written
     while (remaining > 0) {
-        ssize_t amount = write(fd, buffer, remaining);
-        if (amount == -1) {
-            // Some error.  Ignore interrupted system calls otherwise return
-            // an error indication.
-            if (errno != EINTR) {
-                return false;
+        const int written = write(fd, buffer, remaining);
+        if (written == -1) {
+            if (errno == EINTR) { // Just keep going
+                continue;
+            } else {
+                return (false);
             }
 
-        } else if (amount > 0) {
-            // Wrote "amount" bytes from the buffer
-            remaining -= amount;
-            buffer += amount;
+        } else if (written > 0) {
+            // Wrote "written" bytes from the buffer
+            remaining -= written;
+            buffer += written;
 
         } else {
             // Wrote zero bytes from the buffer. We should not get here as any
@@ -54,24 +54,21 @@ write_data(const int fd, const void *buffer_v, const size_t length) {
 
 ssize_t
 read_data(const int fd, void *buffer_v, const size_t length) {
-
     unsigned char* buffer(static_cast<unsigned char*>(buffer_v));
     size_t remaining = length;   // Amount remaining to be read
 
     while (remaining > 0) {
-        ssize_t amount = read(fd, buffer, remaining);
+        const int amount = read(fd, buffer, remaining);
         if (amount == -1) {
-            // Some error.  Ignore interrupted system calls otherwise return
-            // an error indication.
-            if (errno != EINTR) {
-                return -1;
+            if (errno == EINTR) { // Continue on interrupted call
+                continue;
+            } else {
+                return (-1);
             }
-
         } else if (amount > 0) {
             // Read "amount" bytes into the buffer
             remaining -= amount;
             buffer += amount;
-
         } else {
             // EOF - end the read
             break;