io_fetch.cc 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. // Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")
  2. //
  3. // Permission to use, copy, modify, and/or distribute this software for any
  4. // purpose with or without fee is hereby granted, provided that the above
  5. // copyright notice and this permission notice appear in all copies.
  6. //
  7. // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
  8. // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
  9. // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
  10. // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
  11. // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
  12. // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
  13. // PERFORMANCE OF THIS SOFTWARE.
  14. #include <config.h>
  15. #include <netinet/in.h>
  16. #include <stdint.h>
  17. #include <sys/socket.h>
  18. #include <unistd.h> // for some IPC/network system calls
  19. #include <boost/bind.hpp>
  20. #include <boost/scoped_ptr.hpp>
  21. #include <boost/date_time/posix_time/posix_time_types.hpp>
  22. #include <asio.hpp>
  23. #include <asio/deadline_timer.hpp>
  24. #include <asiolink/io_address.h>
  25. #include <asiolink/io_asio_socket.h>
  26. #include <asiolink/io_endpoint.h>
  27. #include <asiolink/io_service.h>
  28. #include <asiolink/tcp_endpoint.h>
  29. #include <asiolink/tcp_socket.h>
  30. #include <asiolink/udp_endpoint.h>
  31. #include <asiolink/udp_socket.h>
  32. #include <dns/messagerenderer.h>
  33. #include <dns/opcode.h>
  34. #include <dns/rcode.h>
  35. #include <log/logger.h>
  36. #include <log/macros.h>
  37. #include <asiodns/asiodns_messages.h>
  38. #include <asiodns/io_fetch.h>
  39. #include <util/buffer.h>
  40. #include <util/random/qid_gen.h>
  41. using namespace asio;
  42. using namespace isc::asiolink;
  43. using namespace isc::dns;
  44. using namespace isc::util;
  45. using namespace isc::util::random;
  46. using namespace isc::log;
  47. using namespace std;
  48. namespace isc {
  49. namespace asiodns {
  50. /// Use the ASIO logger
  51. namespace {
  52. isc::log::Logger logger("asiolink");
  53. // Log debug verbosity
  54. enum {
  55. DBG_IMPORTANT = 1,
  56. DBG_COMMON = 20,
  57. DBG_ALL = 50
  58. };
  59. }
  60. /// \brief IOFetch Data
  61. ///
  62. /// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
  63. /// object. This is because the IOFetch object will be copied often (it is used
  64. /// as a coroutine and passed as callback to many async_*() functions) and we
  65. /// want keep the same data). Organising the data in this way keeps copying to
  66. /// a minimum.
  67. struct IOFetchData {
  68. // The first two members are shared pointers to a base class because what is
  69. // actually instantiated depends on whether the fetch is over UDP or TCP,
  70. // which is not known until construction of the IOFetch. Use of a shared
  71. // pointer here is merely to ensure deletion when the data object is deleted.
  72. boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
  73. ///< Socket to use for I/O
  74. boost::scoped_ptr<IOEndpoint> remote_snd;///< Where the fetch is sent
  75. boost::scoped_ptr<IOEndpoint> remote_rcv;///< Where the response came from
  76. OutputBufferPtr msgbuf; ///< Wire buffer for question
  77. OutputBufferPtr received; ///< Received data put here
  78. IOFetch::Callback* callback; ///< Called on I/O Completion
  79. asio::deadline_timer timer; ///< Timer to measure timeouts
  80. IOFetch::Protocol protocol; ///< Protocol being used
  81. size_t cumulative; ///< Cumulative received amount
  82. size_t expected; ///< Expected amount of data
  83. size_t offset; ///< Offset to receive data
  84. bool stopped; ///< Have we stopped running?
  85. int timeout; ///< Timeout in ms
  86. bool packet; ///< true if packet was supplied
  87. // In case we need to log an error, the origin of the last asynchronous
  88. // I/O is recorded. To save time and simplify the code, this is recorded
  89. // as the ID of the error message that would be generated if the I/O failed.
  90. // This means that we must make sure that all possible "origins" take the
  91. // same arguments in their message in the same order.
  92. isc::log::MessageID origin; ///< Origin of last asynchronous I/O
  93. uint8_t staging[IOFetch::STAGING_LENGTH];
  94. ///< Temporary array for received data
  95. isc::dns::qid_t qid; ///< The QID set in the query
  96. /// \brief Constructor
  97. ///
  98. /// Just fills in the data members of the IOFetchData structure
  99. ///
  100. /// \param proto Either IOFetch::TCP or IOFetch::UDP.
  101. /// \param service I/O Service object to handle the asynchronous
  102. /// operations.
  103. /// \param address IP address of upstream server
  104. /// \param port Port to use for the query
  105. /// \param buff Output buffer into which the response (in wire format)
  106. /// is written (if a response is received).
  107. /// \param cb Callback object containing the callback to be called
  108. /// when we terminate. The caller is responsible for managing this
  109. /// object and deleting it if necessary.
  110. /// \param wait Timeout for the fetch (in ms).
  111. ///
  112. /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
  113. IOFetchData(IOFetch::Protocol proto, IOService& service,
  114. const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
  115. IOFetch::Callback* cb, int wait)
  116. :
  117. socket((proto == IOFetch::UDP) ?
  118. static_cast<IOAsioSocket<IOFetch>*>(
  119. new UDPSocket<IOFetch>(service)) :
  120. static_cast<IOAsioSocket<IOFetch>*>(
  121. new TCPSocket<IOFetch>(service))
  122. ),
  123. remote_snd((proto == IOFetch::UDP) ?
  124. static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
  125. static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
  126. ),
  127. remote_rcv((proto == IOFetch::UDP) ?
  128. static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
  129. static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
  130. ),
  131. msgbuf(new OutputBuffer(512)),
  132. received(buff),
  133. callback(cb),
  134. timer(service.get_io_service()),
  135. protocol(proto),
  136. cumulative(0),
  137. expected(0),
  138. offset(0),
  139. stopped(false),
  140. timeout(wait),
  141. packet(false),
  142. origin(ASIODNS_UNKNOWN_ORIGIN),
  143. staging(),
  144. qid(QidGenerator::getInstance().generateQid())
  145. {}
  146. // Checks if the response we received was ok;
  147. // - data contains the buffer we read, as well as the address
  148. // we sent to and the address we received from.
  149. // length is provided by the operator() in IOFetch.
  150. // Addresses must match, number of octets read must be at least
  151. // 2, and the first two octets must match the qid of the message
  152. // we sent.
  153. bool responseOK() {
  154. return (*remote_snd == *remote_rcv && cumulative >= 2 &&
  155. readUint16(received->getData()) == qid);
  156. }
  157. };
  158. /// IOFetch Constructor - just initialize the private data
  159. IOFetch::IOFetch(Protocol protocol, IOService& service,
  160. const isc::dns::Question& question, const IOAddress& address, uint16_t port,
  161. OutputBufferPtr& buff, Callback* cb, int wait)
  162. {
  163. MessagePtr query_msg(new Message(Message::RENDER));
  164. initIOFetch(query_msg, protocol, service, question, address, port, buff,
  165. cb, wait);
  166. }
  167. IOFetch::IOFetch(Protocol protocol, IOService& service,
  168. OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port,
  169. OutputBufferPtr& buff, Callback* cb, int wait)
  170. :
  171. data_(new IOFetchData(protocol, service,
  172. address, port, buff, cb, wait))
  173. {
  174. data_->msgbuf = outpkt;
  175. data_->packet = true;
  176. }
  177. IOFetch::IOFetch(Protocol protocol, IOService& service,
  178. ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
  179. OutputBufferPtr& buff, Callback* cb, int wait)
  180. {
  181. MessagePtr msg(new Message(Message::RENDER));
  182. msg->setHeaderFlag(Message::HEADERFLAG_RD,
  183. query_message->getHeaderFlag(Message::HEADERFLAG_RD));
  184. msg->setHeaderFlag(Message::HEADERFLAG_CD,
  185. query_message->getHeaderFlag(Message::HEADERFLAG_CD));
  186. initIOFetch(msg, protocol, service,
  187. **(query_message->beginQuestion()),
  188. address, port, buff, cb, wait);
  189. }
  190. void
  191. IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol, IOService& service,
  192. const isc::dns::Question& question,
  193. const IOAddress& address, uint16_t port,
  194. OutputBufferPtr& buff, Callback* cb, int wait)
  195. {
  196. data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(
  197. protocol, service, address, port, buff, cb, wait));
  198. query_msg->setQid(data_->qid);
  199. query_msg->setOpcode(Opcode::QUERY());
  200. query_msg->setRcode(Rcode::NOERROR());
  201. query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
  202. query_msg->addQuestion(question);
  203. EDNSPtr edns_query(new EDNS());
  204. edns_query->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
  205. query_msg->setEDNS(edns_query);
  206. MessageRenderer renderer(*data_->msgbuf);
  207. query_msg->toWire(renderer);
  208. }
  209. // Return protocol in use.
  210. IOFetch::Protocol
  211. IOFetch::getProtocol() const {
  212. return (data_->protocol);
  213. }
  214. /// The function operator is implemented with the "stackless coroutine"
  215. /// pattern; see internal/coroutine.h for details.
  216. void
  217. IOFetch::operator()(asio::error_code ec, size_t length) {
  218. if (data_->stopped) {
  219. return;
  220. } else if (ec) {
  221. logIOFailure(ec);
  222. return;
  223. }
  224. CORO_REENTER (this) {
  225. /// Generate the upstream query and render it to wire format
  226. /// This is done in a different scope to allow inline variable
  227. /// declarations.
  228. {
  229. if (data_->packet) {
  230. // A packet was given, overwrite the QID (which is in the
  231. // first two bytes of the packet).
  232. data_->msgbuf->writeUint16At(data_->qid, 0);
  233. }
  234. }
  235. // If we timeout, we stop, which will can cancel outstanding I/Os and
  236. // shutdown everything.
  237. if (data_->timeout != -1) {
  238. data_->timer.expires_from_now(boost::posix_time::milliseconds(
  239. data_->timeout));
  240. data_->timer.async_wait(boost::bind(&IOFetch::stop, *this,
  241. TIME_OUT));
  242. }
  243. // Open a connection to the target system. For speed, if the operation
  244. // is synchronous (i.e. UDP operation) we bypass the yield.
  245. data_->origin = ASIODNS_OPEN_SOCKET;
  246. if (data_->socket->isOpenSynchronous()) {
  247. data_->socket->open(data_->remote_snd.get(), *this);
  248. } else {
  249. CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
  250. }
  251. do {
  252. // Begin an asynchronous send, and then yield. When the send completes,
  253. // we will resume immediately after this point.
  254. data_->origin = ASIODNS_SEND_DATA;
  255. CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
  256. data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
  257. // Now receive the response. Since TCP may not receive the entire
  258. // message in one operation, we need to loop until we have received
  259. // it. (This can't be done within the asyncReceive() method because
  260. // each I/O operation will be done asynchronously and between each one
  261. // we need to yield ... and we *really* don't want to set up another
  262. // coroutine within that method.) So after each receive (and yield),
  263. // we check if the operation is complete and if not, loop to read again.
  264. //
  265. // Another concession to TCP is that the amount of is contained in the
  266. // first two bytes. This leads to two problems:
  267. //
  268. // a) We don't want those bytes in the return buffer.
  269. // b) They may not both arrive in the first I/O.
  270. //
  271. // So... we need to loop until we have at least two bytes, then store
  272. // the expected amount of data. Then we need to loop until we have
  273. // received all the data before copying it back to the user's buffer.
  274. // And we want to minimise the amount of copying...
  275. data_->origin = ASIODNS_READ_DATA;
  276. data_->cumulative = 0; // No data yet received
  277. data_->offset = 0; // First data into start of buffer
  278. data_->received->clear(); // Clear the receive buffer
  279. do {
  280. CORO_YIELD data_->socket->asyncReceive(data_->staging,
  281. static_cast<size_t>(STAGING_LENGTH),
  282. data_->offset,
  283. data_->remote_rcv.get(), *this);
  284. } while (!data_->socket->processReceivedData(data_->staging, length,
  285. data_->cumulative, data_->offset,
  286. data_->expected, data_->received));
  287. } while (!data_->responseOK());
  288. // Finished with this socket, so close it. This will not generate an
  289. // I/O error, but reset the origin to unknown in case we change this.
  290. data_->origin = ASIODNS_UNKNOWN_ORIGIN;
  291. data_->socket->close();
  292. /// We are done
  293. stop(SUCCESS);
  294. }
  295. }
  296. // Function that stops the coroutine sequence. It is called either when the
  297. // query finishes or when the timer times out. Either way, it sets the
  298. // "stopped_" flag and cancels anything that is in progress.
  299. //
  300. // As the function may be entered multiple times as things wind down, it checks
  301. // if the stopped_ flag is already set. If it is, the call is a no-op.
  302. void
  303. IOFetch::stop(Result result) {
  304. if (!data_->stopped) {
  305. // Mark the fetch as stopped to prevent other completion callbacks
  306. // (invoked because of the calls to cancel()) from executing the
  307. // cancel calls again.
  308. //
  309. // In a single threaded environment, the callbacks won't be invoked
  310. // until this one completes. In a multi-threaded environment, they may
  311. // well be, in which case the testing (and setting) of the stopped_
  312. // variable should be done inside a mutex (and the stopped_ variable
  313. // declared as "volatile").
  314. //
  315. // The numeric arguments indicate the debug level, with the lower
  316. // numbers indicating the most important information. The relative
  317. // values are somewhat arbitrary.
  318. //
  319. // TODO: Update testing of stopped_ if threads are used.
  320. data_->stopped = true;
  321. switch (result) {
  322. case TIME_OUT:
  323. LOG_DEBUG(logger, DBG_COMMON, ASIODNS_READ_TIMEOUT).
  324. arg(data_->remote_snd->getAddress().toText()).
  325. arg(data_->remote_snd->getPort());
  326. break;
  327. case SUCCESS:
  328. LOG_DEBUG(logger, DBG_ALL, ASIODNS_FETCH_COMPLETED).
  329. arg(data_->remote_rcv->getAddress().toText()).
  330. arg(data_->remote_rcv->getPort());
  331. break;
  332. case STOPPED:
  333. // Fetch has been stopped for some other reason. This is
  334. // allowed but as it is unusual it is logged, but with a lower
  335. // debug level than a timeout (which is totally normal).
  336. LOG_DEBUG(logger, DBG_IMPORTANT, ASIODNS_FETCH_STOPPED).
  337. arg(data_->remote_snd->getAddress().toText()).
  338. arg(data_->remote_snd->getPort());
  339. break;
  340. default:
  341. LOG_ERROR(logger, ASIODNS_UNKNOWN_RESULT).
  342. arg(data_->remote_snd->getAddress().toText()).
  343. arg(data_->remote_snd->getPort());
  344. }
  345. // Stop requested, cancel and I/O's on the socket and shut it down,
  346. // and cancel the timer.
  347. data_->socket->cancel();
  348. data_->socket->close();
  349. data_->timer.cancel();
  350. // Execute the I/O completion callback (if present).
  351. if (data_->callback) {
  352. (*(data_->callback))(result);
  353. }
  354. }
  355. }
  356. // Log an error - called on I/O failure
  357. void IOFetch::logIOFailure(asio::error_code ec) {
  358. // Should only get here with a known error code.
  359. assert((data_->origin == ASIODNS_OPEN_SOCKET) ||
  360. (data_->origin == ASIODNS_SEND_DATA) ||
  361. (data_->origin == ASIODNS_READ_DATA) ||
  362. (data_->origin == ASIODNS_UNKNOWN_ORIGIN));
  363. static const char* PROTOCOL[2] = {"TCP", "UDP"};
  364. LOG_ERROR(logger, data_->origin).arg(ec.value()).
  365. arg((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
  366. PROTOCOL[0] : PROTOCOL[1]).
  367. arg(data_->remote_snd->getAddress().toText()).
  368. arg(data_->remote_snd->getPort());
  369. }
  370. } // namespace asiodns
  371. } // namespace isc {