io_fetch.cc 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
  2. //
  3. // Permission to use, copy, modify, and/or distribute this software for any
  4. // purpose with or without fee is hereby granted, provided that the above
  5. // copyright notice and this permission notice appear in all copies.
  6. //
  7. // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
  8. // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
  9. // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
  10. // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
  11. // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
  12. // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
  13. // PERFORMANCE OF THIS SOFTWARE.
  14. #include <config.h>
  15. #include <unistd.h> // for some IPC/network system calls
  16. #include <sys/socket.h>
  17. #include <netinet/in.h>
  18. #include <boost/bind.hpp>
  19. #include <boost/shared_array.hpp>
  20. #include <boost/shared_ptr.hpp>
  21. #include <boost/date_time/posix_time/posix_time_types.hpp>
  22. #include <dns/message.h>
  23. #include <dns/messagerenderer.h>
  24. #include <dns/opcode.h>
  25. #include <dns/rcode.h>
  26. #include <log/dummylog.h>
  27. #include <log/logger.h>
  28. #include <asio.hpp>
  29. #include <asio/deadline_timer.hpp>
  30. #include <asiolink/asiodef.h>
  31. #include <asiolink/io_address.h>
  32. #include <asiolink/io_asio_socket.h>
  33. #include <asiolink/io_endpoint.h>
  34. #include <asiolink/io_fetch.h>
  35. #include <asiolink/io_service.h>
  36. #include <asiolink/tcp_endpoint.h>
  37. #include <asiolink/tcp_socket.h>
  38. #include <asiolink/udp_endpoint.h>
  39. #include <asiolink/udp_socket.h>
  40. using namespace asio;
  41. using namespace isc::dns;
  42. using namespace isc::log;
  43. using namespace std;
  44. namespace asiolink {
  45. /// Use the ASIO logger
  46. isc::log::Logger logger("asio");
  47. /// \brief IOFetch Data
  48. ///
  49. /// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
  50. /// object. This is because the IOFetch object will be copied often (it is used
  51. /// as a coroutine and passed as callback to many async_*() functions) and we
  52. /// want keep the same data). Organising the data in this way keeps copying to
  53. /// a minimum.
  54. struct IOFetchData {
  55. // The first two members are shared pointers to a base class because what is
  56. // actually instantiated depends on whether the fetch is over UDP or TCP,
  57. // which is not known until construction of the IOFetch. Use of a shared
  58. // pointer here is merely to ensure deletion when the data object is deleted.
  59. boost::shared_ptr<IOAsioSocket<IOFetch> > socket;
  60. ///< Socket to use for I/O
  61. boost::shared_ptr<IOEndpoint> remote; ///< Where the fetch was sent
  62. isc::dns::Question question; ///< Question to be asked
  63. isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question
  64. isc::dns::OutputBufferPtr buffer; ///< Received data held here
  65. boost::shared_array<char> data; ///< Temporary array for data
  66. IOFetch::Callback* callback; ///< Called on I/O Completion
  67. size_t cumulative; ///< Cumulative received amount
  68. bool stopped; ///< Have we stopped running?
  69. asio::deadline_timer timer; ///< Timer to measure timeouts
  70. int timeout; ///< Timeout in ms
  71. // In case we need to log an error, the origin of the last asynchronous
  72. // I/O is recorded. To save time and simplify the code, this is recorded
  73. // as the ID of the error message that would be generated if the I/O failed.
  74. // This means that we must make sure that all possible "origins" take the
  75. // same arguments in their message in the same order.
  76. isc::log::MessageID origin; ///< Origin of last asynchronous I/O
  77. /// \brief Constructor
  78. ///
  79. /// Just fills in the data members of the IOFetchData structure
  80. ///
  81. /// \param protocol Either IOFetch::TCP or IOFetch::UDP.
  82. /// \param service I/O Service object to handle the asynchronous
  83. /// operations.
  84. /// \param query DNS question to send to the upstream server.
  85. /// \param address IP address of upstream server
  86. /// \param port Port to use for the query
  87. /// \param buff Output buffer into which the response (in wire format)
  88. /// is written (if a response is received).
  89. /// \param cb Callback object containing the callback to be called
  90. /// when we terminate. The caller is responsible for managing this
  91. /// object and deleting it if necessary.
  92. /// \param wait Timeout for the fetch (in ms).
  93. ///
  94. /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
  95. IOFetchData(IOFetch::Protocol protocol, IOService& service,
  96. const isc::dns::Question& query, const IOAddress& address,
  97. uint16_t port, isc::dns::OutputBufferPtr& buff, IOFetch::Callback* cb,
  98. int wait)
  99. :
  100. socket((protocol == IOFetch::UDP) ?
  101. static_cast<IOAsioSocket<IOFetch>*>(
  102. new UDPSocket<IOFetch>(service)) :
  103. static_cast<IOAsioSocket<IOFetch>*>(
  104. new TCPSocket<IOFetch>(service))
  105. ),
  106. remote((protocol == IOFetch::UDP) ?
  107. static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
  108. static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
  109. ),
  110. question(query),
  111. msgbuf(new isc::dns::OutputBuffer(512)),
  112. buffer(buff),
  113. data(new char[IOFetch::MIN_LENGTH]),
  114. callback(cb),
  115. cumulative(0),
  116. stopped(false),
  117. timer(service.get_io_service()),
  118. timeout(wait),
  119. origin(ASIO_UNKORIGIN)
  120. {}
  121. };
  122. /// IOFetch Constructor - just initialize the private data
  123. IOFetch::IOFetch(Protocol protocol, IOService& service,
  124. const isc::dns::Question& question, const IOAddress& address, uint16_t port,
  125. OutputBufferPtr& buff, Callback* cb, int wait)
  126. :
  127. data_(new IOFetchData(protocol, service, question, address,
  128. port, buff, cb, wait))
  129. {
  130. }
  131. /// The function operator is implemented with the "stackless coroutine"
  132. /// pattern; see internal/coroutine.h for details.
  133. void
  134. IOFetch::operator()(asio::error_code ec, size_t length) {
  135. if (data_->stopped) {
  136. return;
  137. } else if (ec) {
  138. logIOFailure(ec);
  139. return;
  140. }
  141. CORO_REENTER (this) {
  142. /// Generate the upstream query and render it to wire format
  143. /// This is done in a different scope to allow inline variable
  144. /// declarations.
  145. {
  146. Message msg(Message::RENDER);
  147. // TODO: replace with boost::random or some other suitable PRNG
  148. msg.setQid(0);
  149. msg.setOpcode(Opcode::QUERY());
  150. msg.setRcode(Rcode::NOERROR());
  151. msg.setHeaderFlag(Message::HEADERFLAG_RD);
  152. msg.addQuestion(data_->question);
  153. MessageRenderer renderer(*data_->msgbuf);
  154. msg.toWire(renderer);
  155. // As this is a new fetch, clear the amount of data received
  156. data_->cumulative = 0;
  157. dlog("Sending " + msg.toText() + " to " +
  158. data_->remote->getAddress().toText());
  159. }
  160. // If we timeout, we stop, which will can cancel outstanding I/Os and
  161. // shutdown everything.
  162. if (data_->timeout != -1) {
  163. data_->timer.expires_from_now(boost::posix_time::milliseconds(
  164. data_->timeout));
  165. data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
  166. TIME_OUT));
  167. }
  168. // Open a connection to the target system. For speed, if the operation
  169. // is synchronous (i.e. UDP operation) we bypass the yield.
  170. data_->origin = ASIO_OPENSOCK;
  171. if (data_->socket->isOpenSynchronous()) {
  172. data_->socket->open(data_->remote.get(), *this);
  173. } else {
  174. CORO_YIELD data_->socket->open(data_->remote.get(), *this);
  175. }
  176. // Begin an asynchronous send, and then yield. When the send completes,
  177. // we will resume immediately after this point.
  178. data_->origin = ASIO_SENDSOCK;
  179. CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
  180. data_->msgbuf->getLength(), data_->remote.get(), *this);
  181. // Now receive the response. Since TCP may not receive the entire
  182. // message in one operation, we need to loop until we have received
  183. // it. (This can't be done within the asyncReceive() method because
  184. // each I/O operation will be done asynchronously and between each one
  185. // we need to yield ... and we *really* don't want to set up another
  186. // coroutine within that method.) So after each receive (and yield),
  187. // we check if the operation is complete and if not, loop to read again.
  188. data_->origin = ASIO_RECVSOCK;
  189. do {
  190. CORO_YIELD data_->socket->asyncReceive(data_->data.get(),
  191. static_cast<size_t>(MIN_LENGTH), data_->cumulative,
  192. data_->remote.get(), *this);
  193. data_->cumulative += length;
  194. } while (!data_->socket->receiveComplete(data_->data.get(),
  195. data_->cumulative));
  196. /// Copy the answer into the response buffer. (TODO: If the
  197. /// OutputBuffer object were made to meet the requirements of a
  198. /// MutableBufferSequence, then it could be written to directly by
  199. /// async_receive_from() and this additional copy step would be
  200. /// unnecessary.)
  201. data_->buffer->writeData(data_->data.get(), length);
  202. // Finished with this socket, so close it. This will not generate an
  203. // I/O error, but reset the origin to unknown in case we change this.
  204. data_->origin = ASIO_UNKORIGIN;
  205. data_->socket->close();
  206. /// We are done
  207. stop(SUCCESS);
  208. }
  209. }
  210. // Function that stops the coroutine sequence. It is called either when the
  211. // query finishes or when the timer times out. Either way, it sets the
  212. // "stopped_" flag and cancels anything that is in progress.
  213. //
  214. // As the function may be entered multiple times as things wind down, it checks
  215. // if the stopped_ flag is already set. If it is, the call is a no-op.
  216. void
  217. IOFetch::stop(Result result) {
  218. if (!data_->stopped) {
  219. // Mark the fetch as stopped to prevent other completion callbacks
  220. // (invoked because of the calls to cancel()) from executing the
  221. // cancel calls again.
  222. //
  223. // In a single threaded environment, the callbacks won't be invoked
  224. // until this one completes. In a multi-threaded environment, they may
  225. // well be, in which case the testing (and setting) of the stopped_
  226. // variable should be done inside a mutex (and the stopped_ variable
  227. // declared as "volatile").
  228. //
  229. // The numeric arguments indicate the debug level, with the lower
  230. // numbers indicating the most important information. The relative
  231. // values are somewhat arbitrary.
  232. //
  233. // Although Logger::debug checks the debug flag internally, doing it
  234. // below before calling Logger::debug avoids the overhead of a string
  235. // conversion in the common case when debug is not enabled.
  236. //
  237. // TODO: Update testing of stopped_ if threads are used.
  238. data_->stopped = true;
  239. switch (result) {
  240. case TIME_OUT:
  241. if (logger.isDebugEnabled(1)) {
  242. logger.debug(20, ASIO_RECVTMO,
  243. data_->remote->getAddress().toText().c_str(),
  244. static_cast<int>(data_->remote->getPort()));
  245. }
  246. break;
  247. case SUCCESS:
  248. if (logger.isDebugEnabled(50)) {
  249. logger.debug(30, ASIO_FETCHCOMP,
  250. data_->remote->getAddress().toText().c_str(),
  251. static_cast<int>(data_->remote->getPort()));
  252. }
  253. break;
  254. case STOPPED:
  255. // Fetch has been stopped for some other reason. This is
  256. // allowed but as it is unusual it is logged, but with a lower
  257. // debug level than a timeout (which is totally normal).
  258. logger.debug(1, ASIO_FETCHSTOP,
  259. data_->remote->getAddress().toText().c_str(),
  260. static_cast<int>(data_->remote->getPort()));
  261. break;
  262. default:
  263. logger.error(ASIO_UNKRESULT, static_cast<int>(result),
  264. data_->remote->getAddress().toText().c_str(),
  265. static_cast<int>(data_->remote->getPort()));
  266. }
  267. // Stop requested, cancel and I/O's on the socket and shut it down,
  268. // and cancel the timer.
  269. data_->socket->cancel();
  270. data_->socket->close();
  271. data_->timer.cancel();
  272. // Execute the I/O completion callback (if present).
  273. if (data_->callback) {
  274. (*(data_->callback))(result);
  275. }
  276. }
  277. }
  278. // Log an error - called on I/O failure
  279. void IOFetch::logIOFailure(asio::error_code ec) {
  280. // Should only get here with a known error code.
  281. assert((data_->origin == ASIO_OPENSOCK) ||
  282. (data_->origin == ASIO_SENDSOCK) ||
  283. (data_->origin == ASIO_RECVSOCK) ||
  284. (data_->origin == ASIO_UNKORIGIN));
  285. static const char* PROTOCOL[2] = {"TCP", "UDP"};
  286. logger.error(data_->origin,
  287. ec.value(),
  288. ((data_->remote->getProtocol() == IPPROTO_TCP) ?
  289. PROTOCOL[0] : PROTOCOL[1]),
  290. data_->remote->getAddress().toText().c_str(),
  291. static_cast<int>(data_->remote->getPort()));
  292. }
  293. } // namespace asiolink