|
@@ -5,6 +5,7 @@
|
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
|
|
#include <asiolink/asio_wrapper.h>
|
|
|
+#include <asiolink/interval_timer.h>
|
|
|
#include <asiolink/io_service.h>
|
|
|
#include <asiolink/unix_domain_socket.h>
|
|
|
#include <asiolink/unix_domain_socket_acceptor.h>
|
|
@@ -12,6 +13,7 @@
|
|
|
#include <config/command_mgr.h>
|
|
|
#include <cc/data.h>
|
|
|
#include <cc/command_interpreter.h>
|
|
|
+#include <cc/json_feed.h>
|
|
|
#include <dhcp/iface_mgr.h>
|
|
|
#include <config/config_log.h>
|
|
|
#include <boost/bind.hpp>
|
|
@@ -26,6 +28,12 @@ using namespace isc::data;
|
|
|
|
|
|
namespace {
|
|
|
|
|
|
+/// @brief Maximum size of the data chunk sent/received over the socket.
|
|
|
+const size_t BUF_SIZE = 8192;
|
|
|
+
|
|
|
+/// @brief Default connection timeout in seconds.
|
|
|
+const unsigned short DEFAULT_CONNECTION_TIMEOUT = 10;
|
|
|
+
|
|
|
class ConnectionPool;
|
|
|
|
|
|
/// @brief Represents a single connection over control socket.
|
|
@@ -40,26 +48,40 @@ public:
|
|
|
/// This constructor registers a socket of this connection in the Interface
|
|
|
/// Manager to cause the blocking call to @c select() to return as soon as
|
|
|
/// a transmission over the control socket is received.
|
|
|
- Connection(const boost::shared_ptr<UnixDomainSocket>& socket,
|
|
|
- ConnectionPool& connection_pool)
|
|
|
- : socket_(socket), connection_pool_(connection_pool),
|
|
|
+ ///
|
|
|
+ /// @param io_service IOService object used to handle the asio operations
|
|
|
+ /// @param socket Pointer to the object representing a socket which is used
|
|
|
+ /// for data transmission.
|
|
|
+ /// @param connection_pool Reference to the connection pool to which this
|
|
|
+ /// connection belongs.
|
|
|
+ /// @param timeout Connection timeout (in seconds).
|
|
|
+ Connection(const IOServicePtr& io_service,
|
|
|
+ const boost::shared_ptr<UnixDomainSocket>& socket,
|
|
|
+ ConnectionPool& connection_pool,
|
|
|
+ const unsigned short timeout)
|
|
|
+ : socket_(socket), timeout_timer_(*io_service), timeout_(timeout),
|
|
|
+ buf_(), response_(), connection_pool_(connection_pool), feed_(),
|
|
|
response_in_progress_(false) {
|
|
|
+
|
|
|
+ LOG_INFO(command_logger, COMMAND_SOCKET_CONNECTION_OPENED)
|
|
|
+ .arg(socket_->getNative());
|
|
|
+
|
|
|
// Callback value of 0 is used to indicate that callback function is
|
|
|
// not installed.
|
|
|
isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
|
|
|
+ // Initialize state model for receiving and preparsing commands.
|
|
|
+ feed_.initModel();
|
|
|
+
|
|
|
+ // Start timer for detecting timeouts.
|
|
|
+ timeout_timer_.setup(boost::bind(&Connection::timeoutHandler, this),
|
|
|
+ timeout_ * 1000, IntervalTimer::ONE_SHOT);
|
|
|
}
|
|
|
|
|
|
- /// @brief Start asynchronous read over the unix domain socket.
|
|
|
+ /// @brief Destructor.
|
|
|
///
|
|
|
- /// This method doesn't block. Once the transmission is received over the
|
|
|
- /// socket, the @c Connection::receiveHandler callback is invoked to
|
|
|
- /// process received data.
|
|
|
- void start() {
|
|
|
- socket_->asyncReceive(&buf_[0], sizeof(buf_),
|
|
|
- boost::bind(&Connection::receiveHandler,
|
|
|
- shared_from_this(), _1, _2));
|
|
|
-
|
|
|
-
|
|
|
+ /// Cancels timeout timer if one is scheduled.
|
|
|
+ ~Connection() {
|
|
|
+ timeout_timer_.cancel();
|
|
|
}
|
|
|
|
|
|
/// @brief Close current connection.
|
|
@@ -75,28 +97,99 @@ public:
|
|
|
|
|
|
isc::dhcp::IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
|
|
|
socket_->close();
|
|
|
+ timeout_timer_.cancel();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// @brief Gracefully terminates current connection.
|
|
|
+ ///
|
|
|
+ /// This method should be called prior to closing the socket to initiate
|
|
|
+ /// graceful shutdown.
|
|
|
+ void terminate();
|
|
|
+
|
|
|
+ /// @brief Start asynchronous read over the unix domain socket.
|
|
|
+ ///
|
|
|
+ /// This method doesn't block. Once the transmission is received over the
|
|
|
+ /// socket, the @c Connection::receiveHandler callback is invoked to
|
|
|
+ /// process received data.
|
|
|
+ void doReceive() {
|
|
|
+ socket_->asyncReceive(&buf_[0], sizeof(buf_),
|
|
|
+ boost::bind(&Connection::receiveHandler,
|
|
|
+ shared_from_this(), _1, _2));
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /// @brief Starts asynchronous send over the unix domain socket.
|
|
|
+ ///
|
|
|
+ /// This method doesn't block. Once the send operation (that covers the whole
|
|
|
+ /// data if it's small or first BUF_SIZE bytes if its large) is completed, the
|
|
|
+ /// @c Connection::sendHandler callback is invoked. That handler will either
|
|
|
+ /// close the connection gracefully if all data has been sent, or will
|
|
|
+ /// call @ref doSend() again to send the next chunk of data.
|
|
|
+ void doSend() {
|
|
|
+ size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
|
|
|
+ socket_->asyncSend(&response_[0], chunk_size,
|
|
|
+ boost::bind(&Connection::sendHandler, shared_from_this(), _1, _2));
|
|
|
+ }
|
|
|
+
|
|
|
/// @brief Handler invoked when the data is received over the control
|
|
|
/// socket.
|
|
|
///
|
|
|
+ /// It collects received data into the @c isc::config::JSONFeed object and
|
|
|
+ /// schedules additional asynchronous read of data if this object signals
|
|
|
+ /// that command is incomplete. When the entire command is received, the
|
|
|
+ /// handler processes this command and asynchronously responds to the
|
|
|
+ /// controlling client.
|
|
|
+ //
|
|
|
+ ///
|
|
|
/// @param ec Error code.
|
|
|
/// @param bytes_transferred Number of bytes received.
|
|
|
void receiveHandler(const boost::system::error_code& ec,
|
|
|
size_t bytes_transferred);
|
|
|
|
|
|
+
|
|
|
+ /// @brief Handler invoked when the data is sent over the control socket.
|
|
|
+ ///
|
|
|
+ /// If there are still data to be sent, another asynchronous send is
|
|
|
+ /// scheduled. When the entire command is sent, the connection is shutdown
|
|
|
+ /// and closed.
|
|
|
+ ///
|
|
|
+ /// @param ec Error code.
|
|
|
+ /// @param bytes_transferred Number of bytes sent.
|
|
|
+ void sendHandler(const boost::system::error_code& ec,
|
|
|
+ size_t bytes_trasferred);
|
|
|
+
|
|
|
+ /// @brief Handler invoked when timeout has occurred.
|
|
|
+ ///
|
|
|
+ /// Asynchronously sends a response to the client indicating that the
|
|
|
+ /// timeout has occurred.
|
|
|
+ void timeoutHandler();
|
|
|
+
|
|
|
private:
|
|
|
|
|
|
/// @brief Pointer to the socket used for transmission.
|
|
|
boost::shared_ptr<UnixDomainSocket> socket_;
|
|
|
|
|
|
+ /// @brief Interval timer used to detect connection timeouts.
|
|
|
+ IntervalTimer timeout_timer_;
|
|
|
+
|
|
|
+ /// @brief Connection timeout (in seconds)
|
|
|
+ unsigned short timeout_;
|
|
|
+
|
|
|
/// @brief Buffer used for received data.
|
|
|
- std::array<char, 65535> buf_;
|
|
|
+ std::array<char, BUF_SIZE> buf_;
|
|
|
+
|
|
|
+ /// @brief Response created by the server.
|
|
|
+ std::string response_;
|
|
|
|
|
|
/// @brief Reference to the pool of connections.
|
|
|
ConnectionPool& connection_pool_;
|
|
|
|
|
|
+ /// @brief State model used to receive data over the connection and detect
|
|
|
+ /// when the command ends.
|
|
|
+ JSONFeed feed_;
|
|
|
+
|
|
|
/// @brief Boolean flag indicating if the request to stop connection is a
|
|
|
/// result of server reconfiguration.
|
|
|
bool response_in_progress_;
|
|
@@ -114,7 +207,7 @@ public:
|
|
|
///
|
|
|
/// @param connection Pointer to the new connection object.
|
|
|
void start(const ConnectionPtr& connection) {
|
|
|
- connection->start();
|
|
|
+ connection->doReceive();
|
|
|
connections_.insert(connection);
|
|
|
}
|
|
|
|
|
@@ -122,8 +215,13 @@ public:
|
|
|
///
|
|
|
/// @param connection Pointer to the new connection object.
|
|
|
void stop(const ConnectionPtr& connection) {
|
|
|
- connection->stop();
|
|
|
- connections_.erase(connection);
|
|
|
+ try {
|
|
|
+ connection->stop();
|
|
|
+ connections_.erase(connection);
|
|
|
+ } catch (const std::exception& ex) {
|
|
|
+ LOG_ERROR(command_logger, COMMAND_SOCKET_CONNECTION_CLOSE_FAIL)
|
|
|
+ .arg(ex.what());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/// @brief Stops all connections which are allowed to stop.
|
|
@@ -135,10 +233,6 @@ public:
|
|
|
connections_.clear();
|
|
|
}
|
|
|
|
|
|
- size_t getConnectionsNum() const {
|
|
|
- return (connections_.size());
|
|
|
- }
|
|
|
-
|
|
|
private:
|
|
|
|
|
|
/// @brief Pool of connections.
|
|
@@ -146,6 +240,16 @@ private:
|
|
|
|
|
|
};
|
|
|
|
|
|
+void
|
|
|
+Connection::terminate() {
|
|
|
+ try {
|
|
|
+ socket_->shutdown();
|
|
|
+
|
|
|
+ } catch (const std::exception& ex) {
|
|
|
+ LOG_ERROR(command_logger, COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL)
|
|
|
+ .arg(ex.what());
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
void
|
|
|
Connection::receiveHandler(const boost::system::error_code& ec,
|
|
@@ -156,15 +260,13 @@ Connection::receiveHandler(const boost::system::error_code& ec,
|
|
|
// connection pool.
|
|
|
LOG_INFO(command_logger, COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST)
|
|
|
.arg(socket_->getNative());
|
|
|
- connection_pool_.stop(shared_from_this());
|
|
|
|
|
|
} else if (ec.value() != boost::asio::error::operation_aborted) {
|
|
|
LOG_ERROR(command_logger, COMMAND_SOCKET_READ_FAIL)
|
|
|
.arg(ec.value()).arg(socket_->getNative());
|
|
|
}
|
|
|
|
|
|
- /// @todo: Should we close the connection, similar to what is already
|
|
|
- /// being done for bytes_transferred == 0.
|
|
|
+ connection_pool_.stop(shared_from_this());
|
|
|
return;
|
|
|
|
|
|
} else if (bytes_transferred == 0) {
|
|
@@ -176,21 +278,35 @@ Connection::receiveHandler(const boost::system::error_code& ec,
|
|
|
LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_READ)
|
|
|
.arg(bytes_transferred).arg(socket_->getNative());
|
|
|
|
|
|
- ConstElementPtr cmd, rsp;
|
|
|
+ ConstElementPtr rsp;
|
|
|
|
|
|
try {
|
|
|
+ // Received some data over the socket. Append them to the JSON feed
|
|
|
+ // to see if we have reached the end of command.
|
|
|
+ feed_.postBuffer(&buf_[0], bytes_transferred);
|
|
|
+ feed_.poll();
|
|
|
+ // If we haven't yet received the full command, continue receiving.
|
|
|
+ if (feed_.needData()) {
|
|
|
+ doReceive();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // Try to interpret it as JSON.
|
|
|
- std::string sbuf(&buf_[0], bytes_transferred);
|
|
|
- cmd = Element::fromJSON(sbuf, true);
|
|
|
-
|
|
|
- response_in_progress_ = true;
|
|
|
+ // Received entire command. Parse the command into JSON.
|
|
|
+ if (feed_.feedOk()) {
|
|
|
+ ConstElementPtr cmd = feed_.toElement();
|
|
|
+ response_in_progress_ = true;
|
|
|
|
|
|
- // If successful, then process it as a command.
|
|
|
- rsp = CommandMgr::instance().processCommand(cmd);
|
|
|
+ // If successful, then process it as a command.
|
|
|
+ rsp = CommandMgr::instance().processCommand(cmd);
|
|
|
|
|
|
- response_in_progress_ = false;
|
|
|
+ response_in_progress_ = false;
|
|
|
|
|
|
+ } else {
|
|
|
+ // Failed to parse command as JSON or process the received command.
|
|
|
+ // This exception will be caught below and the error response will
|
|
|
+ // be sent.
|
|
|
+ isc_throw(BadValue, feed_.getErrorMessage());
|
|
|
+ }
|
|
|
|
|
|
} catch (const Exception& ex) {
|
|
|
LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
|
|
@@ -203,35 +319,76 @@ Connection::receiveHandler(const boost::system::error_code& ec,
|
|
|
rsp = createAnswer(CONTROL_RESULT_ERROR,
|
|
|
"internal server error: no response generated");
|
|
|
|
|
|
+ } else {
|
|
|
+
|
|
|
+ // Let's convert JSON response to text. Note that at this stage
|
|
|
+ // the rsp pointer is always set.
|
|
|
+ response_ = rsp->str();
|
|
|
+
|
|
|
+ doSend();
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
- // Let's convert JSON response to text. Note that at this stage
|
|
|
- // the rsp pointer is always set.
|
|
|
- std::string txt = rsp->str();
|
|
|
- size_t len = txt.length();
|
|
|
- if (len > 65535) {
|
|
|
- // Hmm, our response is too large. Let's send the first
|
|
|
- // 64KB and hope for the best.
|
|
|
- LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
|
|
|
+ // Close the connection if we have sent the entire response.
|
|
|
+ connection_pool_.stop(shared_from_this());
|
|
|
+}
|
|
|
+
|
|
|
+void
|
|
|
+Connection::sendHandler(const boost::system::error_code& ec,
|
|
|
+ size_t bytes_transferred) {
|
|
|
+ if (ec) {
|
|
|
+ // If an error occurred, log this error and stop the connection.
|
|
|
+ if (ec.value() != boost::asio::error::operation_aborted) {
|
|
|
+ LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
|
|
|
+ .arg(socket_->getNative()).arg(ec.message());
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // No error. We are in a process of sending a response. Need to
|
|
|
+ // remove the chunk that we have managed to sent with the previous
|
|
|
+ // attempt.
|
|
|
+ response_.erase(0, bytes_transferred);
|
|
|
+
|
|
|
+ LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_WRITE)
|
|
|
+ .arg(bytes_transferred).arg(response_.size())
|
|
|
+ .arg(socket_->getNative());
|
|
|
|
|
|
- len = 65535;
|
|
|
+ // Check if there is any data left to be sent and sent it.
|
|
|
+ if (!response_.empty()) {
|
|
|
+ doSend();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Gracefully shutdown the connection and close the socket if
|
|
|
+ // we have sent the whole response.
|
|
|
+ terminate();
|
|
|
}
|
|
|
|
|
|
+ // All data sent or an error has occurred. Close the connection.
|
|
|
+ connection_pool_.stop(shared_from_this());
|
|
|
+}
|
|
|
+
|
|
|
+void
|
|
|
+Connection::timeoutHandler() {
|
|
|
+ LOG_INFO(command_logger, COMMAND_SOCKET_CONNECTION_TIMEOUT)
|
|
|
+ .arg(socket_->getNative());
|
|
|
+
|
|
|
try {
|
|
|
- // Send the data back over socket.
|
|
|
- socket_->write(txt.c_str(), len);
|
|
|
+ socket_->cancel();
|
|
|
|
|
|
} catch (const std::exception& ex) {
|
|
|
- // Response transmission failed. Since the response failed, it doesn't
|
|
|
- // make sense to send any status codes. Let's log it and be done with
|
|
|
- // it.
|
|
|
- LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
|
|
|
- .arg(len).arg(socket_->getNative()).arg(ex.what());
|
|
|
+ LOG_ERROR(command_logger, COMMAND_SOCKET_CONNECTION_CANCEL_FAIL)
|
|
|
+ .arg(socket_->getNative())
|
|
|
+ .arg(ex.what());
|
|
|
}
|
|
|
|
|
|
- connection_pool_.stop(shared_from_this());
|
|
|
+ ConstElementPtr rsp = createAnswer(CONTROL_RESULT_ERROR, "Connection over"
|
|
|
+ " control channel timed out");
|
|
|
+ response_ = rsp->str();
|
|
|
+ doSend();
|
|
|
}
|
|
|
|
|
|
+
|
|
|
}
|
|
|
|
|
|
namespace isc {
|
|
@@ -244,7 +401,7 @@ public:
|
|
|
/// @brief Constructor.
|
|
|
CommandMgrImpl()
|
|
|
: io_service_(), acceptor_(), socket_(), socket_name_(),
|
|
|
- connection_pool_() {
|
|
|
+ connection_pool_(), timeout_(DEFAULT_CONNECTION_TIMEOUT) {
|
|
|
}
|
|
|
|
|
|
/// @brief Opens acceptor service allowing the control clients to connect.
|
|
@@ -274,6 +431,9 @@ public:
|
|
|
|
|
|
/// @brief Pool of connections.
|
|
|
ConnectionPool connection_pool_;
|
|
|
+
|
|
|
+ /// @brief Connection timeout
|
|
|
+ unsigned short timeout_;
|
|
|
};
|
|
|
|
|
|
void
|
|
@@ -333,8 +493,14 @@ CommandMgrImpl::doAccept() {
|
|
|
acceptor_->asyncAccept(*socket_, [this](const boost::system::error_code& ec) {
|
|
|
if (!ec) {
|
|
|
// New connection is arriving. Start asynchronous transmission.
|
|
|
- ConnectionPtr connection(new Connection(socket_, connection_pool_));
|
|
|
+ ConnectionPtr connection(new Connection(io_service_, socket_,
|
|
|
+ connection_pool_,
|
|
|
+ timeout_));
|
|
|
connection_pool_.start(connection);
|
|
|
+
|
|
|
+ } else if (ec.value() != boost::asio::error::operation_aborted) {
|
|
|
+ LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
|
|
|
+ .arg(acceptor_->getNative()).arg(ec.message());
|
|
|
}
|
|
|
|
|
|
// Unless we're stopping the service, start accepting connections again.
|
|
@@ -385,5 +551,11 @@ CommandMgr::setIOService(const IOServicePtr& io_service) {
|
|
|
impl_->io_service_ = io_service;
|
|
|
}
|
|
|
|
|
|
+void
|
|
|
+CommandMgr::setConnectionTimeout(const unsigned short timeout) {
|
|
|
+ impl_->timeout_ = timeout;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
}; // end of isc::config
|
|
|
}; // end of isc
|