Parcourir la source

[master] Merge branch 'trac5189'

# Conflicts:
#	src/bin/agent/ca_command_mgr.h
#	src/bin/agent/tests/ca_command_mgr_unittests.cc
#	src/lib/asiolink/testutils/test_server_unix_socket.cc
Marcin Siodelski il y a 8 ans
Parent
commit
e48d9399c2

+ 37 - 21
src/bin/agent/ca_command_mgr.cc

@@ -14,6 +14,8 @@
 #include <asiolink/unix_domain_socket.h>
 #include <cc/command_interpreter.h>
 #include <cc/data.h>
+#include <cc/json_feed.h>
+#include <config/client_connection.h>
 #include <boost/pointer_cast.hpp>
 #include <iterator>
 #include <string>
@@ -25,6 +27,14 @@ using namespace isc::data;
 using namespace isc::hooks;
 using namespace isc::process;
 
+namespace {
+
+/// @brief Client side connection timeout.
+/// @todo Make it configurable.
+const long CONNECTION_TIMEOUT = 5000;
+
+}
+
 namespace isc {
 namespace agent {
 
@@ -186,34 +196,40 @@ CtrlAgentCommandMgr::forwardCommand(const std::string& service,
     std::string socket_name = socket_info->get("socket-name")->stringValue();
 
     // Forward command and receive reply.
-    IOService io_service;
-    UnixDomainSocket unix_socket(io_service);
-    size_t receive_len;
-    try {
-        unix_socket.connect(socket_name);
-        std::string wire_command = command->toWire();
-        unix_socket.write(&wire_command[0], wire_command.size());
-        receive_len = unix_socket.receive(&receive_buf_[0], receive_buf_.size());
-
-    } catch (const std::exception& ex) {
+    IOServicePtr io_service(new IOService());;
+    ClientConnection conn(*io_service);
+    boost::system::error_code received_ec;
+    ConstJSONFeedPtr received_feed;
+    conn.start(ClientConnection::SocketPath(socket_name),
+               ClientConnection::ControlCommand(command->toWire()),
+               [&io_service, &received_ec, &received_feed]
+               (const boost::system::error_code& ec, ConstJSONFeedPtr feed) {
+                   // Capture error code and parsed data.
+                   received_ec = ec;
+                   received_feed = feed;
+                   // Got the IO service so stop IO service. This causes to
+                   // stop IO service when all handlers have been invoked.
+                   io_service->stopWork();
+               }, ClientConnection::Timeout(CONNECTION_TIMEOUT));
+    io_service->run();
+
+    if (received_ec) {
         isc_throw(CommandForwardingError, "unable to forward command to the "
-                  << service << " service: " << ex.what() << ". The server "
-                  "is likely to be offline");
+                  << service << " service: " << received_ec.message()
+                  << ". The server is likely to be offline");
     }
 
-    // This is really not possible right now, but when we migrate to the
-    // solution using timeouts it is possible that the response is not
-    // received.
-    if (receive_len == 0) {
-        isc_throw(CommandForwardingError, "internal server error: no answer"
-                  " received from the server to the forwarded message");
+    // This shouldn't happen because the fact that there was no time out indicates
+    // that the whole response has been read and it should be stored within the
+    // feed. But, let's check to prevent assertions.
+    if (!received_feed) {
+        isc_throw(CommandForwardingError, "internal server error: empty response"
+                  " received from the unix domain socket");
     }
 
-    std::string reply(&receive_buf_[0], receive_len);
-
     ConstElementPtr answer;
     try {
-        answer = Element::fromJSON(reply);
+        answer = received_feed->toElement();
 
         LOG_INFO(agent_logger, CTRL_AGENT_COMMAND_FORWARDED)
             .arg(cmd_name).arg(service);

+ 0 - 4
src/bin/agent/ca_command_mgr.h

@@ -11,7 +11,6 @@
 #include <exceptions/exceptions.h>
 #include <boost/noncopyable.hpp>
 #include <boost/shared_ptr.hpp>
-#include <array>
 
 namespace isc {
 namespace agent {
@@ -109,9 +108,6 @@ private:
     /// thus the constructor is private.
     CtrlAgentCommandMgr();
 
-    /// @brief Buffer into which responses to forwarded commands are stored.
-    std::array<char, 65535> receive_buf_;
-
 };
 
 } // end of namespace isc::agent

+ 45 - 31
src/bin/agent/tests/ca_command_mgr_unittests.cc

@@ -171,12 +171,14 @@ public:
     ///
     /// @param response Stub response to be sent from the server socket to the
     /// client.
-    void bindServerSocket(const std::string& response) {
+    /// @param use_thread Indicates if the IO service will be ran in thread.
+    void bindServerSocket(const std::string& response,
+                          const bool use_thread = false) {
         server_socket_.reset(new test::TestServerUnixSocket(*getIOService(),
                                                             unixSocketFilePath(),
-                                                            TEST_TIMEOUT,
                                                             response));
-        server_socket_->bindServerSocket();
+        server_socket_->startTimer(TEST_TIMEOUT);
+        server_socket_->bindServerSocket(use_thread);
     }
 
     /// @brief Creates command with no arguments.
@@ -224,43 +226,35 @@ public:
         // Configure client side socket.
         configureControlSocket(server_type);
         // Create server side socket.
-        bindServerSocket(server_response);
+        bindServerSocket(server_response, true);
 
         // The client side communication is synchronous. To be able to respond
-        // to this we need to run the server side socket at the same time.
-        // Running IO service in a thread guarantees that the server responds
-        // as soon as it receives the control command.
-        isc::util::thread::Thread th(boost::bind(&CtrlAgentCommandMgrTest::runIO,
-                                                 getIOService(), server_socket_,
-                                                 expected_responses));
+        // to this we need to run the server side socket at the same time as the
+        // client. Running IO service in a thread guarantees that the server
+        //responds as soon as it receives the control command.
+        isc::util::thread::Thread th(boost::bind(&IOService::run,
+                                                 getIOService().get()));
+
+
+        // Wait for the IO service in thread to actually run.
+        server_socket_->waitForRunning();
 
         ConstElementPtr command = createCommand("foo", service);
         ConstElementPtr answer = mgr_.handleCommand("foo", ConstElementPtr(),
                                                     command);
 
+        // Cancel all asynchronous operations and let the handlers to be invoked
+        // with operation_aborted error code.
+        server_socket_->stopServer();
+        getIOService()->stopWork();
+
+        // Wait for the thread to finish.
         th.wait();
 
+        EXPECT_EQ(expected_responses, server_socket_->getResponseNum());
         checkAnswer(answer, expected_result0, expected_result1, expected_result2);
     }
 
-    /// @brief Runs IO service until number of sent responses is lower than
-    /// expected.
-    ///
-    /// @param server_socket Pointer to the server socket.
-    /// @param expected_responses Number of expected responses.
-    static void runIO(IOServicePtr& io_service,
-                      const test::TestServerUnixSocketPtr& server_socket,
-                      const size_t expected_responses) {
-        while (server_socket->getResponseNum() < expected_responses) {
-            io_service->run_one();
-        }
-    }
-
-
-    CtrlAgentCommandMgrTest* getTestSelf() {
-        return (this);
-    }
-
     /// @brief a convenience reference to control agent command manager
     CtrlAgentCommandMgr& mgr_;
 
@@ -329,6 +323,18 @@ TEST_F(CtrlAgentCommandMgrTest, noService) {
 TEST_F(CtrlAgentCommandMgrTest, invalidAnswer) {
     testForward(CtrlAgentCfgContext::TYPE_DHCP6, "dhcp6",
                 isc::config::CONTROL_RESULT_ERROR, -1, -1, 1,
+                "{ \"result\": }");
+}
+
+/// Check that connection is dropped if it takes too long. The test checks
+/// client's behavior when partial JSON is returned. Client will be waiting
+/// for the '}' and will timeout because it is never received.
+/// @todo Currently this test is disabled because we don't have configurable
+/// timeout value. It is hardcoded to 5 sec, which is too long for the
+/// unit test to run.
+TEST_F(CtrlAgentCommandMgrTest, DISABLED_connectionTimeout) {
+    testForward(CtrlAgentCfgContext::TYPE_DHCP6, "dhcp6",
+                isc::config::CONTROL_RESULT_ERROR, -1, -1, 1,
                 "{ \"result\": 0");
 }
 
@@ -360,19 +366,27 @@ TEST_F(CtrlAgentCommandMgrTest, forwardListCommands) {
     // Configure client side socket.
     configureControlSocket(CtrlAgentCfgContext::TYPE_DHCP4);
     // Create server side socket.
-    bindServerSocket("{ \"result\" : 3 }");
+    bindServerSocket("{ \"result\" : 3 }", true);
 
     // The client side communication is synchronous. To be able to respond
     // to this we need to run the server side socket at the same time.
     // Running IO service in a thread guarantees that the server responds
     // as soon as it receives the control command.
-    isc::util::thread::Thread th(boost::bind(&CtrlAgentCommandMgrTest::runIO,
-                                             getIOService(), server_socket_, 1));
+    isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
+
+    // Wait for the IO service in thread to actually run.
+    server_socket_->waitForRunning();
 
     ConstElementPtr command = createCommand("list-commands", "dhcp4");
     ConstElementPtr answer = mgr_.handleCommand("list-commands", ConstElementPtr(),
                                                 command);
 
+    // Cancel all asynchronous operations and let the handlers to be invoked
+    // with operation_aborted error code.
+    server_socket_->stopServer();
+    getIOService()->stopWork();
+
+    // Wait for the thread to finish.
     th.wait();
 
     // Answer of 3 is specific to the stub response we send when the

+ 15 - 3
src/lib/asiolink/io_service.cc

@@ -1,4 +1,4 @@
-// Copyright (C) 2011-2016 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -10,6 +10,7 @@
 
 #include <unistd.h>             // for some IPC/network system calls
 #include <netinet/in.h>
+#include <boost/shared_ptr.hpp>
 #include <sys/socket.h>
 
 namespace isc {
@@ -40,7 +41,7 @@ public:
     /// \brief The constructor
     IOServiceImpl() :
         io_service_(),
-        work_(io_service_)
+        work_(new boost::asio::io_service::work(io_service_))
     {};
     /// \brief The destructor.
     ~IOServiceImpl() {};
@@ -76,6 +77,12 @@ public:
     /// This will return the control to the caller of the \c run() method.
     void stop() { io_service_.stop();} ;
 
+    /// \brief Removes IO service work object to let it finish running
+    /// when all handlers have been invoked.
+    void stopWork() {
+        work_.reset();
+    }
+
     /// \brief Return the native \c io_service object used in this wrapper.
     ///
     /// This is a short term work around to support other Kea modules
@@ -89,7 +96,7 @@ public:
     }
 private:
     boost::asio::io_service io_service_;
-    boost::asio::io_service::work work_;
+    boost::shared_ptr<boost::asio::io_service::work> work_;
 };
 
 IOService::IOService() {
@@ -120,6 +127,11 @@ IOService::stop() {
     io_impl_->stop();
 }
 
+void
+IOService::stopWork() {
+    io_impl_->stopWork();
+}
+
 boost::asio::io_service&
 IOService::get_io_service() {
     return (io_impl_->get_io_service());

+ 5 - 1
src/lib/asiolink/io_service.h

@@ -1,4 +1,4 @@
-// Copyright (C) 2011-2015 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -64,6 +64,10 @@ public:
     /// This will return the control to the caller of the \c run() method.
     void stop();
 
+    /// \brief Removes IO service work object to let it finish running
+    /// when all handlers have been invoked.
+    void stopWork();
+
     /// \brief Return the native \c io_service object used in this wrapper.
     ///
     /// This is a short term work around to support other Kea modules

+ 176 - 7
src/lib/asiolink/tests/unix_domain_socket_unittest.cc

@@ -33,9 +33,13 @@ public:
     /// @brief Constructor.
     ///
     /// Removes unix socket descriptor before the test.
-    UnixDomainSocketTest() : io_service_(),
-                             test_socket_(io_service_, unixSocketFilePath(),
-                                          TEST_TIMEOUT) {
+    UnixDomainSocketTest() :
+        io_service_(),
+        test_socket_(new test::TestServerUnixSocket(io_service_,
+                                                    unixSocketFilePath())),
+        response_(),
+        read_buf_() {
+        test_socket_->startTimer(TEST_TIMEOUT);
         removeUnixSocketFile();
     }
 
@@ -69,18 +73,58 @@ public:
         static_cast<void>(remove(unixSocketFilePath().c_str()));
     }
 
+    /// @brief Performs asynchronous receive on unix domain socket.
+    ///
+    /// This function performs partial read from the unix domain socket.
+    /// It uses @c read_buf_ or small size to ensure that the buffer fills
+    /// in before all that have been read. The partial responses are
+    /// appended to the @c response_ class member.
+    ///
+    /// If the response received so far is shorter than the expected
+    /// response, another partial read is scheduled.
+    ///
+    /// @param socket Reference to the unix domain socket.
+    /// @param expected_response Expected response.
+    void doReceive(UnixDomainSocket& socket,
+                   const std::string& expected_response) {
+        socket.asyncReceive(&read_buf_[0], read_buf_.size(),
+        [this, &socket, expected_response]
+            (const boost::system::error_code& ec, size_t length) {
+            if (!ec) {
+                // Append partial response received and see if the
+                // size of the response received so far is still
+                // smaller than expected. If it is, schedule another
+                // partial read.
+                response_.append(&read_buf_[0], length);
+                if (expected_response.size() > response_.size()) {
+                    doReceive(socket, expected_response);
+                }
+
+            } else if (ec.value() != boost::asio::error::operation_aborted) {
+                ADD_FAILURE() << "error occurred while asynchronously receiving"
+                    " data via unix domain socket: " << ec.message();
+            }
+        });
+    }
+
     /// @brief IO service used by the tests.
     IOService io_service_;
 
     /// @brief Server side unix socket used in these tests.
-    test::TestServerUnixSocket test_socket_;
+    test::TestServerUnixSocketPtr test_socket_;
+
+    /// @brief String containing a response received with @c doReceive.
+    std::string response_;
+
+    /// @brief Read buffer used by @c doReceive.
+    std::array<char, 2> read_buf_;
 };
 
 // This test verifies that the client can send data over the unix
 // domain socket and receive a response.
 TEST_F(UnixDomainSocketTest, sendReceive) {
     // Start the server.
-    test_socket_.bindServerSocket();
+    test_socket_->bindServerSocket();
 
     // Setup client side.
     UnixDomainSocket socket(io_service_);
@@ -95,7 +139,8 @@ TEST_F(UnixDomainSocketTest, sendReceive) {
     ASSERT_EQ(outbound_data.size(), sent_size);
 
     // Run IO service to generate server's response.
-    while (test_socket_.getResponseNum() < 1) {
+    while ((test_socket_->getResponseNum() < 1) &&
+           (!test_socket_->isStopped())) {
         io_service_.run_one();
     }
 
@@ -109,6 +154,77 @@ TEST_F(UnixDomainSocketTest, sendReceive) {
     EXPECT_EQ("received foo", response);
 }
 
+// This test verifies that the client can send the data over the unix
+// domain socket and receive a response asynchronously.
+TEST_F(UnixDomainSocketTest, asyncSendReceive) {
+    // Start the server.
+    test_socket_->bindServerSocket();
+
+    // Setup client side.
+    UnixDomainSocket socket(io_service_);
+
+    // We're going to asynchronously connect to the server. The boolean value
+    // below will be modified by the connect handler function (lambda) invoked
+    // when the connection is established or if an error occurs.
+    bool connect_handler_invoked = false;
+    ASSERT_NO_THROW(socket.asyncConnect(unixSocketFilePath(),
+        [this, &connect_handler_invoked](const boost::system::error_code& ec) {
+            // Indicate that the handler has been called so as the loop below gets
+            // interrupted.
+            connect_handler_invoked = true;
+            // Operation aborted indicates that IO service has been stopped. This
+            // shouldn't happen here.
+            if (ec && (ec.value() != boost::asio::error::operation_aborted)) {
+                ADD_FAILURE() << "error occurred while asynchronously connecting"
+                    " via unix domain socket: " << ec.message();
+            }
+        }
+    ));
+    // Run IO service until connect handler is invoked.
+    while (!connect_handler_invoked && (!test_socket_->isStopped())) {
+        io_service_.run_one();
+    }
+
+    // We are going to asynchronously send the 'foo' over the unix socket.
+    const std::string outbound_data = "foo";
+    size_t sent_size = 0;
+    ASSERT_NO_THROW(socket.asyncSend(outbound_data.c_str(), outbound_data.size(),
+        [this, &sent_size](const boost::system::error_code& ec, size_t length) {
+        // If we have been successful sending the data, record the number of
+        // bytes we have sent.
+        if (!ec) {
+            sent_size = length;
+
+        } else if (ec.value() != boost::asio::error::operation_aborted) {
+            ADD_FAILURE() << "error occurred while asynchronously sending the"
+            " data over unix domain socket: " << ec.message();
+        }
+    }
+    ));
+
+    // Run IO service to generate server's response.
+    while ((test_socket_->getResponseNum() < 1) &&
+           (!test_socket_->isStopped())) {
+        io_service_.run_one();
+    }
+
+    // There is no guarantee that all data have been sent so we only check that
+    // some data have been sent.
+    ASSERT_GT(sent_size, 0);
+
+    std::string expected_response = "received foo";
+    doReceive(socket, expected_response);
+
+    // Run IO service until we get the full response from the server.
+    while ((response_.size() < expected_response.size()) &&
+           !test_socket_->isStopped()) {
+        io_service_.run_one();
+    }
+
+    // Check that the entire response has been received and is correct.
+    EXPECT_EQ(expected_response, response_);
+}
+
 // This test verifies that UnixDomainSocketError exception is thrown
 // on attempt to connect, write or receive when the server socket
 // is not available.
@@ -123,11 +239,64 @@ TEST_F(UnixDomainSocketTest, clientErrors) {
                  UnixDomainSocketError);
 }
 
+// This test verifies that an error is returned on attempt to asynchronously
+// connect, write or receive when the server socket is not available.
+TEST_F(UnixDomainSocketTest, asyncClientErrors) {
+    UnixDomainSocket socket(io_service_);
+
+    // Asynchronous operations signal errors through boost::system::error_code
+    // object passed to the handler function. This object casts to boolean.
+    // In case of success the object casts to false. In case of an error it
+    // casts to true. The actual error codes can be retrieved by comparing the
+    // ec objects to predefined error objects. We don't check for the actual
+    // errors here, because it is not certain that the same error codes would
+    // be returned on various operating systems.
+
+    // In the following tests we use C++11 lambdas as callbacks.
+
+    // Connect
+    bool connect_handler_invoked = false;
+    socket.asyncConnect(unixSocketFilePath(),
+        [this, &connect_handler_invoked](const boost::system::error_code& ec) {
+        connect_handler_invoked = true;
+        EXPECT_TRUE(ec);
+    });
+    while (!connect_handler_invoked && !test_socket_->isStopped()) {
+        io_service_.run_one();
+    }
+
+    // Send
+    const std::string outbound_data = "foo";
+    bool send_handler_invoked = false;
+    socket.asyncSend(outbound_data.c_str(), outbound_data.size(),
+        [this, &send_handler_invoked]
+        (const boost::system::error_code& ec, size_t length) {
+        send_handler_invoked = true;
+        EXPECT_TRUE(ec);
+    });
+    while (!send_handler_invoked && !test_socket_->isStopped()) {
+        io_service_.run_one();
+    }
+
+    // Receive
+    bool receive_handler_invoked = false;
+    std::array<char, 1024> read_buf;
+    socket.asyncReceive(&read_buf[0], read_buf.size(),
+        [this, &receive_handler_invoked]
+        (const boost::system::error_code& ec, size_t length) {
+        receive_handler_invoked = true;
+        EXPECT_TRUE(ec);
+    });
+    while (!receive_handler_invoked && !test_socket_->isStopped()) {
+        io_service_.run_one();
+    }
+}
+
 // Check that native socket descriptor is returned correctly when
 // the socket is connected.
 TEST_F(UnixDomainSocketTest, getNative) {
     // Start the server.
-    test_socket_.bindServerSocket();
+    test_socket_->bindServerSocket();
 
     // Setup client side.
     UnixDomainSocket socket(io_service_);

+ 74 - 14
src/lib/asiolink/testutils/test_server_unix_socket.cc

@@ -7,9 +7,11 @@
 #include <asiolink/asio_wrapper.h>
 #include <asiolink/testutils/test_server_unix_socket.h>
 #include <boost/bind.hpp>
+#include <boost/enable_shared_from_this.hpp>
 #include <boost/shared_ptr.hpp>
 #include <functional>
 #include <set>
+#include <sstream>
 
 using namespace boost::asio::local;
 
@@ -29,7 +31,7 @@ typedef std::function<void()> SentResponseCallback;
 /// @brief Connection to the server over unix domain socket.
 ///
 /// It reads the data over the socket, sends responses and closes a socket.
-class Connection {
+class Connection : public boost::enable_shared_from_this<Connection> {
 public:
 
     /// @brief Constructor.
@@ -43,11 +45,22 @@ public:
     /// server sends a response.
     Connection(const UnixSocketPtr& unix_socket,
                const std::string custom_response,
-               const SentResponseCallback& sent_response_callback)
+               SentResponseCallback sent_response_callback)
         : socket_(unix_socket), custom_response_(custom_response),
           sent_response_callback_(sent_response_callback) {
+    }
+
+    /// @brief Starts asynchronous read from the socket.
+    void start() {
        socket_->async_read_some(boost::asio::buffer(&raw_buf_[0], raw_buf_.size()),
-           boost::bind(&Connection::readHandler, this, _1, _2));
+           boost::bind(&Connection::readHandler, shared_from_this(),
+                       boost::asio::placeholders::error,
+                       boost::asio::placeholders::bytes_transferred));
+    }
+
+    /// @brief Closes the socket.
+    void stop() {
+        socket_->close();
     }
 
     /// @brief Handler invoked when data have been received over the socket.
@@ -79,15 +92,12 @@ public:
                 boost::asio::buffer(response.c_str(), response.size()));
         }
 
+        start();
+
         // Invoke callback function to notify that the response has been sent.
         sent_response_callback_();
     }
 
-    /// @brief Closes the socket.
-    void stop() {
-        socket_->close();
-    }
-
 private:
 
     /// @brief Pointer to the unix domain socket.
@@ -153,6 +163,7 @@ public:
         ConnectionPtr conn(new Connection(next_socket_, custom_response, [this] {
             ++response_num_;
         }));
+        conn->start();
 
         connections_.insert(conn);
         next_socket_.reset();
@@ -200,28 +211,59 @@ private:
 
 TestServerUnixSocket::TestServerUnixSocket(IOService& io_service,
                                            const std::string& socket_file_path,
-                                           const long test_timeout,
                                            const std::string& custom_response)
     : io_service_(io_service),
       server_endpoint_(socket_file_path),
       server_acceptor_(io_service_.get_io_service()),
       test_timer_(io_service_),
       custom_response_(custom_response),
-      connection_pool_(new ConnectionPool(io_service)) {
+      connection_pool_(new ConnectionPool(io_service)),
+      stopped_(false),
+      running_(false) {
+}
+
+TestServerUnixSocket::~TestServerUnixSocket() {
+    server_acceptor_.close();
+}
+
+void
+TestServerUnixSocket::generateCustomResponse(const uint64_t response_size) {
+    std::ostringstream s;
+    s << "{";
+    while (s.tellp() < response_size) {
+        s << "\"param\": \"value\",";
+    }
+    s << "}";
+    custom_response_ = s.str();
+}
+
+void
+TestServerUnixSocket::startTimer(const long test_timeout) {
     test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler, this),
                       test_timeout, IntervalTimer::ONE_SHOT);
 }
 
-TestServerUnixSocket::~TestServerUnixSocket() {
+void
+TestServerUnixSocket::stopServer() {
+    test_timer_.cancel();
+    server_acceptor_.cancel();
     connection_pool_->stopAll();
 }
 
 void
-TestServerUnixSocket::bindServerSocket() {
+TestServerUnixSocket::bindServerSocket(const bool use_thread) {
     server_acceptor_.open();
     server_acceptor_.bind(server_endpoint_);
     server_acceptor_.listen();
     accept();
+
+    // When threads are in use, we need to post a handler which will be invoked
+    // when the thread has already started and the IO service is running. The
+    // main thread can move forward when it receives this signal from the handler.
+    if (use_thread) {
+        io_service_.post(boost::bind(&TestServerUnixSocket::signalRunning,
+                                     this));
+    }
 }
 
 void
@@ -237,13 +279,32 @@ TestServerUnixSocket::acceptHandler(const boost::system::error_code& ec) {
 void
 TestServerUnixSocket::accept() {
     server_acceptor_.async_accept(*(connection_pool_->getSocket()),
-        boost::bind(&TestServerUnixSocket::acceptHandler, this, _1));
+        boost::bind(&TestServerUnixSocket::acceptHandler, this,
+                    boost::asio::placeholders::error));
+}
+
+void
+TestServerUnixSocket::signalRunning() {
+    {
+        isc::util::thread::Mutex::Locker lock(mutex_);
+        running_ = true;
+    }
+    condvar_.signal();
+}
+
+void
+TestServerUnixSocket::waitForRunning() {
+    isc::util::thread::Mutex::Locker lock(mutex_);
+    while (!running_) {
+        condvar_.wait(mutex_);
+    }
 }
 
 void
 TestServerUnixSocket::timeoutHandler() {
     ADD_FAILURE() << "Timeout occurred while running the test!";
     io_service_.stop();
+    stopped_ = true;
 }
 
 size_t
@@ -251,7 +312,6 @@ TestServerUnixSocket::getResponseNum() const {
     return (connection_pool_->getResponseNum());
 }
 
-
 } // end of namespace isc::asiolink::test
 } // end of namespace isc::asiolink
 } // end of namespace isc

+ 68 - 3
src/lib/asiolink/testutils/test_server_unix_socket.h

@@ -10,9 +10,12 @@
 #include <config.h>
 #include <asiolink/interval_timer.h>
 #include <asiolink/io_service.h>
+#include <util/threads/thread.h>
+#include <util/threads/sync.h>
 #include <boost/shared_ptr.hpp>
 #include <gtest/gtest.h>
 #include <list>
+#include <stdint.h>
 #include <string>
 
 namespace isc {
@@ -38,6 +41,10 @@ class ConnectionPool;
 /// the number of responses sent by the server is greater than
 /// expected. The number of responses sent so far can be retrieved
 /// using @ref TestServerUnixSocket::getResponseNum.
+///
+/// This class uses @c shared_from_this() to pass its instance to the
+/// @c boost::bind function, thus the caller must store shared pointer
+/// to this object.
 class TestServerUnixSocket {
 public:
 
@@ -45,11 +52,9 @@ public:
     ///
     /// @param io_service IO service.
     /// @param socket_file_path Socket file path.
-    /// @param test_timeout Test timeout in milliseconds.
     /// @param custom_response Custom response to be sent to the client.
     TestServerUnixSocket(IOService& io_service,
                          const std::string& socket_file_path,
-                         const long test_timeout,
                          const std::string& custom_response = "");
 
     /// @brief Destructor.
@@ -57,8 +62,26 @@ public:
     /// Closes active connections.
     ~TestServerUnixSocket();
 
+    /// @brief Starts timer for detecting test timeout.
+    ///
+    /// @param test_timeout Test timeout in milliseconds.
+    void startTimer(const long test_timeout);
+
+    /// @brief Cancels all asynchronous operations.
+    void stopServer();
+
+    /// @brief Generates response of a given length.
+    ///
+    /// Note: The response may be a few bytes larger than requested.
+    ///
+    /// @param response_size Desired response size.
+    void generateCustomResponse(const uint64_t response_size);
+
     /// @brief Creates and binds server socket.
-    void bindServerSocket();
+    ///
+    /// @param use_thread Boolean value indicating if the IO service
+    /// is running in thread.
+    void bindServerSocket(const bool use_thread = false);
 
     /// @brief Server acceptor handler.
     ///
@@ -73,11 +96,32 @@ public:
     /// @brief Return number of responses sent so far to the clients.
     size_t getResponseNum() const;
 
+    /// @brief Indicates if the server has been stopped.
+    bool isStopped() {
+        return (stopped_);
+    }
+
+    /// @brief Waits for the server signal that it is running.
+    ///
+    /// When the caller starts the service he indicates whether
+    /// IO service will be running in thread or not. If threads
+    /// are used the caller has to wait for the IO service to
+    /// actually run. In such case this function should be invoked
+    /// which waits for a posted callback to be executed. When this
+    /// happens it means that IO service is running and the main
+    /// thread can move forward.
+    void waitForRunning();
+
 private:
 
     /// @brief Asynchronously accept new connections.
     void accept();
 
+    /// @brief Handler invoked to signal that server is running.
+    ///
+    /// This is used only when thread is used to run IO service.
+    void signalRunning();
+
     /// @brief IO service used by the tests.
     IOService& io_service_;
 
@@ -94,6 +138,27 @@ private:
 
     /// @brief Pool of connections.
     boost::shared_ptr<ConnectionPool> connection_pool_;
+
+    /// @brief Indicates if IO service has been stopped as a result of
+    /// a timeout.
+    bool stopped_;
+
+    /// @brief Indicates if the server in a thread is running.
+    bool running_;
+
+    /// @brief Mutex used by the server.
+    ///
+    /// Mutex is used in situations when server's IO service is being run in a
+    /// thread to synchronize this thread with a main thread using
+    /// @ref signalRunning and @ref waitForRunning.
+    isc::util::thread::Mutex mutex_;
+
+    /// @brief Conditional variable used by the server.
+    ///
+    /// Conditional variable is used in situations when server's IO service is
+    /// being run in a thread to synchronize this thread with a main thread
+    /// using @ref signalRunning and @ref waitForRunning.
+    isc::util::thread::CondVar condvar_;
 };
 
 /// @brief Pointer to the @ref TestServerUnixSocket.

+ 227 - 3
src/lib/asiolink/unix_domain_socket.cc

@@ -6,6 +6,7 @@
 
 #include <asiolink/asio_wrapper.h>
 #include <asiolink/unix_domain_socket.h>
+#include <boost/enable_shared_from_this.hpp>
 #include <iostream>
 using namespace boost::asio::local;
 
@@ -13,7 +14,7 @@ namespace isc {
 namespace asiolink {
 
 /// @brief Implementation of the unix domain socket.
-class UnixDomainSocketImpl {
+class UnixDomainSocketImpl : public boost::enable_shared_from_this<UnixDomainSocketImpl> {
 public:
 
     /// @brief Constructor.
@@ -30,6 +31,120 @@ public:
         close();
     }
 
+    /// @brief Asynchronously connects to an endpoint.
+    ///
+    /// This method schedules asynchronous connect and installs the
+    /// @ref UnixDomainSocketImpl::connectHandler as a callback.
+    ///
+    /// @param endpoint Reference to an endpoint to connect to.
+    /// @param handler User supplied handler to be invoked when the connection
+    /// is established or when error is signalled.
+    void asyncConnect(const stream_protocol::endpoint& endpoint,
+                      const UnixDomainSocket::ConnectHandler& handler);
+
+    /// @brief Local handler invoked as a result of asynchronous connection.
+    ///
+    /// This is a wrapper around the user supplied callback. It ignores
+    /// EINPROGRESS errors which are observed on some operating systems as
+    /// a result of trying to connect asynchronously. This error code doesn't
+    /// necessarily indicate a problem and the subsequent attempts to read
+    /// and write to the socket will succeed. Therefore, the handler simply
+    /// overrides this error code with success status. The user supplied
+    /// handler doesn't need to deal with the EINPROGRESS error codes.
+    ///
+    /// @param remote_handler User supplied callback.
+    /// @param ec Error code returned as a result of connection.
+    void connectHandler(const UnixDomainSocket::ConnectHandler& remote_handler,
+                        const boost::system::error_code& ec);
+
+    /// @brief Asynchronously sends data over the socket.
+    ///
+    /// This method schedules an asynchronous send and installs the
+    /// @ref UnixDomainSocketImpl::sendHandler as a callback.
+    ///
+    /// @param data Pointer to data to be sent.
+    /// @param length Number of bytes to be sent.
+    /// @param handler Callback to be invoked when data have been sent or an
+    /// sending error is signalled.
+    void asyncSend(const void* data, const size_t length,
+                   const UnixDomainSocket::Handler& handler);
+
+    /// @brief Asynchronously sends the data over the socket.
+    ///
+    /// This method is called by the @ref asyncSend and the @ref sendHandler
+    /// if the asynchronous send has to be repeated as a result of receiving
+    /// EAGAIN or EWOULDBLOCK.
+    ///
+    /// @param buffer Buffers holding the data to be sent.
+    /// @param handler User supplied callback to be invoked when data have
+    /// been sent or sending error is signalled.
+    void doSend(const boost::asio::const_buffers_1& buffer,
+                const UnixDomainSocket::Handler& handler);
+
+
+    /// @brief Local handler invoked as a result of asynchronous send.
+    ///
+    /// This handler is invoked as a result of asynchronous send. It is a
+    /// wrapper callback around the user supplied callback. It handles
+    /// EWOULDBLOCK and EAGAIN errors by retrying an asynchronous send.
+    /// These errors are often returned on some operating systems, even
+    /// though one would expect that asynchronous operation would not
+    /// return such errors. Because these errors are handled by the
+    /// wrapper callback, the user supplied callback never receives
+    /// these errors.
+    ///
+    /// @param remote_handler User supplied callback.
+    /// @param buffer Buffers holding the data to be sent.
+    /// @param ec Error code returned as a result of sending the data.
+    /// @param length Length of the data sent.
+    void sendHandler(const UnixDomainSocket::Handler& remote_handler,
+                     const boost::asio::const_buffers_1& buffer,
+                     const boost::system::error_code& ec,
+                     size_t length);
+
+    /// @brief Asynchronously receive data over the socket.
+    ///
+    /// This method schedules asynchronous receive and installs the
+    /// @ref UnixDomainSocketImpl::receiveHandler is a callback.
+    ///
+    /// @param data Pointer to a buffer into which the data should be read.
+    /// @param length Length of the buffer.
+    /// @param handler User supplied callback invoked when data have been
+    /// received or an error is signalled.
+    void asyncReceive(void* data, const size_t length,
+                      const UnixDomainSocket::Handler& handler);
+
+    /// @brief Asynchronously receives the data over the socket.
+    ///
+    /// This method is called @ref asyncReceive and @ref receiveHandler when
+    /// EWOULDBLOCK or EAGAIN is returned.
+    ///
+    /// @param buffer A buffer into which the data should be received.
+    /// @param handler User supplied callback invoked when data have been
+    /// received on an error is signalled.
+    void doReceive(const boost::asio::mutable_buffers_1& buffer,
+                   const UnixDomainSocket::Handler& handler);
+
+    /// @brief Local handler invoked as a result of asynchronous receive.
+    ///
+    /// This handler is invoked as a result of asynchronous receive. It is a
+    /// wrapper callback around the user supplied callback. It handles
+    /// EWOULDBLOCK and EAGAIN by retrying to asynchronously receive the
+    /// data. These errors are often returned on some operating systems, even
+    /// though one would expect that asynchronous operation would not
+    /// return such errors. Because these errors are handled by the
+    /// wrapper callback, the user supplied callback never receives
+    /// these errors.
+    ///
+    /// @param remote_handler User supplied callback.
+    /// @param buffer Buffer into which the data are received.
+    /// @param ec Error code returned as a result of asynchronous receive.
+    /// @param length Size of the received data.
+    void receiveHandler(const UnixDomainSocket::Handler& remote_handler,
+                        const boost::asio::mutable_buffers_1& buffer,
+                        const boost::system::error_code& ec,
+                        size_t length);
+
     /// @brief Closes the socket.
     void close();
 
@@ -38,6 +153,98 @@ public:
 };
 
 void
+UnixDomainSocketImpl::asyncConnect(const stream_protocol::endpoint& endpoint,
+                                   const UnixDomainSocket::ConnectHandler& handler) {
+    using namespace std::placeholders;
+
+    UnixDomainSocket::ConnectHandler local_handler =
+        std::bind(&UnixDomainSocketImpl::connectHandler, shared_from_this(),
+                  handler, _1);
+    socket_.async_connect(endpoint, local_handler);
+}
+
+void
+UnixDomainSocketImpl::connectHandler(const UnixDomainSocket::ConnectHandler& remote_handler,
+                                     const boost::system::error_code& ec) {
+    // It was observed on Debian and Fedora that asynchronous connect may result
+    // in EINPROGRESS error. This doesn't really indicate a problem with a
+    // connection. If we continue transmitting data over the socket it will
+    // succeed. So we suppress this error and return 'success' to the user's
+    // handler.
+    if (ec.value() == boost::asio::error::in_progress) {
+        remote_handler(boost::system::error_code());
+    } else {
+        remote_handler(ec);
+    }
+}
+
+void
+UnixDomainSocketImpl::asyncSend(const void* data, const size_t length,
+                                const UnixDomainSocket::Handler& handler) {
+    doSend(boost::asio::buffer(data, length), handler);
+}
+
+void
+UnixDomainSocketImpl::doSend(const boost::asio::const_buffers_1& buffer,
+                             const UnixDomainSocket::Handler& handler) {
+    using namespace std::placeholders;
+
+    UnixDomainSocket::Handler local_handler =
+        std::bind(&UnixDomainSocketImpl::sendHandler, shared_from_this(),
+                  handler, buffer, _1, _2);
+    socket_.async_send(buffer, local_handler);
+}
+
+void
+UnixDomainSocketImpl::sendHandler(const UnixDomainSocket::Handler& remote_handler,
+                                  const boost::asio::const_buffers_1& buffer,
+                                  const boost::system::error_code& ec,
+                                  size_t length) {
+    // The asynchronous send may return EWOULDBLOCK or EAGAIN on some
+    // operating systems. In this case, we simply retry hoping that it
+    // will succeed next time. The user's callback never sees these
+    // errors.
+    if ((ec.value() == boost::asio::error::would_block) ||
+        (ec.value() == boost::asio::error::try_again)) {
+        doSend(buffer, remote_handler);
+    }
+    remote_handler(ec, length);
+}
+
+void
+UnixDomainSocketImpl::asyncReceive(void* data, const size_t length,
+                                   const UnixDomainSocket::Handler& handler) {
+    doReceive(boost::asio::buffer(data, length), handler);
+}
+
+void
+UnixDomainSocketImpl::doReceive(const boost::asio::mutable_buffers_1& buffer,
+                                const UnixDomainSocket::Handler& handler) {
+    using namespace std::placeholders;
+
+    UnixDomainSocket::Handler local_handler =
+        std::bind(&UnixDomainSocketImpl::receiveHandler, shared_from_this(),
+                  handler, buffer, _1, _2);
+    socket_.async_receive(buffer, 0, local_handler);
+}
+
+void
+UnixDomainSocketImpl::receiveHandler(const UnixDomainSocket::Handler& remote_handler,
+                                     const boost::asio::mutable_buffers_1& buffer,
+                                     const boost::system::error_code& ec,
+                                     size_t length) {
+    // The asynchronous receive may return EWOULDBLOCK or EAGAIN on some
+    // operating systems. In this case, we simply retry hoping that it
+    // will succeed next time. The user's callback never sees these
+    // errors.
+    if ((ec.value() == boost::asio::error::would_block) ||
+        (ec.value() == boost::asio::error::try_again)) {
+        doReceive(buffer, remote_handler);
+    }
+    remote_handler(ec, length);
+}
+
+void
 UnixDomainSocketImpl::close() {
     static_cast<void>(socket_.close());
 }
@@ -65,6 +272,11 @@ UnixDomainSocket::connect(const std::string& path) {
     }
 }
 
+void
+UnixDomainSocket::asyncConnect(const std::string& path, const ConnectHandler& handler) {
+    impl_->asyncConnect(stream_protocol::endpoint(path.c_str()), handler);
+}
+
 size_t
 UnixDomainSocket::write(const void* data, size_t length) {
     boost::system::error_code ec;
@@ -78,6 +290,12 @@ UnixDomainSocket::write(const void* data, size_t length) {
     return (res);
 }
 
+void
+UnixDomainSocket::asyncSend(const void* data, const size_t length,
+                            const Handler& handler) {
+    impl_->asyncSend(data, length, handler);
+}
+
 size_t
 UnixDomainSocket::receive(void* data, size_t length) {
     boost::system::error_code ec;
@@ -89,9 +307,15 @@ UnixDomainSocket::receive(void* data, size_t length) {
 }
 
 void
+UnixDomainSocket::asyncReceive(void* data, const size_t length,
+                               const Handler& handler) {
+    impl_->asyncReceive(data, length, handler);
+}
+
+void
 UnixDomainSocket::close() {
     impl_->close();
 }
 
-}
-}
+} // end of namespace asiolink
+} // end of namespace isc

+ 37 - 0
src/lib/asiolink/unix_domain_socket.h

@@ -10,6 +10,7 @@
 #include <asiolink/io_service.h>
 #include <asiolink/io_socket.h>
 #include <boost/shared_ptr.hpp>
+#include <functional>
 #include <string>
 
 namespace isc {
@@ -29,6 +30,13 @@ class UnixDomainSocketImpl;
 class UnixDomainSocket : public IOSocket {
 public:
 
+    /// @brief Callback type used in call to @ref UnixDomainSocket::asyncConnect.
+    typedef std::function<void(const boost::system::error_code&)> ConnectHandler;
+
+    /// @brief Callback type used in calls to @ref UnixDomainSocket::asyncSend
+    /// and @ref UnixDomainSocket::asyncReceive.
+    typedef std::function<void(const boost::system::error_code&, size_t)> Handler;
+
     /// @brief Constructor.
     ///
     /// @param io_service Reference to IOService to be used by this
@@ -48,6 +56,15 @@ public:
     /// @throw UnixDomainSocketError if error occurs.
     void connect(const std::string& path);
 
+    /// @brief Asynchronously connects the socket to the specified endpoint.
+    ///
+    /// Always returns immediatelly.
+    ///
+    /// @param path Path to the unix socket to which we should connect.
+    /// @param handler Callback to be invoked when connection is established or
+    /// a connection error is signalled.
+    void asyncConnect(const std::string& path, const ConnectHandler& handler);
+
     /// @brief Writes specified amount of data to a socket.
     ///
     /// @param data Pointer to data to be written.
@@ -57,6 +74,16 @@ public:
     /// @throw UnixDomainSocketError if error occurs.
     size_t write(const void* data, size_t length);
 
+    /// @brief Asynchronously sends data over the socket.
+    ///
+    /// Always returns immediatelly.
+    ///
+    /// @param data Pointer to data to be sent.
+    /// @param length Number of bytes to be sent.
+    /// @param handler Callback to be invoked when data have been sent or
+    /// sending error is signalled.
+    void asyncSend(const void* data, const size_t length, const Handler& handler);
+
     /// @brief Receives data from a socket.
     ///
     /// @param [out] data Pointer to a location into which the read data should
@@ -67,6 +94,16 @@ public:
     /// @throw UnixDomainSocketError if error occurs.
     size_t receive(void* data, size_t length);
 
+    /// @brief Asynchronously receives data over the socket.
+    ///
+    /// Always returns immediatelly.
+    /// @param [out] data Pointer to a location into which the read data should
+    /// be stored.
+    /// @param length Length of the buffer.
+    /// @param handler Callback to be invoked when data have been received or an
+    /// error is signalled.
+    void asyncReceive(void* data, const size_t length, const Handler& handler);
+
     /// @brief Closes the socket.
     void close();
 

+ 3 - 1
src/lib/cc/Makefile.am

@@ -8,9 +8,11 @@ lib_LTLIBRARIES = libkea-cc.la
 libkea_cc_la_SOURCES = data.cc data.h
 libkea_cc_la_SOURCES += cfg_to_element.h dhcp_config_error.h
 libkea_cc_la_SOURCES += command_interpreter.cc command_interpreter.h
+libkea_cc_la_SOURCES += json_feed.cc json_feed.h
 libkea_cc_la_SOURCES += simple_parser.cc simple_parser.h
 
-libkea_cc_la_LIBADD  = $(top_builddir)/src/lib/exceptions/libkea-exceptions.la
+libkea_cc_la_LIBADD  = $(top_builddir)/src/lib/util/libkea-util.la
+libkea_cc_la_LIBADD += $(top_builddir)/src/lib/exceptions/libkea-exceptions.la
 libkea_cc_la_LIBADD += $(BOOST_LIBS)
 
 libkea_cc_la_LDFLAGS = -no-undefined -version-info 2:0:0

+ 312 - 0
src/lib/cc/json_feed.cc

@@ -0,0 +1,312 @@
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <cc/data.h>
+#include <cc/json_feed.h>
+#include <boost/bind.hpp>
+
+using namespace isc::data;
+using namespace isc::util;
+
+namespace isc {
+namespace config {
+
+const int JSONFeed::RECEIVE_START_ST;
+const int JSONFeed::WHITESPACE_BEFORE_JSON_ST;
+const int JSONFeed::JSON_START_ST;
+const int JSONFeed::INNER_JSON_ST;
+const int JSONFeed::JSON_END_ST;
+const int JSONFeed::FEED_OK_ST;
+const int JSONFeed::FEED_FAILED_ST;
+
+const int JSONFeed::DATA_READ_OK_EVT;
+const int JSONFeed::NEED_MORE_DATA_EVT;
+const int JSONFeed::MORE_DATA_PROVIDED_EVT;
+const int JSONFeed::FEED_OK_EVT;
+const int JSONFeed::FEED_FAILED_EVT;
+
+JSONFeed::JSONFeed()
+    : StateModel(), buffer_(), error_message_(), open_scopes_(0),
+      output_() {
+}
+
+void
+JSONFeed::initModel() {
+    // Intialize dictionaries of events and states.
+    initDictionaries();
+
+    // Set the current state to starting state and enter the run loop.
+    setState(RECEIVE_START_ST);
+
+    // Parsing starts from here.
+    postNextEvent(START_EVT);
+}
+
+void
+JSONFeed::poll() {
+    try {
+        // Process the input data until no more data is available or until
+        // JSON feed ends with matching closing brace.
+        do {
+            getState(getCurrState())->run();
+
+        } while (!isModelDone() && (getNextEvent() != NOP_EVT) &&
+                 (getNextEvent() != NEED_MORE_DATA_EVT));
+    } catch (const std::exception& ex) {
+        abortModel(ex.what());
+    }
+}
+
+bool
+JSONFeed::needData() const {
+    return ((getNextEvent() == NEED_MORE_DATA_EVT) ||
+            (getNextEvent() == START_EVT));
+}
+
+bool
+JSONFeed::feedOk() const {
+    return ((getNextEvent() == END_EVT) &&
+            (getLastEvent() == FEED_OK_EVT));
+}
+
+ElementPtr
+JSONFeed::toElement() const {
+    if (needData()) {
+        isc_throw(JSONFeedError, "unable to retrieve the data form the"
+                  " JSON feed while parsing hasn't finished");
+    }
+    try {
+        return (Element::fromWire(output_));
+
+    } catch (const std::exception& ex) {
+        isc_throw(JSONFeedError, ex.what());
+    }
+}
+
+void
+JSONFeed::postBuffer(const void* buf, const size_t buf_size) {
+    if (buf_size > 0) {
+        // The next event is NEED_MORE_DATA_EVT when the parser wants to
+        // signal that more data is needed. This method is called to supply
+        // more data and thus it should change the next event to
+        // MORE_DATA_PROVIDED_EVT.
+        if (getNextEvent() == NEED_MORE_DATA_EVT) {
+            transition(getCurrState(), MORE_DATA_PROVIDED_EVT);
+        }
+        buffer_.insert(buffer_.end(), static_cast<const char*>(buf),
+                       static_cast<const char*>(buf) + buf_size);
+    }
+}
+
+void
+JSONFeed::defineEvents() {
+    StateModel::defineEvents();
+
+    // Define JSONFeed specific events.
+    defineEvent(DATA_READ_OK_EVT, "DATA_READ_OK_EVT");
+    defineEvent(NEED_MORE_DATA_EVT, "NEED_MORE_DATA_EVT");
+    defineEvent(MORE_DATA_PROVIDED_EVT, "MORE_DATA_PROVIDED_EVT");
+    defineEvent(FEED_OK_EVT, "FEED_OK_EVT");
+    defineEvent(FEED_FAILED_EVT, "FEED_FAILED_EVT");
+}
+
+void
+JSONFeed::verifyEvents() {
+    StateModel::verifyEvents();
+
+    getEvent(DATA_READ_OK_EVT);
+    getEvent(NEED_MORE_DATA_EVT);
+    getEvent(MORE_DATA_PROVIDED_EVT);
+    getEvent(FEED_OK_EVT);
+    getEvent(FEED_FAILED_EVT);
+}
+
+void
+JSONFeed::defineStates() {
+    // Call parent class implementation first.
+    StateModel::defineStates();
+
+    defineState(RECEIVE_START_ST, "RECEIVE_START_ST",
+                boost::bind(&JSONFeed::receiveStartHandler, this));
+    defineState(WHITESPACE_BEFORE_JSON_ST, "WHITESPACE_BEFORE_JSON_ST",
+                boost::bind(&JSONFeed::whiteSpaceBeforeJSONHandler, this));
+    defineState(INNER_JSON_ST, "INNER_JSON_ST",
+                boost::bind(&JSONFeed::innerJSONHandler, this));
+    defineState(JSON_END_ST, "JSON_END_ST",
+                boost::bind(&JSONFeed::endJSONHandler, this));
+}
+
+void
+JSONFeed::feedFailure(const std::string& error_msg) {
+    error_message_ = error_msg + " : " + getContextStr();
+    transition(FEED_FAILED_ST, FEED_FAILED_EVT);
+}
+
+void
+JSONFeed::onModelFailure(const std::string& explanation) {
+    if (error_message_.empty()) {
+        error_message_ = explanation;
+    }
+}
+
+bool
+JSONFeed::popNextFromBuffer(char& next) {
+    // If there are any characters in the buffer, pop next.
+    if (!buffer_.empty()) {
+        next = buffer_.front();
+        buffer_.pop_front();
+        return (true);
+    }
+    return (false);
+}
+
+char
+JSONFeed::getNextFromBuffer() {
+    unsigned int ev = getNextEvent();
+    char c = '\0';
+    // The caller should always provide additional data when the
+    // NEED_MORE_DATA_EVT occurrs. If the next event is still
+    // NEED_MORE_DATA_EVT it indicates that the caller hasn't provided
+    // the data.
+    if (ev == NEED_MORE_DATA_EVT) {
+        isc_throw(JSONFeedError,
+                  "JSONFeed requires new data to progress, but no data"
+                  " have been provided. The transaction is aborted to avoid"
+                  " a deadlock.");
+
+    } else {
+        // Try to pop next character from the buffer.
+        const bool data_exist = popNextFromBuffer(c);
+        if (!data_exist) {
+            // There is no more data so it is really not possible that we're
+            // at MORE_DATA_PROVIDED_EVT.
+            if (ev == MORE_DATA_PROVIDED_EVT) {
+                isc_throw(JSONFeedError,
+                          "JSONFeed state indicates that new data have been"
+                          " provided to be parsed, but the transaction buffer"
+                          " contains no new data.");
+
+            } else {
+                // If there is no more data we should set NEED_MORE_DATA_EVT
+                // event to indicate that new data should be provided.
+                transition(getCurrState(), NEED_MORE_DATA_EVT);
+            }
+        }
+    }
+    return (c);
+}
+
+void
+JSONFeed::invalidEventError(const std::string& handler_name,
+                            const unsigned int event) {
+    isc_throw(JSONFeedError, handler_name << ": "
+              << " invalid event " << getEventLabel(static_cast<int>(event)));
+}
+
+void
+JSONFeed::receiveStartHandler() {
+    char c = getNextFromBuffer();
+    if (getNextEvent() != NEED_MORE_DATA_EVT) {
+        switch(getNextEvent()) {
+        case START_EVT:
+            switch (c) {
+            case '\t':
+            case '\n':
+            case '\r':
+            case ' ':
+                transition(WHITESPACE_BEFORE_JSON_ST, DATA_READ_OK_EVT);
+                return;
+
+            case '{':
+            case '[':
+                output_.push_back(c);
+                ++open_scopes_;
+                transition(INNER_JSON_ST, DATA_READ_OK_EVT);
+                return;
+
+            default:
+                feedFailure("invalid first character " + std::string(1, c));
+            }
+
+        default:
+            invalidEventError("receiveStartHandler", getNextEvent());
+        }
+    }
+}
+
+void
+JSONFeed::whiteSpaceBeforeJSONHandler() {
+    char c = getNextFromBuffer();
+    if (getNextEvent() != NEED_MORE_DATA_EVT) {
+        switch (c) {
+        case '\t':
+        case '\n':
+        case '\r':
+        case ' ':
+            transition(getCurrState(), DATA_READ_OK_EVT);
+            break;
+
+        case '{':
+        case '[':
+            output_.push_back(c);
+            ++open_scopes_;
+            transition(INNER_JSON_ST, DATA_READ_OK_EVT);
+            break;
+
+        default:
+            feedFailure("invalid character " + std::string(1, c));
+        }
+    }
+}
+
+void
+JSONFeed::innerJSONHandler() {
+    char c = getNextFromBuffer();
+    if (getNextEvent() != NEED_MORE_DATA_EVT) {
+        output_.push_back(c);
+
+        switch(c) {
+        case '{':
+        case '[':
+            transition(getCurrState(), DATA_READ_OK_EVT);
+            ++open_scopes_;
+            break;
+
+        case '}':
+        case ']':
+            if (--open_scopes_ == 0) {
+                transition(JSON_END_ST, FEED_OK_EVT);
+
+            } else {
+                transition(getCurrState(), DATA_READ_OK_EVT);
+            }
+            break;
+
+        default:
+            transition(getCurrState(), DATA_READ_OK_EVT);
+        }
+    }
+}
+
+void
+JSONFeed::endJSONHandler() {
+    switch (getNextEvent()) {
+    case FEED_OK_EVT:
+        transition(END_ST, END_EVT);
+        break;
+
+    case FEED_FAILED_EVT:
+        abortModel("reading into JSON feed failed");
+        break;
+
+    default:
+        invalidEventError("endJSONHandler", getNextEvent());
+    }
+}
+
+
+} // end of namespace config
+} // end of namespace isc

+ 276 - 0
src/lib/cc/json_feed.h

@@ -0,0 +1,276 @@
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef JSON_FEED_H
+#define JSON_FEED_H
+
+#include <cc/data.h>
+#include <exceptions/exceptions.h>
+#include <util/state_model.h>
+#include <boost/shared_ptr.hpp>
+#include <list>
+#include <stdint.h>
+#include <string>
+
+namespace isc {
+namespace config {
+
+class JSONFeed;
+
+/// @brief Pointer to the @ref JSONFeed.
+typedef boost::shared_ptr<JSONFeed> JSONFeedPtr;
+
+/// @brief Pointer to the const @ref JSONFeed.
+typedef boost::shared_ptr<const JSONFeed> ConstJSONFeedPtr;
+
+/// @brief A generic exception thrown upon an error in the @ref JSONFeed.
+class JSONFeedError : public Exception {
+public:
+    JSONFeedError(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) { };
+};
+
+/// @brief State model for asynchronous read of data in JSON format.
+///
+/// Kea control channel uses stream sockets for forwarding commands received
+/// by the Kea Control Agent to respective Kea services. The responses may
+/// contain large amounts of data (e.g. lease queries may return thousands
+/// of leases). Such responses rarely fit into a single data buffer and
+/// require multiple calls to receive/read or asynchronous receive/read.
+///
+/// A receiver performing multiple reads from a socket must be able to
+/// locate the boundaries of the command within the data stream. The
+/// @ref JSONFeed state model solves this problem.
+///
+/// When the partial data is read from the stream socket it should be provided
+/// to the @ref JSONFeed using @ref JSONFeed::postBuffer and then the
+/// @ref JSONFeed::poll should be called to start processing the received
+/// data. The actual JSON structure can be preceded by whitespaces. When first
+/// occurrence of one of the '{' or '[' characters is found in the stream it is
+/// considered a beginning of the JSON structure. The model includes an internal
+/// counter of new '{' and '[' occurrences. The counter increases one of these
+/// characters is found. When any of the '}' or ']' is found, the counter
+/// is decreased. When the counter is decreased to 0 it indicates that the
+/// entire JSON structure has been received and processed.
+///
+/// Note that this mechanism doesn't check if the JSON structure is well
+/// formed. It merely detects the end of the JSON structure if this structure
+/// is well formed. The structure is validated when @ref JSONFeed::toElement
+/// is called to retrieve the data structures encapsulated with
+/// @ref isc::data::Element objects.
+class JSONFeed : public util::StateModel {
+public:
+
+    /// @name States supported by the @ref JSONFeed
+    ///
+    //@{
+
+    /// @brief State indicating a beginning of a feed.
+    static const int RECEIVE_START_ST = SM_DERIVED_STATE_MIN + 1;
+
+    /// @brief Skipping whitespaces before actual JSON.
+    static const int WHITESPACE_BEFORE_JSON_ST = SM_DERIVED_STATE_MIN + 2;
+
+    /// @brief Found first opening brace or square bracket.
+    static const int JSON_START_ST = SM_DERIVED_STATE_MIN + 3;
+
+    /// @brief Parsing JSON.
+    static const int INNER_JSON_ST = SM_DERIVED_STATE_MIN + 4;
+
+    /// @brief Found last closing brace or square bracket.
+    static const int JSON_END_ST = SM_DERIVED_STATE_MIN + 5;
+
+    /// @brief Found opening and closing brace or square bracket.
+    ///
+    /// This doesn't however indicate that the JSON is well formed. It
+    /// only means that matching closing brace or square bracket was
+    /// found.
+    static const int FEED_OK_ST = SM_DERIVED_STATE_MIN + 100;
+
+    /// @brief Invalid syntax detected.
+    ///
+    /// For example, non matching braces or invalid characters found.
+    static const int FEED_FAILED_ST = SM_DERIVED_STATE_MIN + 101;
+
+    //@}
+
+
+    /// @name Events used during data processing.
+    ///
+    //@{
+
+    /// @brief Chunk of data successfully read and parsed.
+    static const int DATA_READ_OK_EVT = SM_DERIVED_EVENT_MIN + 1;
+
+    /// @brief Unable to proceed with parsing until new data is provided.
+    static const int NEED_MORE_DATA_EVT = SM_DERIVED_EVENT_MIN + 2;
+
+    /// @brief New data provided and parsing should continue.
+    static const int MORE_DATA_PROVIDED_EVT = SM_DERIVED_EVENT_MIN + 3;
+
+    /// @brief Found opening brace and the matching closing brace.
+    static const int FEED_OK_EVT = SM_DERIVED_EVENT_MIN + 100;
+
+    /// @brief Invalid syntax detected.
+    static const int FEED_FAILED_EVT = SM_DERIVED_EVENT_MIN + 101;
+
+    //@}
+
+    /// @brief Constructor.
+    JSONFeed();
+
+    /// @brief Initializes state model.
+    ///
+    /// Initializes events and states. It sets the model to @c RECEIVE_START_ST
+    /// and the next event to @c START_EVT.
+    void initModel();
+
+    /// @brief Runs the model as long as data is available.
+    ///
+    /// It processes the input data character by character until it reaches the
+    /// end of the input buffer, in which case it returns. The next event is set
+    /// to @c NEED_MORE_DATA_EVT to indicate the need for providing additional
+    /// data using @ref JSONFeed::postBuffer. This function also returns when
+    /// the end of the JSON structure has been detected or when an error has
+    /// occurred.
+    void poll();
+
+    /// @brief Checks if the model needs additional data to continue.
+    ///
+    /// The caller can use this method to check if the model expects additional
+    /// data to be provided to finish processing input data.
+    ///
+    /// @return true if more data is needed, false otherwise.
+    bool needData() const;
+
+    /// @brief Checks if the data have been successfully processed.
+    bool feedOk() const;
+
+    /// @brief Returns error string when data processing has failed.
+    std::string getErrorMessage() const {
+        return (error_message_);
+    }
+
+    /// @brief Returns processed data as a structure of @ref isc::data::Element
+    /// objects.
+    ///
+    /// @throw JSONFeedError if the received JSON is not well formed.
+    data::ElementPtr toElement() const;
+
+    /// @brief Receives additional data read from a data stream.
+    ///
+    /// A caller invokes this method to pass additional chunk of data received
+    /// from the stream.
+    ///
+    /// @param buf Pointer to a buffer holding additional input data.
+    /// @param buf_size Size of the data in the input buffer.
+    void postBuffer(const void* buf, const size_t buf_size);
+
+
+private:
+
+    /// @brief Make @ref runModel private to make sure that the caller uses
+    /// @ref poll method instead.
+    using StateModel::runModel;
+
+    /// @brief Define events used by the feed.
+    virtual void defineEvents();
+
+    /// @brief Verifies events used by the feed.
+    virtual void verifyEvents();
+
+    /// @brief Defines states of the feed.
+    virtual void defineStates();
+
+    /// @brief Transition to failure state.
+    ///
+    /// This method transitions the model to @ref FEED_FAILED_ST and
+    /// sets next event to FEED_FAILED_EVT.
+    ///
+    /// @param error_msg Error message explaining the failure.
+    void feedFailure(const std::string& error_msg);
+
+    /// @brief A method called when state model fails.
+    ///
+    /// @param explanation Error message explaining the reason for failure.
+    virtual void onModelFailure(const std::string& explanation);
+
+    /// @brief Retrieves next byte of data from the buffer.
+    ///
+    /// During normal operation, when there is no more data in the buffer,
+    /// the NEED_MORE_DATA_EVT is set as next event to signal the need for
+    /// calling @ref JSONFeed::postBuffer.
+    ///
+    /// @throw JSONFeedError If current event is already set to
+    /// NEED_MORE_DATA_EVT or MORE_DATA_PROVIDED_EVT. In the former case, it
+    /// indicates that the caller failed to provide new data using
+    /// @ref JSONFeed::postBuffer. The latter case is highly unlikely
+    /// as it indicates that no new data were provided but the state of the
+    /// parser was changed from NEED_MORE_DATA_EVT or the data were provided
+    /// but the data buffer is empty. In both cases, it is a programming
+    /// error.
+    char getNextFromBuffer();
+
+    /// @brief This method is called when invalid event occurred in a particular
+    /// state.
+    ///
+    /// This method simply throws @ref JSONFeedError informing about invalid
+    /// event occurring for the particular state. The error message includes
+    /// the name of the handler in which the exception has been thrown.
+    /// It also includes the event which caused the exception.
+    ///
+    /// @param handler_name Name of the handler in which the exception is
+    /// thrown.
+    /// @param event An event which caused the exception.
+    ///
+    /// @throw JSONFeedError.
+    void invalidEventError(const std::string& handler_name,
+                           const unsigned int event);
+
+    /// @brief Tries to read next byte from buffer.
+    ///
+    /// @param [out] next A reference to the variable where read data should be
+    /// stored.
+    ///
+    /// @return true if character was successfully read, false otherwise.
+    bool popNextFromBuffer(char& next);
+
+    /// @name State handlers.
+    ///
+    //@{
+
+    /// @brief Handler for RECEIVE_START_ST.
+    void receiveStartHandler();
+
+    /// @brief Handler for WHITESPACE_BEFORE_JSON_ST.
+    void whiteSpaceBeforeJSONHandler();
+
+    /// @brief Handle for the FIRST_BRACE_ST.
+    void innerJSONHandler();
+
+    /// @brief Handler for the JSON_END_ST.
+    void endJSONHandler();
+
+    //@}
+
+    /// @brief Internal buffer from which the feed reads data.
+    std::list<char> buffer_;
+
+    /// @brief Error message set by @ref onModelFailure.
+    std::string error_message_;
+
+    /// @brief A counter increased when '{' or '[' is found and decreased when
+    /// '}' or ']' is found in the stream.
+    uint64_t open_scopes_;
+
+    /// @brief Holds processed data.
+    std::string output_;
+};
+
+} // end of namespace config
+} // end of namespace isc
+
+#endif // JSON_FEED_H

+ 3 - 1
src/lib/cc/tests/Makefile.am

@@ -15,7 +15,9 @@ TESTS =
 if HAVE_GTEST
 TESTS += run_unittests
 run_unittests_SOURCES = command_interpreter_unittests.cc data_unittests.cc
-run_unittests_SOURCES += data_file_unittests.cc run_unittests.cc
+run_unittests_SOURCES += data_file_unittests.cc
+run_unittests_SOURCES += json_feed_unittests.cc
+run_unittests_SOURCES += run_unittests.cc
 run_unittests_SOURCES += simple_parser_unittest.cc
 run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
 run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS)

+ 174 - 0
src/lib/cc/tests/json_feed_unittests.cc

@@ -0,0 +1,174 @@
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <cc/data.h>
+#include <cc/json_feed.h>
+#include <gtest/gtest.h>
+#include <sstream>
+#include <string>
+
+using namespace isc::config;
+using namespace isc::data;
+
+namespace {
+
+/// @brief Test fixture class for @ref JSONFeed class.
+class JSONFeedTest : public ::testing::Test {
+public:
+
+    /// @brief Constructor.
+    ///
+    /// Initializes @ref json_map_ and @ref json_list_ which hold reference
+    /// JSON structures.
+    JSONFeedTest()
+        : json_map_(), json_list_() {
+        ElementPtr m = Element::fromJSON(createJSON());
+        ElementPtr l = Element::createList();
+        l->add(m);
+        json_map_ = m;
+        json_list_ = l;
+    }
+
+    /// @brief Creates a JSON map holding 20 elements.
+    ///
+    /// Each map value is a list of 20 elements.
+    std::string createJSON() const {
+        // Create a list of 20 elements.
+        ElementPtr list_element = Element::createList();
+        for (unsigned i = 0; i < 20; ++i) {
+            std::ostringstream s;
+            s << "list_element" << i;
+            list_element->add(Element::create(s.str()));
+        }
+
+        // Create a map of 20 elements. Each map element holds a list
+        // of 20 elements.
+        ElementPtr map_element = Element::createMap();
+        for (unsigned i = 0; i < 20; ++i) {
+            std::ostringstream s;
+            s << "map_element" << i;
+            map_element->set(s.str(), list_element);
+        }
+
+        return (prettyPrint(map_element));
+    }
+
+    /// @brief Test that the JSONFeed correctly recognizes the beginning
+    /// and the end of the JSON structure.
+    ///
+    /// @param input_json A string holding an input JSON structure.
+    /// @param expected_output A structure holding expected output from the
+    /// @ref JSONFeed::toElement.
+    void testRead(const std::string& input_json,
+                  const ConstElementPtr& expected_output) {
+        JSONFeed feed;
+        ASSERT_NO_THROW(feed.initModel());
+
+        // Post the data into the feed in 10 bytes long chunks.
+        size_t chunk = 10;
+
+        for (size_t i = 0; i < input_json.size(); i += chunk) {
+            bool done = false;
+            // When we're near the end of the data stream, the chunk length may
+            // vary.
+            if (i + chunk >= input_json.size()) {
+                chunk = input_json.size() - i;
+                done = true;
+            }
+            // Feed the parser with a data chunk and parse it.
+            feed.postBuffer(&input_json[i], chunk);
+            feed.poll();
+            if (!done) {
+                ASSERT_TRUE(feed.needData());
+            }
+        }
+
+        // Convert parsed/collected data in the feed into the structure of
+        // elements.
+        ConstElementPtr element_from_feed = feed.toElement();
+        EXPECT_TRUE(element_from_feed->equals(*expected_output));
+    }
+
+    /// @brief Test that the @ref JSONFeed signals an error when the input
+    /// string holds invalid data.
+    ///
+    /// @param input_json A string holding an input JSON structire.
+    void testInvalidRead(const std::string& input_json) {
+        JSONFeed feed;
+        ASSERT_NO_THROW(feed.initModel());
+
+        ASSERT_NO_THROW(feed.postBuffer(&input_json[0], input_json.size()));
+        ASSERT_NO_THROW(feed.poll());
+
+        EXPECT_FALSE(feed.needData());
+        EXPECT_FALSE(feed.feedOk());
+    }
+
+    /// @brief JSON map holding a number of lists.
+    ConstElementPtr json_map_;
+
+    /// @brief JSON list holding a map of lists.
+    ConstElementPtr json_list_;
+
+};
+
+// This test verifies that a JSON structure starting with '{' is accepted
+// and parsed.
+TEST_F(JSONFeedTest, startWithBrace) {
+    std::string json = createJSON();
+    testRead(json, json_map_);
+}
+
+// This test verifies that a JSON structure starting with '[' is accepted
+// and parsed.
+TEST_F(JSONFeedTest, startWithSquareBracket) {
+    std::string json = createJSON();
+    json = std::string("[") + json + std::string("]");
+    testRead(json, json_list_);
+}
+
+// This test verifies that input JSON can be preceded with whitespaces.
+TEST_F(JSONFeedTest, startWithWhitespace) {
+    std::string json = createJSON();
+    json = std::string("  \r\r\t  ") + json;
+    testRead(json, json_map_);
+}
+
+// This test verifies that an empty map is accepted and parsed.
+TEST_F(JSONFeedTest, emptyMap) {
+    std::string json = "{}";
+    testRead(json, Element::createMap());
+}
+
+// This test verifies that an empty list is accepted and parsed.
+TEST_F(JSONFeedTest, emptyList) {
+    std::string json = "[  ]";
+    testRead(json, Element::createList());
+}
+
+// This test verifies that an error is signalled when a JSON structure
+// is preceded by invalid character.
+TEST_F(JSONFeedTest, unexpectedCharacter) {
+    std::string json = "a {}";
+    testInvalidRead(json);
+}
+
+// This test verfies that an error is signalled when a JSON structure
+// lacks an opening brace character.
+TEST_F(JSONFeedTest, noOpeningBrace) {
+    std::string json = "\"x\": \"y\" }";
+    testInvalidRead(json);
+}
+
+// This test verifies that an error is signalled when a JSON structure
+// lacks an opening square bracket.
+TEST_F(JSONFeedTest, noOpeningSquareBracket) {
+    std::string json = "\"x\", \"y\" ]";
+    testInvalidRead(json);
+}
+
+} // end of anonymous namespace.

+ 1 - 0
src/lib/config/Makefile.am

@@ -16,6 +16,7 @@ lib_LTLIBRARIES = libkea-cfgclient.la
 libkea_cfgclient_la_SOURCES = config_data.h config_data.cc
 libkea_cfgclient_la_SOURCES += module_spec.h module_spec.cc
 libkea_cfgclient_la_SOURCES += base_command_mgr.cc base_command_mgr.h
+libkea_cfgclient_la_SOURCES += client_connection.cc client_connection.h
 libkea_cfgclient_la_SOURCES += command_mgr.cc command_mgr.h
 libkea_cfgclient_la_SOURCES += command_socket.cc command_socket.h
 libkea_cfgclient_la_SOURCES += command_socket_factory.cc command_socket_factory.h

+ 258 - 0
src/lib/config/client_connection.cc

@@ -0,0 +1,258 @@
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
+#include <asiolink/unix_domain_socket.h>
+#include <cc/json_feed.h>
+#include <config/client_connection.h>
+#include <boost/bind.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <array>
+
+using namespace isc::asiolink;
+
+namespace isc {
+namespace config {
+
+/// @brief Implementation of the @ref ClientConnection.
+class ClientConnectionImpl : public boost::enable_shared_from_this<ClientConnectionImpl> {
+public:
+
+    /// @brief Constructor.
+    ///
+    /// @param io_service Reference to the IO service.
+    explicit ClientConnectionImpl(IOService& io_service);
+
+    /// @brief Starts asynchronous transaction with a remote endpoint.
+    ///
+    /// See @ref ClientConnection::start documentation for the details.
+    ///
+    /// @param socket_path Path to the socket description that the server
+    /// is bound to.
+    /// @param command Control command to be sent to the server.
+    /// @param handler Pointer to the user suppiled callback function which
+    /// should be invoked when transaction completes or when an error has
+    /// occurred during the transaction.
+    /// @param timeout Connection timeout in milliseconds.
+    void start(const ClientConnection::SocketPath& socket_path,
+               const ClientConnection::ControlCommand& command,
+               ClientConnection::Handler handler,
+               const ClientConnection::Timeout& timeout);
+
+    /// @brief Closes the socket.
+    void stop();
+
+    /// @brief Starts asynchronous send.
+    ///
+    /// This method may be called multiple times internally when the command
+    /// is large and can't be sent all at once.
+    ///
+    /// @param buffer Pointer to the buffer holding input data.
+    /// @param length Length of the data in the input buffer.
+    /// @param handler User supplied callback invoked after the chunk of data
+    /// has been sent.
+    void doSend(const void* buffer, const size_t length,
+                ClientConnection::Handler handler);
+
+    /// @brief Starts asynchronous receive from the server.
+    ///
+    /// This method may be called multiple times internally if the response
+    /// is large. The @ref JSONFeed instance is used to detect the boundaries
+    /// of the command within the stream. Once the entire command has been
+    /// received the user callback is invoked and the instance of the
+    /// @ref JSONFeed is returned.
+    ///
+    /// @param handler User supplied callback.
+    void doReceive(ClientConnection::Handler handler);
+
+    /// @brief Terminates the connection and invokes a user callback indicating
+    /// an error.
+    ///
+    /// @param ec Error code.
+    /// @param handler User callback.
+    void terminate(const boost::system::error_code& ec,
+                   ClientConnection::Handler handler);
+
+    /// @brief Callback invoked when the timeout occurs.
+    ///
+    /// It calls @ref terminate with the @c boost::asio::error::timed_out.
+    void timeoutCallback(ClientConnection::Handler handler);
+
+private:
+
+    /// @brief Unix domain socket used for communication with a server.
+    UnixDomainSocket socket_;
+
+    /// @brief Pointer to the @ref JSONFeed holding a response.
+    ///
+    ///It may be a null pointer until some part of a response has been received.
+    JSONFeedPtr feed_;
+
+    /// @brief Holds the entire command being transmitted over the unix
+    /// socket.
+    std::string current_command_;
+
+    /// @brief Buffer into which chunks of the response are received.
+    std::array<char, 1024> read_buf_;
+
+    /// @brief Instance of the interval timer protecting against timeouts.
+    IntervalTimer timer_;
+};
+
+ClientConnectionImpl::ClientConnectionImpl(IOService& io_service)
+    : socket_(io_service), feed_(), current_command_(), timer_(io_service) {
+}
+
+void
+ClientConnectionImpl::start(const ClientConnection::SocketPath& socket_path,
+                            const ClientConnection::ControlCommand& command,
+                            ClientConnection::Handler handler,
+                            const ClientConnection::Timeout& timeout) {
+    // Start the timer protecting against timeouts.
+    timer_.setup(boost::bind(&ClientConnectionImpl::timeoutCallback,
+                             this, handler),
+                 timeout.timeout_, IntervalTimer::ONE_SHOT);
+
+    // Store the command in the class member to make sure it is valid
+    // the entire time.
+    current_command_.assign(command.control_command_);
+
+    // Pass self to lambda to make sure that the instance of this class
+    // lives as long as the lambda is held for async connect.
+    auto self(shared_from_this());
+    // Start asynchronous connect. This will return immediatelly.
+    socket_.asyncConnect(socket_path.socket_path_,
+    [this, self, command, handler](const boost::system::error_code& ec) {
+        // We failed to connect so we can't proceed. Simply clean up
+        // and invoke the user callback to signal an error.
+        if (ec) {
+            // This doesn't throw.
+            terminate(ec, handler);
+
+        } else {
+            // Connection successful. Transmit the command to the remote
+            // endpoint asynchronously.
+            doSend(current_command_.c_str(), current_command_.length(),
+                   handler);
+        }
+    });
+}
+
+void
+ClientConnectionImpl::doSend(const void* buffer, const size_t length,
+                             ClientConnection::Handler handler) {
+    // Pass self to lambda to make sure that the instance of this class
+    // lives as long as the lambda is held for async send.
+    auto self(shared_from_this());
+    // Start asynchronous transmission of the command. This will return
+    // immediatelly.
+    socket_.asyncSend(buffer, length,
+        [this, self, buffer, length, handler]
+        (const boost::system::error_code& ec, size_t bytes_transferred) {
+        // An error has occurred while sending. Close the connection and
+        // signal an error.
+        if (ec) {
+            // This doesn't throw.
+            terminate(ec, handler);
+
+        } else {
+            // If the number of bytes we have managed to send so far is
+            // lower than the amount of data we're trying to send, we
+            // have to schedule another send to deliver the rest of
+            // the data.
+            if (bytes_transferred < length) {
+                doSend(static_cast<const char*>(buffer) + bytes_transferred,
+                       length - bytes_transferred, handler);
+
+            } else {
+                // We have sent all the data. Start receiving a response.
+                doReceive(handler);
+            }
+        }
+    });
+}
+
+void
+ClientConnectionImpl::doReceive(ClientConnection::Handler handler) {
+    // Pass self to lambda to make sure that the instance of this class
+    // lives as long as the lambda is held for async receive.
+    auto self(shared_from_this());
+    socket_.asyncReceive(&read_buf_[0], read_buf_.size(),
+        [this, self, handler]
+        (const boost::system::error_code& ec, size_t length) {
+        // An error has occurred while receiving the data. Close the connection
+        // and signal an error.
+        if (ec) {
+            // This doesn't throw.
+            terminate(ec, handler);
+
+        } else {
+            std::string x(&read_buf_[0], length);
+            // Lazy initialization of the JSONFeed. The feed will be "parsing"
+            // received JSON stream and will detect when the whole response
+            // has been received.
+            if (!feed_) {
+                feed_.reset(new JSONFeed());
+                feed_->initModel();
+            }
+            // Put everything we have received so far into the feed and process
+            // the data.
+            feed_->postBuffer(&read_buf_[0], length);
+            feed_->poll();
+            // If the feed indicates that only a part of the response has been
+            // received, schedule another receive to get more data.
+            if (feed_->needData()) {
+                doReceive(handler);
+
+            } else {
+                // We have received the entire response, let's call the handler
+                // and indicate success.
+                terminate(ec, handler);
+            }
+        }
+    });
+}
+
+void
+ClientConnectionImpl::terminate(const boost::system::error_code& ec,
+                                ClientConnection::Handler handler) {
+    try {
+        timer_.cancel();
+        socket_.close();
+        current_command_.clear();
+        handler(ec, feed_);
+
+    } catch (...) {
+        // None of these operations should throw. In particular, the handler
+        // should not throw but if it has been misimplemented, we want to make
+        // sure we don't emit any exceptions from here.
+    }
+}
+
+void
+ClientConnectionImpl::timeoutCallback(ClientConnection::Handler handler) {
+    // Timeout has occurred. The remote server didn't provide the entire
+    // response within the given time frame. Let's close the connection
+    // and signal the timeout.
+    terminate(boost::asio::error::timed_out, handler);
+}
+
+ClientConnection::ClientConnection(asiolink::IOService& io_service)
+    : impl_(new ClientConnectionImpl(io_service)) {
+}
+
+void
+ClientConnection::start(const ClientConnection::SocketPath& socket_path,
+                        const ClientConnection::ControlCommand& command,
+                        ClientConnection::Handler handler,
+                        const ClientConnection::Timeout& timeout) {
+    impl_->start(socket_path, command, handler, timeout);
+}
+
+
+} // end of namespace config
+} // end of namespace isc

+ 157 - 0
src/lib/config/client_connection.h

@@ -0,0 +1,157 @@
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef CLIENT_CONNECTION_H
+#define CLIENT_CONNECTION_H
+
+#include <asiolink/io_service.h>
+#include <cc/json_feed.h>
+#include <boost/shared_ptr.hpp>
+#include <functional>
+
+namespace isc {
+namespace config {
+
+class ClientConnectionImpl;
+
+/// @brief Represents client side connection over the unix domain socket.
+///
+/// This class represents a client side connection between the controlling
+/// client and the server exposing control API over a unix domain socket.
+/// In particular, this class is used by the Kea Control Agent to establish
+/// connections with respective Kea services to forward received commands.
+/// As of Kea 1.2 the servers can handle a single connection at the time.
+/// In the future, we're planning to support multiple simulatenous connections.
+/// In this case, each connection will be handled by a unique instance of the
+/// @ref ClientConnection class.
+///
+/// The @ref ClientConnection supports asynchronous connections. A caller
+/// creates an instance of the @ref ClientConnection and calls
+/// @ref ClientConnection::start to start asynchronous communication with
+/// a remote server. The caller provides a pointer to the callback function
+/// (handler) which will be called when the communication with the server
+/// completes, i.e. the command is sent to the server and the response
+/// from the server is received. If an error occurs, the callback is
+/// invoked with an error code indicating a reason for the failure.
+///
+/// The documentation of the @ref ClientConnection::start explains the
+/// sequence of operations performed by this class.
+///
+/// Even though the @ref ClientConnection is asynchronous in nature, it
+/// can also be used in cases requiring synchronous communication. As it
+/// has been already mentioned, the servers in Kea 1.2 do not support
+/// multiple concurrent connections. The following pseudo code demonstrates
+/// how to perform synchronous transaction using this class.
+///
+/// @code
+/// IOService io_service;
+/// ClientConnection conn(io_service);
+/// bool cb_invoked = false;
+/// conn.start(ClientConnection::SocketPath("/tmp/kea.sock"),
+///            ClientConnection::ControlCommand(command),
+///            [this, &cb_invoked](const boost::system::error_code& ec,
+///                   const ConstJSONFeedPtr& feed) {
+///                cb_invoked = true;
+///                if (ec) {
+///                    ... handle error here ...
+///                } else {
+///                    ... use feed to retrieve the response ...
+///                }
+///            }
+/// );
+/// while (!cb_invoked) {
+///     io_service.run_one();
+/// }
+/// @endcode
+///
+class ClientConnection {
+public:
+
+    /// @name Structures used for strong typing.
+    ///
+    //@{
+
+    /// @brief Encapsulates socket path.
+    struct SocketPath {
+        explicit SocketPath(const std::string& socket_path)
+            : socket_path_(socket_path) { }
+
+        std::string socket_path_;
+    };
+
+    /// @brief Encapsulates control command.
+    struct ControlCommand {
+        explicit ControlCommand(const std::string control_command)
+            : control_command_(control_command) { }
+
+        std::string control_command_;
+    };
+
+    /// @brief Encapsulates timeout value.
+    struct Timeout {
+        explicit Timeout(const long timeout)
+            : timeout_(timeout) { }
+
+        long timeout_;
+    };
+
+    //@}
+
+    /// @brief Type of the callback invoked when the communication with
+    /// the server is complete or an error has occurred.
+    typedef std::function<void(const boost::system::error_code& ec,
+                               const ConstJSONFeedPtr& feed)> Handler;
+
+    /// @brief Constructor.
+    ///
+    /// @param io_service Reference to the IO service.
+    explicit ClientConnection(asiolink::IOService& io_service);
+
+    /// @brief Starts asynchronous transaction with a remote endpoint.
+    ///
+    /// Starts asynchronous connection with the remote endpoint. If the
+    /// connection is successful, the control command is asynchronously
+    /// sent to the remote endpoint. When the entire command has been sent,
+    /// the response is read asynchronously, possibly in multiple chunks.
+    ///
+    /// The timeout is specified for the entire transaction in milliseconds.
+    /// If the transaction takes longer than the timeout value the connection
+    /// is closed and the callback is called with the error code of
+    /// @c boost::asio::error::timed_out.
+    ///
+    /// In other cases, the callback is called with the error code returned
+    /// by the boost asynchronous operations. If the transaction is successful
+    /// the 'success' status is indicated with the error code. In addition
+    /// the instance of the @ref JSONFeed is returned to the caller. It can
+    /// be used to retrieve parsed response from the server. Note that the
+    /// response may still be malformed, even if no error is signalled in
+    /// the handler. The @ref JSONFeed::toElement will return a parsing
+    /// error if the JSON appears to be malformed.
+    ///
+    /// @param socket_path Path to the socket description that the server
+    /// is bound to.
+    /// @param command Control command to be sent to the server.
+    /// @param handler Pointer to the user supplied callback function which
+    /// should be invoked when transaction completes or when an error has
+    /// occurred during the transaction.
+    /// @param timeout Connection timeout in milliseconds.
+    void start(const SocketPath& socket_path, const ControlCommand& command,
+               Handler handler, const Timeout& timeout = Timeout(5000));
+
+private:
+
+    /// @brief Pointer to the implementation.
+    boost::shared_ptr<ClientConnectionImpl> impl_;
+
+};
+
+/// @brief Type of the pointer to the @ref ClientConnection object.
+typedef boost::shared_ptr<ClientConnection> ClientConnectionPtr;
+
+} // end of namespace config
+} // end of namespace isc
+
+#endif // CLIENT_CONNECTION_H

+ 3 - 1
src/lib/config/tests/Makefile.am

@@ -19,6 +19,7 @@ TESTS =
 if HAVE_GTEST
 TESTS += run_unittests
 run_unittests_SOURCES = module_spec_unittests.cc
+run_unittests_SOURCES += client_connection_unittests.cc
 run_unittests_SOURCES += command_socket_factory_unittests.cc
 run_unittests_SOURCES += config_data_unittests.cc run_unittests.cc
 run_unittests_SOURCES += command_mgr_unittests.cc
@@ -26,7 +27,8 @@ run_unittests_SOURCES += command_mgr_unittests.cc
 run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
 run_unittests_LDFLAGS = $(AM_LDFLAGS) $(CRYPTO_LDFLAGS) $(GTEST_LDFLAGS)
 
-run_unittests_LDADD = $(top_builddir)/src/lib/config/libkea-cfgclient.la
+run_unittests_LDADD  = $(top_builddir)/src/lib/asiolink/testutils/libasiolinktest.la
+run_unittests_LDADD += $(top_builddir)/src/lib/config/libkea-cfgclient.la
 run_unittests_LDADD += $(top_builddir)/src/lib/dhcp/libkea-dhcp++.la
 run_unittests_LDADD += $(top_builddir)/src/lib/asiolink/libkea-asiolink.la
 run_unittests_LDADD += $(top_builddir)/src/lib/cc/libkea-cc.la

+ 186 - 0
src/lib/config/tests/client_connection_unittests.cc

@@ -0,0 +1,186 @@
+// Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/io_service.h>
+#include <asiolink/testutils/test_server_unix_socket.h>
+#include <cc/json_feed.h>
+#include <config/client_connection.h>
+#include <gtest/gtest.h>
+#include <cstdlib>
+#include <sstream>
+#include <string>
+
+using namespace isc::asiolink;
+using namespace isc::config;
+
+namespace {
+
+/// @brief Test unix socket file name.
+const std::string TEST_SOCKET = "test-socket";
+
+/// @brief Test timeout in ms.
+const long TEST_TIMEOUT = 10000;
+
+/// Test fixture class for @ref ClientConnection.
+class ClientConnectionTest : public ::testing::Test {
+public:
+
+    /// @brief Constructor.
+    ///
+    /// Removes unix socket descriptor before the test.
+    ClientConnectionTest() :
+        io_service_(),
+        test_socket_(new test::TestServerUnixSocket(io_service_,
+                                                    unixSocketFilePath())) {
+        removeUnixSocketFile();
+    }
+
+    /// @brief Destructor.
+    ///
+    /// Removes unix socket descriptor after the test.
+    virtual ~ClientConnectionTest() {
+        removeUnixSocketFile();
+    }
+
+    /// @brief Returns socket file path.
+    ///
+    /// If the KEA_SOCKET_TEST_DIR environment variable is specified, the
+    /// socket file is created in the location pointed to by this variable.
+    /// Otherwise, it is created in the build directory.
+    ///
+    /// The KEA_SOCKET_TEST_DIR is typically used to overcome the problem of
+    /// a system limit on the unix socket file path (usually 102 or 103 characters).
+    /// When Kea build is located in the nested directories with absolute path
+    /// exceeding this limit, the test system should be configured to set
+    /// the KEA_SOCKET_TEST_DIR environmental variable to point to an alternative
+    /// location, e.g. /tmp, with an absolute path length being within the
+    /// allowed range.
+    static std::string unixSocketFilePath() {
+        std::ostringstream s;
+        const char* env = getenv("KEA_SOCKET_TEST_DIR");
+        if (env) {
+            s << std::string(env);
+        } else {
+            s << TEST_DATA_BUILDDIR;
+        }
+
+        s << "/" << TEST_SOCKET;
+        return (s.str());
+    }
+
+    /// @brief Removes unix socket descriptor.
+    void removeUnixSocketFile() {
+        static_cast<void>(remove(unixSocketFilePath().c_str()));
+    }
+
+    /// @brief IO service used by the tests.
+    IOService io_service_;
+
+    /// @brief Server side unix socket used in these tests.
+    test::TestServerUnixSocketPtr test_socket_;
+};
+
+// Tests successful transaction: connect, send command and receive a
+// response.
+TEST_F(ClientConnectionTest, success) {
+    // Start timer protecting against test timeouts.
+    test_socket_->startTimer(TEST_TIMEOUT);
+
+    // Start the server.
+    test_socket_->bindServerSocket();
+    test_socket_->generateCustomResponse(2048);
+
+    // Create some valid command.
+    std::string command = "{ \"command\": \"list-commands\" }";
+
+    ClientConnection conn(io_service_);
+
+    // This boolean value will indicate when the callback function is invoked
+    // at the end of the transaction (whether it is successful or unsuccessful).
+    bool handler_invoked = false;
+    conn.start(ClientConnection::SocketPath(unixSocketFilePath()),
+               ClientConnection::ControlCommand(command),
+        [this, &handler_invoked](const boost::system::error_code& ec,
+                                 const ConstJSONFeedPtr& feed) {
+        // Indicate that the handler has been called to break from the
+        // while loop below.
+        handler_invoked = true;
+        // The ec should contain no error.
+        ASSERT_FALSE(ec);
+        // The JSONFeed should be present and it should contain a valid
+        // response.
+        ASSERT_TRUE(feed);
+        EXPECT_TRUE(feed->feedOk()) << feed->getErrorMessage();
+    });
+    // Run the connection.
+    while (!handler_invoked && !test_socket_->isStopped()) {
+        io_service_.run_one();
+    }
+}
+
+// This test checks that a timeout is signalled when the communication
+// takes too long.
+TEST_F(ClientConnectionTest, timeout) {
+    // The server will return only partial JSON response (lacking closing
+    // brace). The client will wait for closing brace and eventually the
+    // connection should time out.
+    test_socket_.reset(new test::TestServerUnixSocket(io_service_,
+                                                      unixSocketFilePath(),
+                                                      "{ \"command\": \"foo\""));
+    test_socket_->startTimer(TEST_TIMEOUT);
+
+    // Start the server.
+    test_socket_->bindServerSocket();
+
+    // Command to be sent to the server.
+    std::string command = "{ \"command\": \"list-commands\" }";
+
+    ClientConnection conn(io_service_);
+
+    // This boolean value will be set to true when the callback is invoked.
+    bool handler_invoked = false;
+    conn.start(ClientConnection::SocketPath(unixSocketFilePath()),
+              ClientConnection::ControlCommand(command),
+    [this, &handler_invoked](const boost::system::error_code& ec,
+                             const ConstJSONFeedPtr& feed) {
+        // Indicate that the callback has been invoked to break the loop
+        // below.
+        handler_invoked = true;
+        ASSERT_TRUE(ec);
+        EXPECT_TRUE(ec.value() == boost::asio::error::timed_out);
+    }, ClientConnection::Timeout(1000));
+
+    while (!handler_invoked && !test_socket_->isStopped()) {
+        io_service_.run_one();
+    }
+}
+
+// This test checks that an error is returned when the client is unable
+// to connect to the server.
+TEST_F(ClientConnectionTest, connectionError) {
+    // Create the new connection but do not bind the server socket.
+    // The connection should be refused and an error returned.
+    ClientConnection conn(io_service_);
+
+    std::string command = "{ \"command\": \"list-commands\" }";
+
+    bool handler_invoked = false;
+    conn.start(ClientConnection::SocketPath(unixSocketFilePath()),
+               ClientConnection::ControlCommand(command),
+    [this, &handler_invoked](const boost::system::error_code& ec,
+           const ConstJSONFeedPtr& feed) {
+        handler_invoked = true;
+        ASSERT_TRUE(ec);
+    });
+
+    while (!handler_invoked && !test_socket_->isStopped()) {
+        io_service_.run_one();
+    }
+}
+
+} // end of anonymous namespace