client_connection.cc 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // Copyright (C) 2017 Internet Systems Consortium, Inc. ("ISC")
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this
  5. // file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6. #include <asiolink/asio_wrapper.h>
  7. #include <asiolink/interval_timer.h>
  8. #include <asiolink/unix_domain_socket.h>
  9. #include <cc/json_feed.h>
  10. #include <config/client_connection.h>
  11. #include <boost/bind.hpp>
  12. #include <boost/enable_shared_from_this.hpp>
  13. #include <array>
  14. using namespace isc::asiolink;
  15. namespace isc {
  16. namespace config {
  17. /// @brief Implementation of the @ref ClientConnection.
  18. class ClientConnectionImpl : public boost::enable_shared_from_this<ClientConnectionImpl> {
  19. public:
  20. /// @brief Constructor.
  21. ///
  22. /// @param io_service Reference to the IO service.
  23. explicit ClientConnectionImpl(IOService& io_service);
  24. /// @brief Starts asynchronous transaction with a remote endpoint.
  25. ///
  26. /// See @ref ClientConnection::start documentation for the details.
  27. ///
  28. /// @param socket_path Path to the socket description that the server
  29. /// is bound to.
  30. /// @param command Control command to be sent to the server.
  31. /// @param handler Pointer to the user suppiled callback function which
  32. /// should be invoked when transaction completes or when an error has
  33. /// occurred during the transaction.
  34. /// @param timeout Connection timeout in milliseconds.
  35. void start(const ClientConnection::SocketPath& socket_path,
  36. const ClientConnection::ControlCommand& command,
  37. const ClientConnection::Handler& handler,
  38. const ClientConnection::Timeout& timeout);
  39. /// @brief Closes the socket.
  40. void stop();
  41. /// @brief Starts asynchronous send.
  42. ///
  43. /// This method may be called multiple times internally when the command
  44. /// is large and can't be sent all at once.
  45. ///
  46. /// @param buffer Pointer to the buffer holding input data.
  47. /// @param length Length of the data in the input buffer.
  48. /// @param handler User supplied callback.
  49. void doSend(const void* buffer, const size_t length,
  50. const ClientConnection::Handler& handler);
  51. /// @brief Starts asynchronous receive from the server.
  52. ///
  53. /// This method may be called multiple times internally if the response
  54. /// is large. The @ref JSONFeed instance is used to detect the boundaries
  55. /// of the command within the stream. Once the entire command has been
  56. /// received the user callback is invoked and the instance of the
  57. /// @ref JSONFeed is returned.
  58. ///
  59. /// @param handler User supplied callback.
  60. void doReceive(const ClientConnection::Handler& handler);
  61. /// @brief Terminates the connection and invokes a user callback indicating
  62. /// an error.
  63. ///
  64. /// @param ec Error code.
  65. /// @param handler User callback.
  66. void terminate(const boost::system::error_code& ec,
  67. const ClientConnection::Handler& handler);
  68. /// @brief Callback invoked when the timeout occurs.
  69. ///
  70. /// It calls @ref terminate with the @c boost::asio::error::timed_out.
  71. void timeoutCallback(const ClientConnection::Handler& handler);
  72. private:
  73. /// @brief Unix domain socket used for communication with a server.
  74. UnixDomainSocket socket_;
  75. /// @brief Pointer to the @ref JSONFeed holding a response.
  76. ///
  77. ///It may be a null pointer until some part of a response has been received.
  78. JSONFeedPtr feed_;
  79. /// @brief Holds the entire command being transmitted over the unix
  80. /// socket.
  81. std::string current_command_;
  82. /// @brief Buffer into which chunks of the response are received.
  83. std::array<char, 1024> read_buf_;
  84. /// @brief Instance of the interval timer protecting against timeouts.
  85. IntervalTimer timer_;
  86. };
  87. ClientConnectionImpl::ClientConnectionImpl(IOService& io_service)
  88. : socket_(io_service), feed_(), current_command_(), timer_(io_service) {
  89. }
  90. void
  91. ClientConnectionImpl::start(const ClientConnection::SocketPath& socket_path,
  92. const ClientConnection::ControlCommand& command,
  93. const ClientConnection::Handler& handler,
  94. const ClientConnection::Timeout& timeout) {
  95. // Start the timer protecting against timeouts.
  96. timer_.setup(boost::bind(&ClientConnectionImpl::timeoutCallback,
  97. shared_from_this(), handler),
  98. timeout.timeout_, IntervalTimer::ONE_SHOT);
  99. // Store the command in the class member to make sure it is valid
  100. // the entire time.
  101. current_command_.assign(command.control_command_);
  102. // Pass self to lambda to make sure that the instance of this class
  103. // lives as long as the lambda is held for async connect.
  104. auto self(shared_from_this());
  105. // Start asynchronous connect. This will return immediatelly.
  106. socket_.asyncConnect(socket_path.socket_path_,
  107. [this, self, command, handler](const boost::system::error_code& ec) {
  108. // We failed to connect so we can't proceed. Simply clean up
  109. // and invoke the user callback to signal an error.
  110. if (ec) {
  111. // This doesn't throw.
  112. terminate(ec, handler);
  113. } else {
  114. // Connection successful. Transmit the command to the remote
  115. // endpoint asynchronously.
  116. doSend(current_command_.c_str(), current_command_.length(),
  117. handler);
  118. }
  119. });
  120. }
  121. void
  122. ClientConnectionImpl::stop() {
  123. try {
  124. socket_.close();
  125. } catch (...) {
  126. // Suppress errors related to closing a socket. We can't really help
  127. // if an error occurred.
  128. }
  129. }
  130. void
  131. ClientConnectionImpl::doSend(const void* buffer, const size_t length,
  132. const ClientConnection::Handler& handler) {
  133. // Pass self to lambda to make sure that the instance of this class
  134. // lives as long as the lambda is held for async send.
  135. auto self(shared_from_this());
  136. // Start asynchronous transmission of the command. This will return
  137. // immediatelly.
  138. socket_.asyncSend(buffer, length,
  139. [this, self, buffer, length, handler]
  140. (const boost::system::error_code& ec, size_t bytes_transferred) {
  141. // An error has occurred while sending. Close the connection and
  142. // signal an error.
  143. if (ec) {
  144. // This doesn't throw.
  145. terminate(ec, handler);
  146. } else {
  147. // If the number of bytes we have managed to send so far is
  148. // lower than the amount of data we're trying to send, we
  149. // have to schedule another send to deliver the rest of
  150. // the data.
  151. if (bytes_transferred < length) {
  152. doSend(static_cast<const char*>(buffer) + bytes_transferred,
  153. length - bytes_transferred, handler);
  154. } else {
  155. // We have sent all the data. Start receiving a response.
  156. doReceive(handler);
  157. }
  158. }
  159. });
  160. }
  161. void
  162. ClientConnectionImpl::doReceive(const ClientConnection::Handler& handler) {
  163. // Pass self to lambda to make sure that the instance of this class
  164. // lives as long as the lambda is held for async receive.
  165. auto self(shared_from_this());
  166. socket_.asyncReceive(&read_buf_[0], read_buf_.size(),
  167. [this, self, handler]
  168. (const boost::system::error_code& ec, size_t length) {
  169. // An error has occurred while receiving the data. Close the connection
  170. // and signal an error.
  171. if (ec) {
  172. // This doesn't throw.
  173. terminate(ec, handler);
  174. } else {
  175. // Lazy initialization of the JSONFeed. The feed will be "parsing"
  176. // received JSON stream and will detect when the whole response
  177. // has been received.
  178. if (!feed_) {
  179. feed_.reset(new JSONFeed());
  180. feed_->initModel();
  181. }
  182. // Put everything we have received so far into the feed and process
  183. // the data.
  184. feed_->postBuffer(&read_buf_[0], length);
  185. feed_->poll();
  186. // If the feed indicates that only a part of the response has been
  187. // received, schedule another receive to get more data.
  188. if (feed_->needData()) {
  189. doReceive(handler);
  190. } else {
  191. // We have received the entire response, let's call the handler
  192. // and indicate success.
  193. terminate(ec, handler);
  194. }
  195. }
  196. });
  197. }
  198. void
  199. ClientConnectionImpl::terminate(const boost::system::error_code& ec,
  200. const ClientConnection::Handler& handler) {
  201. try {
  202. stop();
  203. current_command_.clear();
  204. handler(ec, feed_);
  205. } catch (...) {
  206. // None of these operations should throw. In particular, the handler
  207. // should not throw but if it has been misimplemented, we want to make
  208. // sure we don't emit any exceptions from here.
  209. }
  210. }
  211. void
  212. ClientConnectionImpl::timeoutCallback(const ClientConnection::Handler& handler) {
  213. // Timeout has occurred. The remote server didn't provide the entire
  214. // response within the given time frame. Let's close the connection
  215. // and signal the timeout.
  216. terminate(boost::asio::error::timed_out, handler);
  217. }
  218. ClientConnection::ClientConnection(asiolink::IOService& io_service)
  219. : impl_(new ClientConnectionImpl(io_service)) {
  220. }
  221. ClientConnection::~ClientConnection() {
  222. stop();
  223. }
  224. void
  225. ClientConnection::start(const ClientConnection::SocketPath& socket_path,
  226. const ClientConnection::ControlCommand& command,
  227. const Handler& handler,
  228. const ClientConnection::Timeout& timeout) {
  229. impl_->start(socket_path, command, handler, timeout);
  230. }
  231. void
  232. ClientConnection::stop() {
  233. impl_->stop();
  234. }
  235. } // end of namespace config
  236. } // end of namespace isc