Browse Source

[5189] Use ClientConnection to connect to the servers.

Marcin Siodelski 8 years ago
parent
commit
7a0c0c2f14

+ 35 - 21
src/bin/agent/ca_command_mgr.cc

@@ -14,6 +14,8 @@
 #include <asiolink/unix_domain_socket.h>
 #include <cc/command_interpreter.h>
 #include <cc/data.h>
+#include <cc/json_feed.h>
+#include <config/client_connection.h>
 #include <boost/pointer_cast.hpp>
 #include <iterator>
 #include <string>
@@ -25,6 +27,13 @@ using namespace isc::data;
 using namespace isc::hooks;
 using namespace isc::process;
 
+namespace {
+
+/// @brief Client side connection timeout.
+const long CONNECTION_TIMEOUT = 5000;
+
+}
+
 namespace isc {
 namespace agent {
 
@@ -186,34 +195,39 @@ CtrlAgentCommandMgr::forwardCommand(const std::string& service,
     std::string socket_name = socket_info->get("socket-name")->stringValue();
 
     // Forward command and receive reply.
-    IOService io_service;
-    UnixDomainSocket unix_socket(io_service);
-    size_t receive_len;
-    try {
-        unix_socket.connect(socket_name);
-        std::string wire_command = command->toWire();
-        unix_socket.write(&wire_command[0], wire_command.size());
-        receive_len = unix_socket.receive(&receive_buf_[0], receive_buf_.size());
-
-    } catch (const std::exception& ex) {
+    IOServicePtr io_service(new IOService());;
+    ClientConnection conn(*io_service);
+    boost::system::error_code received_ec;
+    ConstJSONFeedPtr received_feed;
+    conn.start(ClientConnection::SocketPath(socket_name),
+               ClientConnection::ControlCommand(command->toWire()),
+               [&io_service, &received_ec, &received_feed]
+               (const boost::system::error_code& ec, ConstJSONFeedPtr feed) {
+                   // Capture error code and parsed data.
+                   received_ec = ec;
+                   received_feed = feed;
+                   // Stop the IO service so as we can continue.
+                   io_service->stop();
+               }, ClientConnection::Timeout(CONNECTION_TIMEOUT));
+    io_service->run();
+
+    if (received_ec) {
         isc_throw(CommandForwardingError, "unable to forward command to the "
-                  << service << " service: " << ex.what() << ". The server "
-                  "is likely to be offline");
+                  << service << " service: " << received_ec.message()
+                  << ". The server is likely to be offline");
     }
 
-    // This is really not possible right now, but when we migrate to the
-    // solution using timeouts it is possible that the response is not
-    // received.
-    if (receive_len == 0) {
-        isc_throw(CommandForwardingError, "internal server error: no answer"
-                  " received from the server to the forwarded message");
+    // This shouldn't happen because the fact that there was no time out indicates
+    // that the whole response has been read and it should be stored within the
+    // feed. But, let's check to prevent assertions.
+    if (!received_feed) {
+        isc_throw(CommandForwardingError, "internal server error: empty response"
+                  " received from the unix domain socket");
     }
 
-    std::string reply(&receive_buf_[0], receive_len);
-
     ConstElementPtr answer;
     try {
-        answer = Element::fromJSON(reply);
+        answer = received_feed->toElement();
 
         LOG_INFO(agent_logger, CTRL_AGENT_COMMAND_FORWARDED)
             .arg(cmd_name).arg(service);

+ 11 - 19
src/bin/agent/tests/ca_command_mgr_unittests.cc

@@ -230,28 +230,17 @@ public:
         // to this we need to run the server side socket at the same time.
         // Running IO service in a thread guarantees that the server responds
         // as soon as it receives the control command.
-        isc::util::thread::Thread(boost::bind(&CtrlAgentCommandMgrTest::runIO,
-                                              getIOService(), server_socket_,
-                                              expected_responses));
+        isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
 
         ConstElementPtr command = createCommand("foo", service);
         ConstElementPtr answer = mgr_.handleCommand("foo", ConstElementPtr(),
                                                     command);
 
-        checkAnswer(answer, expected_result0, expected_result1, expected_result2);
-    }
+        getIOService()->stop();
 
-    /// @brief Runs IO service until number of sent responses is lower than
-    /// expected.
-    ///
-    /// @param server_socket Pointer to the server socket.
-    /// @param expected_responses Number of expected responses.
-    static void runIO(IOServicePtr& io_service,
-                      const test::TestServerUnixSocketPtr& server_socket,
-                      const size_t expected_responses) {
-        while (server_socket->getResponseNum() < expected_responses) {
-            io_service->run_one();
-        }
+        th.wait();
+
+        checkAnswer(answer, expected_result0, expected_result1, expected_result2);
     }
 
     /// @brief a convenience reference to control agent command manager
@@ -322,7 +311,7 @@ TEST_F(CtrlAgentCommandMgrTest, noService) {
 TEST_F(CtrlAgentCommandMgrTest, invalidAnswer) {
     testForward(CtrlAgentCfgContext::TYPE_DHCP6, "dhcp6",
                 isc::config::CONTROL_RESULT_ERROR, -1, -1, 1,
-                "{ \"result\": 0");
+                "{ \"result\": }");
 }
 
 /// Check that error is returned to the client if the forwarding socket is
@@ -359,13 +348,16 @@ TEST_F(CtrlAgentCommandMgrTest, forwardListCommands) {
     // to this we need to run the server side socket at the same time.
     // Running IO service in a thread guarantees that the server responds
     // as soon as it receives the control command.
-    isc::util::thread::Thread(boost::bind(&CtrlAgentCommandMgrTest::runIO,
-                                          getIOService(), server_socket_, 1));
+    isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
 
     ConstElementPtr command = createCommand("list-commands", "dhcp4");
     ConstElementPtr answer = mgr_.handleCommand("list-commands", ConstElementPtr(),
                                                 command);
 
+    getIOService()->stop();
+
+    th.wait();
+
     // Answer of 3 is specific to the stub response we send when the
     // command is forwarded. So having this value returned means that
     // the command was forwarded as expected.

+ 20 - 37
src/lib/asiolink/testutils/test_server_unix_socket.cc

@@ -7,6 +7,7 @@
 #include <asiolink/asio_wrapper.h>
 #include <asiolink/testutils/test_server_unix_socket.h>
 #include <boost/bind.hpp>
+#include <boost/enable_shared_from_this.hpp>
 #include <boost/shared_ptr.hpp>
 #include <functional>
 #include <set>
@@ -30,7 +31,7 @@ typedef std::function<void()> SentResponseCallback;
 /// @brief Connection to the server over unix domain socket.
 ///
 /// It reads the data over the socket, sends responses and closes a socket.
-class Connection {
+class Connection : public boost::enable_shared_from_this<Connection> {
 public:
 
     /// @brief Constructor.
@@ -44,11 +45,16 @@ public:
     /// server sends a response.
     Connection(const UnixSocketPtr& unix_socket,
                const std::string custom_response,
-               const SentResponseCallback& sent_response_callback)
+               SentResponseCallback sent_response_callback)
         : socket_(unix_socket), custom_response_(custom_response),
           sent_response_callback_(sent_response_callback) {
+    }
+
+    void start() {
        socket_->async_read_some(boost::asio::buffer(&raw_buf_[0], raw_buf_.size()),
-           boost::bind(&Connection::readHandler, this, _1, _2));
+           boost::bind(&Connection::readHandler, shared_from_this(),
+                       boost::asio::placeholders::error,
+                       boost::asio::placeholders::bytes_transferred));
     }
 
     /// @brief Handler invoked when data have been received over the socket.
@@ -80,15 +86,12 @@ public:
                 boost::asio::buffer(response.c_str(), response.size()));
         }
 
+        start();
+
         // Invoke callback function to notify that the response has been sent.
         sent_response_callback_();
     }
 
-    /// @brief Closes the socket.
-    void stop() {
-        socket_->close();
-    }
-
 private:
 
     /// @brief Pointer to the unix domain socket.
@@ -120,13 +123,12 @@ public:
     ///
     /// @param io_service Reference to the IO service.
     ConnectionPool(IOService& io_service)
-        : io_service_(io_service), connections_(), next_socket_(),
+        : io_service_(io_service), next_socket_(),
           response_num_(0) {
     }
 
     /// @brief Destructor.
     ~ConnectionPool() {
-        stopAll();
     }
 
     /// @brief Creates new unix domain socket and returns it.
@@ -154,28 +156,11 @@ public:
         ConnectionPtr conn(new Connection(next_socket_, custom_response, [this] {
             ++response_num_;
         }));
+        conn->start();
 
-        connections_.insert(conn);
         next_socket_.reset();
     }
 
-    /// @brief Stops the given connection.
-    ///
-    /// @param conn Pointer to the connection to be stopped.
-    void stop(const ConnectionPtr& conn) {
-        conn->stop();
-        connections_.erase(conn);
-    }
-
-    /// @brief Stops all connections.
-    void stopAll() {
-        for (auto conn = connections_.begin(); conn != connections_.end();
-             ++conn) {
-            (*conn)->stop();
-        }
-        connections_.clear();
-    }
-
     /// @brief Returns number of responses sent so far.
     size_t getResponseNum() const {
         return (response_num_);
@@ -186,9 +171,6 @@ private:
     /// @brief Reference to the IO service.
     IOService& io_service_;
 
-    /// @brief Container holding established connections.
-    std::set<ConnectionPtr> connections_;
-
     /// @brief Holds pointer to the generated socket.
     ///
     /// This socket will be used by the next connection.
@@ -212,7 +194,6 @@ TestServerUnixSocket::TestServerUnixSocket(IOService& io_service,
 }
 
 TestServerUnixSocket::~TestServerUnixSocket() {
-    connection_pool_->stopAll();
 }
 
 void
@@ -228,8 +209,7 @@ TestServerUnixSocket::generateCustomResponse(const uint64_t response_size) {
 
 void
 TestServerUnixSocket::startTimer(const long test_timeout) {
-    test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler,
-                                  shared_from_this()),
+    test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler, this),
                       test_timeout, IntervalTimer::ONE_SHOT);
 }
 
@@ -242,7 +222,10 @@ TestServerUnixSocket::bindServerSocket() {
 }
 
 void
-TestServerUnixSocket::acceptHandler(const boost::system::error_code&) {
+TestServerUnixSocket::acceptHandler(const boost::system::error_code& ec) {
+    if (ec) {
+        return;
+    }
     connection_pool_->start(custom_response_);
     accept();
 }
@@ -250,7 +233,8 @@ TestServerUnixSocket::acceptHandler(const boost::system::error_code&) {
 void
 TestServerUnixSocket::accept() {
     server_acceptor_.async_accept(*(connection_pool_->getSocket()),
-        boost::bind(&TestServerUnixSocket::acceptHandler, shared_from_this(), _1));
+        boost::bind(&TestServerUnixSocket::acceptHandler, this,
+                    boost::asio::placeholders::error));
 }
 
 void
@@ -265,7 +249,6 @@ TestServerUnixSocket::getResponseNum() const {
     return (connection_pool_->getResponseNum());
 }
 
-
 } // end of namespace isc::asiolink::test
 } // end of namespace isc::asiolink
 } // end of namespace isc

+ 1 - 8
src/lib/asiolink/testutils/test_server_unix_socket.h

@@ -10,7 +10,6 @@
 #include <config.h>
 #include <asiolink/interval_timer.h>
 #include <asiolink/io_service.h>
-#include <boost/enable_shared_from_this.hpp>
 #include <boost/shared_ptr.hpp>
 #include <gtest/gtest.h>
 #include <list>
@@ -44,8 +43,7 @@ class ConnectionPool;
 /// This class uses @c shared_from_this() to pass its instance to the
 /// @c boost::bind function, thus the caller must store shared pointer
 /// to this object.
-class TestServerUnixSocket
-    : public boost::enable_shared_from_this<TestServerUnixSocket> {
+class TestServerUnixSocket {
 public:
 
     /// @brief Constructor.
@@ -67,11 +65,6 @@ public:
     /// @param test_timeout Test timeout in milliseconds.
     void startTimer(const long test_timeout);
 
-    /// @brief Starts timer for detecting test timeout.
-    ///
-    /// @param test_timeout Test timeout in milliseconds.
-    void startTimer(const long test_timeout);
-
     /// @brief Generates response of a given length.
     ///
     /// @param response_size Desired response size.

+ 12 - 32
src/lib/config/client_connection.cc

@@ -40,7 +40,7 @@ public:
     /// @param timeout Connection timeout in milliseconds.
     void start(const ClientConnection::SocketPath& socket_path,
                const ClientConnection::ControlCommand& command,
-               const ClientConnection::Handler& handler,
+               ClientConnection::Handler handler,
                const ClientConnection::Timeout& timeout);
 
     /// @brief Closes the socket.
@@ -55,7 +55,7 @@ public:
     /// @param length Length of the data in the input buffer.
     /// @param handler User supplied callback.
     void doSend(const void* buffer, const size_t length,
-                const ClientConnection::Handler& handler);
+                ClientConnection::Handler handler);
 
     /// @brief Starts asynchronous receive from the server.
     ///
@@ -66,7 +66,7 @@ public:
     /// @ref JSONFeed is returned.
     ///
     /// @param handler User supplied callback.
-    void doReceive(const ClientConnection::Handler& handler);
+    void doReceive(ClientConnection::Handler handler);
 
     /// @brief Terminates the connection and invokes a user callback indicating
     /// an error.
@@ -74,12 +74,12 @@ public:
     /// @param ec Error code.
     /// @param handler User callback.
     void terminate(const boost::system::error_code& ec,
-                   const ClientConnection::Handler& handler);
+                   ClientConnection::Handler handler);
 
     /// @brief Callback invoked when the timeout occurs.
     ///
     /// It calls @ref terminate with the @c boost::asio::error::timed_out.
-    void timeoutCallback(const ClientConnection::Handler& handler);
+    void timeoutCallback(ClientConnection::Handler handler);
 
 private:
 
@@ -109,7 +109,7 @@ ClientConnectionImpl::ClientConnectionImpl(IOService& io_service)
 void
 ClientConnectionImpl::start(const ClientConnection::SocketPath& socket_path,
                             const ClientConnection::ControlCommand& command,
-                            const ClientConnection::Handler& handler,
+                            ClientConnection::Handler handler,
                             const ClientConnection::Timeout& timeout) {
     // Start the timer protecting against timeouts.
     timer_.setup(boost::bind(&ClientConnectionImpl::timeoutCallback,
@@ -142,19 +142,8 @@ ClientConnectionImpl::start(const ClientConnection::SocketPath& socket_path,
 }
 
 void
-ClientConnectionImpl::stop() {
-    try {
-        socket_.close();
-
-    } catch (...) {
-        // Suppress errors related to closing a socket. We can't really help
-        // if an error occurred.
-    }
-}
-
-void
 ClientConnectionImpl::doSend(const void* buffer, const size_t length,
-                             const ClientConnection::Handler& handler) {
+                             ClientConnection::Handler handler) {
     // Pass self to lambda to make sure that the instance of this class
     // lives as long as the lambda is held for async send.
     auto self(shared_from_this());
@@ -187,7 +176,7 @@ ClientConnectionImpl::doSend(const void* buffer, const size_t length,
 }
 
 void
-ClientConnectionImpl::doReceive(const ClientConnection::Handler& handler) {
+ClientConnectionImpl::doReceive(ClientConnection::Handler handler) {
     // Pass self to lambda to make sure that the instance of this class
     // lives as long as the lambda is held for async receive.
     auto self(shared_from_this());
@@ -201,6 +190,7 @@ ClientConnectionImpl::doReceive(const ClientConnection::Handler& handler) {
             terminate(ec, handler);
 
         } else {
+            std::string x(&read_buf_[0], length);
             // Lazy initialization of the JSONFeed. The feed will be "parsing"
             // received JSON stream and will detect when the whole response
             // has been received.
@@ -228,9 +218,8 @@ ClientConnectionImpl::doReceive(const ClientConnection::Handler& handler) {
 
 void
 ClientConnectionImpl::terminate(const boost::system::error_code& ec,
-                                const ClientConnection::Handler& handler) {
+                                ClientConnection::Handler handler) {
     try {
-        stop();
         current_command_.clear();
         handler(ec, feed_);
 
@@ -242,7 +231,7 @@ ClientConnectionImpl::terminate(const boost::system::error_code& ec,
 }
 
 void
-ClientConnectionImpl::timeoutCallback(const ClientConnection::Handler& handler) {
+ClientConnectionImpl::timeoutCallback(ClientConnection::Handler handler) {
     // Timeout has occurred. The remote server didn't provide the entire
     // response within the given time frame. Let's close the connection
     // and signal the timeout.
@@ -253,23 +242,14 @@ ClientConnection::ClientConnection(asiolink::IOService& io_service)
     : impl_(new ClientConnectionImpl(io_service)) {
 }
 
-ClientConnection::~ClientConnection() {
-    stop();
-}
-
 void
 ClientConnection::start(const ClientConnection::SocketPath& socket_path,
                         const ClientConnection::ControlCommand& command,
-                        const Handler& handler,
+                        ClientConnection::Handler handler,
                         const ClientConnection::Timeout& timeout) {
     impl_->start(socket_path, command, handler, timeout);
 }
 
-void
-ClientConnection::stop() {
-    impl_->stop();
-}
-
 
 } // end of namespace config
 } // end of namespace isc

+ 1 - 9
src/lib/config/client_connection.h

@@ -110,11 +110,6 @@ public:
     /// @param io_service Reference to the IO service.
     explicit ClientConnection(asiolink::IOService& io_service);
 
-    /// @brief Destructor.
-    ///
-    /// Closes current connection.
-    ~ClientConnection();
-
     /// @brief Starts asynchronous transaction with a remote endpoint.
     ///
     /// Starts asynchronous connection with the remote endpoint. If the
@@ -144,10 +139,7 @@ public:
     /// occurred during the transaction.
     /// @param timeout Connection timeout in milliseconds.
     void start(const SocketPath& socket_path, const ControlCommand& command,
-               const Handler& handler, const Timeout& timeout = Timeout(10000));
-
-    /// @brief Closes the connection.
-    void stop();
+               Handler handler, const Timeout& timeout = Timeout(10000));
 
 private: