io_fetch.cc 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. // Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
  2. //
  3. // Permission to use, copy, modify, and/or distribute this software for any
  4. // purpose with or without fee is hereby granted, provided that the above
  5. // copyright notice and this permission notice appear in all copies.
  6. //
  7. // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
  8. // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
  9. // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
  10. // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
  11. // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
  12. // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
  13. // PERFORMANCE OF THIS SOFTWARE.
  14. #include <config.h>
  15. #include <unistd.h> // for some IPC/network system calls
  16. #include <sys/socket.h>
  17. #include <netinet/in.h>
  18. #include <boost/bind.hpp>
  19. #include <boost/scoped_ptr.hpp>
  20. #include <boost/date_time/posix_time/posix_time_types.hpp>
  21. #include <dns/message.h>
  22. #include <dns/messagerenderer.h>
  23. #include <dns/opcode.h>
  24. #include <dns/rcode.h>
  25. #include <log/logger.h>
  26. #include <asiolink/qid_gen.h>
  27. #include <asio.hpp>
  28. #include <asio/deadline_timer.hpp>
  29. #include <asiolink/asiodef.h>
  30. #include <asiolink/io_address.h>
  31. #include <asiolink/io_asio_socket.h>
  32. #include <asiolink/io_endpoint.h>
  33. #include <asiolink/io_fetch.h>
  34. #include <asiolink/io_service.h>
  35. #include <asiolink/tcp_endpoint.h>
  36. #include <asiolink/tcp_socket.h>
  37. #include <asiolink/udp_endpoint.h>
  38. #include <asiolink/udp_socket.h>
  39. using namespace asio;
  40. using namespace isc::dns;
  41. using namespace isc::log;
  42. using namespace std;
  43. namespace asiolink {
  /// Use the ASIO logger.
  ///
  /// Logger object constructed with the name "asiolink"; the debug and error
  /// messages issued by IOFetch::stop() and IOFetch::logIOFailure() in this
  /// file are sent through it.
  isc::log::Logger logger("asiolink");
  46. /// \brief IOFetch Data
  47. ///
  48. /// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
  49. /// object. This is because the IOFetch object will be copied often (it is used
  50. /// as a coroutine and passed as callback to many async_*() functions) and we
  51. /// want keep the same data). Organising the data in this way keeps copying to
  52. /// a minimum.
  53. struct IOFetchData {
  54. // The first two members are shared pointers to a base class because what is
  55. // actually instantiated depends on whether the fetch is over UDP or TCP,
  56. // which is not known until construction of the IOFetch. Use of a shared
  57. // pointer here is merely to ensure deletion when the data object is deleted.
  58. boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
  59. ///< Socket to use for I/O
  60. boost::scoped_ptr<IOEndpoint> remote; ///< Where the fetch was sent
  61. isc::dns::Question question; ///< Question to be asked
  62. isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question
  63. isc::dns::OutputBufferPtr received; ///< Received data put here
  64. IOFetch::Callback* callback; ///< Called on I/O Completion
  65. asio::deadline_timer timer; ///< Timer to measure timeouts
  66. IOFetch::Protocol protocol; ///< Protocol being used
  67. size_t cumulative; ///< Cumulative received amount
  68. size_t expected; ///< Expected amount of data
  69. size_t offset; ///< Offset to receive data
  70. bool stopped; ///< Have we stopped running?
  71. int timeout; ///< Timeout in ms
  72. // In case we need to log an error, the origin of the last asynchronous
  73. // I/O is recorded. To save time and simplify the code, this is recorded
  74. // as the ID of the error message that would be generated if the I/O failed.
  75. // This means that we must make sure that all possible "origins" take the
  76. // same arguments in their message in the same order.
  77. isc::log::MessageID origin; ///< Origin of last asynchronous I/O
  78. uint8_t staging[IOFetch::STAGING_LENGTH];
  79. ///< Temporary array for received data
  80. /// \brief Constructor
  81. ///
  82. /// Just fills in the data members of the IOFetchData structure
  83. ///
  84. /// \param proto Either IOFetch::TCP or IOFetch::UDP.
  85. /// \param service I/O Service object to handle the asynchronous
  86. /// operations.
  87. /// \param query DNS question to send to the upstream server.
  88. /// \param address IP address of upstream server
  89. /// \param port Port to use for the query
  90. /// \param buff Output buffer into which the response (in wire format)
  91. /// is written (if a response is received).
  92. /// \param cb Callback object containing the callback to be called
  93. /// when we terminate. The caller is responsible for managing this
  94. /// object and deleting it if necessary.
  95. /// \param wait Timeout for the fetch (in ms).
  96. ///
  97. /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
  98. IOFetchData(IOFetch::Protocol proto, IOService& service,
  99. const isc::dns::Question& query, const IOAddress& address,
  100. uint16_t port, isc::dns::OutputBufferPtr& buff, IOFetch::Callback* cb,
  101. int wait)
  102. :
  103. socket((proto == IOFetch::UDP) ?
  104. static_cast<IOAsioSocket<IOFetch>*>(
  105. new UDPSocket<IOFetch>(service)) :
  106. static_cast<IOAsioSocket<IOFetch>*>(
  107. new TCPSocket<IOFetch>(service))
  108. ),
  109. remote((proto == IOFetch::UDP) ?
  110. static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
  111. static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
  112. ),
  113. question(query),
  114. msgbuf(new isc::dns::OutputBuffer(512)),
  115. received(buff),
  116. callback(cb),
  117. timer(service.get_io_service()),
  118. protocol(proto),
  119. cumulative(0),
  120. expected(0),
  121. offset(0),
  122. stopped(false),
  123. timeout(wait),
  124. origin(ASIO_UNKORIGIN),
  125. staging()
  126. {}
  127. };
  128. /// IOFetch Constructor - just initialize the private data
  129. IOFetch::IOFetch(Protocol protocol, IOService& service,
  130. const isc::dns::Question& question, const IOAddress& address, uint16_t port,
  131. OutputBufferPtr& buff, Callback* cb, int wait)
  132. :
  133. data_(new IOFetchData(protocol, service, question, address,
  134. port, buff, cb, wait))
  135. {
  136. }
  137. // Return protocol in use.
  138. IOFetch::Protocol
  139. IOFetch::getProtocol() const {
  140. return (data_->protocol);
  141. }
  /// The function operator is implemented with the "stackless coroutine"
  /// pattern; see internal/coroutine.h for details.
  ///
  /// The body renders the query, optionally arms the timeout timer, opens the
  /// socket, sends the query, loops receiving data until the socket reports
  /// that the response is complete, then closes the socket and calls
  /// stop(SUCCESS).  This object is passed as the completion handler to the
  /// asynchronous socket operations.
  ///
  /// \param ec Error code.  If it is set (and the fetch has not already been
  ///        stopped) the failure is logged via logIOFailure() and the function
  ///        returns; stop() is not called on this path.
  /// \param length Passed to the socket's processReceivedData() after each
  ///        receive (presumably the amount of data transferred by the I/O
  ///        that has just completed - confirm against the socket classes).
  void
  IOFetch::operator()(asio::error_code ec, size_t length) {

      // Nothing more to do once the fetch has been stopped.  Otherwise, an
      // I/O error is logged and processing is abandoned.
      if (data_->stopped) {
          return;
      } else if (ec) {
          logIOFailure(ec);
          return;
      }

      CORO_REENTER (this) {

          /// Generate the upstream query and render it to wire format
          /// This is done in a different scope to allow inline variable
          /// declarations.
          {
              Message msg(Message::RENDER);
              msg.setQid(QidGenerator::getInstance().generateQid());
              msg.setOpcode(Opcode::QUERY());
              msg.setRcode(Rcode::NOERROR());
              msg.setHeaderFlag(Message::HEADERFLAG_RD);
              msg.addQuestion(data_->question);
              MessageRenderer renderer(*data_->msgbuf);
              msg.toWire(renderer);
          }

          // If we timeout, we stop, which will cancel outstanding I/Os and
          // shutdown everything.  A timeout value of -1 means that no timer
          // is started.
          if (data_->timeout != -1) {
              data_->timer.expires_from_now(boost::posix_time::milliseconds(
                  data_->timeout));
              data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
                  TIME_OUT));
          }

          // Open a connection to the target system.  For speed, if the operation
          // is synchronous (i.e. UDP operation) we bypass the yield.
          data_->origin = ASIO_OPENSOCK;
          if (data_->socket->isOpenSynchronous()) {
              data_->socket->open(data_->remote.get(), *this);
          } else {
              CORO_YIELD data_->socket->open(data_->remote.get(), *this);
          }

          // Begin an asynchronous send, and then yield.  When the send completes,
          // we will resume immediately after this point.
          data_->origin = ASIO_SENDSOCK;
          CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
              data_->msgbuf->getLength(), data_->remote.get(), *this);

          // Now receive the response.  Since TCP may not receive the entire
          // message in one operation, we need to loop until we have received
          // it. (This can't be done within the asyncReceive() method because
          // each I/O operation will be done asynchronously and between each one
          // we need to yield ... and we *really* don't want to set up another
          // coroutine within that method.)  So after each receive (and yield),
          // we check if the operation is complete and if not, loop to read again.
          //
          // Another concession to TCP is that the amount of data is contained in
          // the first two bytes.  This leads to two problems:
          //
          // a) We don't want those bytes in the return buffer.
          // b) They may not both arrive in the first I/O.
          //
          // So... we need to loop until we have at least two bytes, then store
          // the expected amount of data.  Then we need to loop until we have
          // received all the data before copying it back to the user's buffer.
          // And we want to minimise the amount of copying...
          data_->origin = ASIO_RECVSOCK;
          data_->cumulative = 0;          // No data yet received
          data_->offset = 0;              // First data into start of buffer
          do {
              CORO_YIELD data_->socket->asyncReceive(data_->staging,
                                                     static_cast<size_t>(STAGING_LENGTH),
                                                     data_->offset,
                                                     data_->remote.get(), *this);
          } while (!data_->socket->processReceivedData(data_->staging, length,
                                                       data_->cumulative, data_->offset,
                                                       data_->expected, data_->received));

          // Finished with this socket, so close it.  This will not generate an
          // I/O error, but reset the origin to unknown in case we change this.
          data_->origin = ASIO_UNKORIGIN;
          data_->socket->close();

          /// We are done
          stop(SUCCESS);
      }
  }
  224. // Function that stops the coroutine sequence. It is called either when the
  225. // query finishes or when the timer times out. Either way, it sets the
  226. // "stopped_" flag and cancels anything that is in progress.
  227. //
  228. // As the function may be entered multiple times as things wind down, it checks
  229. // if the stopped_ flag is already set. If it is, the call is a no-op.
  230. void
  231. IOFetch::stop(Result result) {
  232. if (!data_->stopped) {
  233. // Mark the fetch as stopped to prevent other completion callbacks
  234. // (invoked because of the calls to cancel()) from executing the
  235. // cancel calls again.
  236. //
  237. // In a single threaded environment, the callbacks won't be invoked
  238. // until this one completes. In a multi-threaded environment, they may
  239. // well be, in which case the testing (and setting) of the stopped_
  240. // variable should be done inside a mutex (and the stopped_ variable
  241. // declared as "volatile").
  242. //
  243. // The numeric arguments indicate the debug level, with the lower
  244. // numbers indicating the most important information. The relative
  245. // values are somewhat arbitrary.
  246. //
  247. // Although Logger::debug checks the debug flag internally, doing it
  248. // below before calling Logger::debug avoids the overhead of a string
  249. // conversion in the common case when debug is not enabled.
  250. //
  251. // TODO: Update testing of stopped_ if threads are used.
  252. data_->stopped = true;
  253. switch (result) {
  254. case TIME_OUT:
  255. if (logger.isDebugEnabled(1)) {
  256. logger.debug(20, ASIO_RECVTMO,
  257. data_->remote->getAddress().toText().c_str(),
  258. static_cast<int>(data_->remote->getPort()));
  259. }
  260. break;
  261. case SUCCESS:
  262. if (logger.isDebugEnabled(50)) {
  263. logger.debug(30, ASIO_FETCHCOMP,
  264. data_->remote->getAddress().toText().c_str(),
  265. static_cast<int>(data_->remote->getPort()));
  266. }
  267. break;
  268. case STOPPED:
  269. // Fetch has been stopped for some other reason. This is
  270. // allowed but as it is unusual it is logged, but with a lower
  271. // debug level than a timeout (which is totally normal).
  272. logger.debug(1, ASIO_FETCHSTOP,
  273. data_->remote->getAddress().toText().c_str(),
  274. static_cast<int>(data_->remote->getPort()));
  275. break;
  276. default:
  277. logger.error(ASIO_UNKRESULT, static_cast<int>(result),
  278. data_->remote->getAddress().toText().c_str(),
  279. static_cast<int>(data_->remote->getPort()));
  280. }
  281. // Stop requested, cancel and I/O's on the socket and shut it down,
  282. // and cancel the timer.
  283. data_->socket->cancel();
  284. data_->socket->close();
  285. data_->timer.cancel();
  286. // Execute the I/O completion callback (if present).
  287. if (data_->callback) {
  288. (*(data_->callback))(result);
  289. }
  290. }
  291. }
  292. // Log an error - called on I/O failure
  293. void IOFetch::logIOFailure(asio::error_code ec) {
  294. // Should only get here with a known error code.
  295. assert((data_->origin == ASIO_OPENSOCK) ||
  296. (data_->origin == ASIO_SENDSOCK) ||
  297. (data_->origin == ASIO_RECVSOCK) ||
  298. (data_->origin == ASIO_UNKORIGIN));
  299. static const char* PROTOCOL[2] = {"TCP", "UDP"};
  300. logger.error(data_->origin,
  301. ec.value(),
  302. ((data_->remote->getProtocol() == IPPROTO_TCP) ?
  303. PROTOCOL[0] : PROTOCOL[1]),
  304. data_->remote->getAddress().toText().c_str(),
  305. static_cast<int>(data_->remote->getPort()));
  306. }
  307. } // namespace asiolink