|
@@ -43,6 +43,9 @@
|
|
|
#include <asiolink/tcp_socket.h>
|
|
|
#include <asiolink/udp_endpoint.h>
|
|
|
#include <asiolink/udp_socket.h>
|
|
|
+#include <asiolink/qid_gen.h>
|
|
|
+
|
|
|
+#include <stdint.h>
|
|
|
|
|
|
using namespace asio;
|
|
|
using namespace isc::dns;
|
|
@@ -69,19 +72,20 @@ struct IOFetchData {
|
|
|
// which is not known until construction of the IOFetch. Use of a shared
|
|
|
// pointer here is merely to ensure deletion when the data object is deleted.
|
|
|
boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
|
|
|
- ///< Socket to use for I/O
|
|
|
- boost::scoped_ptr<IOEndpoint> remote; ///< Where the fetch was sent
|
|
|
- isc::dns::Question question; ///< Question to be asked
|
|
|
- isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question
|
|
|
- isc::dns::OutputBufferPtr received; ///< Received data put here
|
|
|
- IOFetch::Callback* callback; ///< Called on I/O Completion
|
|
|
- asio::deadline_timer timer; ///< Timer to measure timeouts
|
|
|
- IOFetch::Protocol protocol; ///< Protocol being used
|
|
|
- size_t cumulative; ///< Cumulative received amount
|
|
|
- size_t expected; ///< Expected amount of data
|
|
|
- size_t offset; ///< Offset to receive data
|
|
|
- bool stopped; ///< Have we stopped running?
|
|
|
- int timeout; ///< Timeout in ms
|
|
|
+ ///< Socket to use for I/O
|
|
|
+ boost::scoped_ptr<IOEndpoint> remote_snd;///< Where the fetch is sent
|
|
|
+ boost::scoped_ptr<IOEndpoint> remote_rcv;///< Where the response came from
|
|
|
+ isc::dns::Question question; ///< Question to be asked
|
|
|
+ isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question
|
|
|
+ isc::dns::OutputBufferPtr received; ///< Received data put here
|
|
|
+ IOFetch::Callback* callback; ///< Called on I/O Completion
|
|
|
+ asio::deadline_timer timer; ///< Timer to measure timeouts
|
|
|
+ IOFetch::Protocol protocol; ///< Protocol being used
|
|
|
+ size_t cumulative; ///< Cumulative received amount
|
|
|
+ size_t expected; ///< Expected amount of data
|
|
|
+ size_t offset; ///< Offset to receive data
|
|
|
+ bool stopped; ///< Have we stopped running?
|
|
|
+ int timeout; ///< Timeout in ms
|
|
|
|
|
|
// In case we need to log an error, the origin of the last asynchronous
|
|
|
// I/O is recorded. To save time and simplify the code, this is recorded
|
|
@@ -91,6 +95,7 @@ struct IOFetchData {
|
|
|
isc::log::MessageID origin; ///< Origin of last asynchronous I/O
|
|
|
uint8_t staging[IOFetch::STAGING_LENGTH];
|
|
|
///< Temporary array for received data
|
|
|
+ isc::dns::qid_t qid; ///< The QID set in the query
|
|
|
|
|
|
/// \brief Constructor
|
|
|
///
|
|
@@ -121,7 +126,11 @@ struct IOFetchData {
|
|
|
static_cast<IOAsioSocket<IOFetch>*>(
|
|
|
new TCPSocket<IOFetch>(service))
|
|
|
),
|
|
|
- remote((proto == IOFetch::UDP) ?
|
|
|
+ remote_snd((proto == IOFetch::UDP) ?
|
|
|
+ static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
|
|
|
+ static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
|
|
|
+ ),
|
|
|
+ remote_rcv((proto == IOFetch::UDP) ?
|
|
|
static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
|
|
|
static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
|
|
|
),
|
|
@@ -138,8 +147,21 @@ struct IOFetchData {
|
|
|
stopped(false),
|
|
|
timeout(wait),
|
|
|
origin(ASIO_UNKORIGIN),
|
|
|
- staging()
|
|
|
+ staging(),
|
|
|
+ qid(QidGenerator::getInstance().generateQid())
|
|
|
{}
|
|
|
+
|
|
|
+ // Checks if the response we received was ok;
|
|
|
+ // - data contains the buffer we read, as well as the address
|
|
|
+ // we sent to and the address we received from.
|
|
|
+ // length is provided by the operator() in IOFetch.
|
|
|
+ // Addresses must match, number of octets read must be at least
|
|
|
+ // 2, and the first two octets must match the qid of the message
|
|
|
+ // we sent.
|
|
|
+ bool responseOK() {
|
|
|
+ return (*remote_snd == *remote_rcv && cumulative >= 2 &&
|
|
|
+ readUint16(received->getData()) == qid);
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
/// IOFetch Constructor - just initialize the private data
|
|
@@ -180,7 +202,7 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
|
|
|
/// declarations.
|
|
|
{
|
|
|
Message msg(Message::RENDER);
|
|
|
- msg.setQid(QidGenerator::getInstance().generateQid());
|
|
|
+ msg.setQid(data_->qid);
|
|
|
msg.setOpcode(Opcode::QUERY());
|
|
|
msg.setRcode(Rcode::NOERROR());
|
|
|
msg.setHeaderFlag(Message::HEADERFLAG_RD);
|
|
@@ -202,47 +224,49 @@ IOFetch::operator()(asio::error_code ec, size_t length) {
|
|
|
// is synchronous (i.e. UDP operation) we bypass the yield.
|
|
|
data_->origin = ASIO_OPENSOCK;
|
|
|
if (data_->socket->isOpenSynchronous()) {
|
|
|
- data_->socket->open(data_->remote.get(), *this);
|
|
|
+ data_->socket->open(data_->remote_snd.get(), *this);
|
|
|
} else {
|
|
|
- CORO_YIELD data_->socket->open(data_->remote.get(), *this);
|
|
|
+ CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
|
|
|
}
|
|
|
|
|
|
- // Begin an asynchronous send, and then yield. When the send completes,
|
|
|
- // we will resume immediately after this point.
|
|
|
- data_->origin = ASIO_SENDSOCK;
|
|
|
- CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
|
|
|
- data_->msgbuf->getLength(), data_->remote.get(), *this);
|
|
|
-
|
|
|
- // Now receive the response. Since TCP may not receive the entire
|
|
|
- // message in one operation, we need to loop until we have received
|
|
|
- // it. (This can't be done within the asyncReceive() method because
|
|
|
- // each I/O operation will be done asynchronously and between each one
|
|
|
- // we need to yield ... and we *really* don't want to set up another
|
|
|
- // coroutine within that method.) So after each receive (and yield),
|
|
|
- // we check if the operation is complete and if not, loop to read again.
|
|
|
- //
|
|
|
- // Another concession to TCP is that the amount of is contained in the
|
|
|
- // first two bytes. This leads to two problems:
|
|
|
- //
|
|
|
- // a) We don't want those bytes in the return buffer.
|
|
|
- // b) They may not both arrive in the first I/O.
|
|
|
- //
|
|
|
- // So... we need to loop until we have at least two bytes, then store
|
|
|
- // the expected amount of data. Then we need to loop until we have
|
|
|
- // received all the data before copying it back to the user's buffer.
|
|
|
- // And we want to minimise the amount of copying...
|
|
|
-
|
|
|
- data_->origin = ASIO_RECVSOCK;
|
|
|
- data_->cumulative = 0; // No data yet received
|
|
|
- data_->offset = 0; // First data into start of buffer
|
|
|
do {
|
|
|
- CORO_YIELD data_->socket->asyncReceive(data_->staging,
|
|
|
- static_cast<size_t>(STAGING_LENGTH),
|
|
|
- data_->offset,
|
|
|
- data_->remote.get(), *this);
|
|
|
- } while (!data_->socket->processReceivedData(data_->staging, length,
|
|
|
- data_->cumulative, data_->offset,
|
|
|
- data_->expected, data_->received));
|
|
|
+ // Begin an asynchronous send, and then yield. When the send completes,
|
|
|
+ // we will resume immediately after this point.
|
|
|
+ data_->origin = ASIO_SENDSOCK;
|
|
|
+ CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
|
|
|
+ data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
|
|
|
+
|
|
|
+ // Now receive the response. Since TCP may not receive the entire
|
|
|
+ // message in one operation, we need to loop until we have received
|
|
|
+ // it. (This can't be done within the asyncReceive() method because
|
|
|
+ // each I/O operation will be done asynchronously and between each one
|
|
|
+ // we need to yield ... and we *really* don't want to set up another
|
|
|
+ // coroutine within that method.) So after each receive (and yield),
|
|
|
+ // we check if the operation is complete and if not, loop to read again.
|
|
|
+ //
|
|
|
+ // Another concession to TCP is that the amount of is contained in the
|
|
|
+ // first two bytes. This leads to two problems:
|
|
|
+ //
|
|
|
+ // a) We don't want those bytes in the return buffer.
|
|
|
+ // b) They may not both arrive in the first I/O.
|
|
|
+ //
|
|
|
+ // So... we need to loop until we have at least two bytes, then store
|
|
|
+ // the expected amount of data. Then we need to loop until we have
|
|
|
+ // received all the data before copying it back to the user's buffer.
|
|
|
+ // And we want to minimise the amount of copying...
|
|
|
+
|
|
|
+ data_->origin = ASIO_RECVSOCK;
|
|
|
+ data_->cumulative = 0; // No data yet received
|
|
|
+ data_->offset = 0; // First data into start of buffer
|
|
|
+ do {
|
|
|
+ CORO_YIELD data_->socket->asyncReceive(data_->staging,
|
|
|
+ static_cast<size_t>(STAGING_LENGTH),
|
|
|
+ data_->offset,
|
|
|
+ data_->remote_rcv.get(), *this);
|
|
|
+ } while (!data_->socket->processReceivedData(data_->staging, length,
|
|
|
+ data_->cumulative, data_->offset,
|
|
|
+ data_->expected, data_->received));
|
|
|
+ } while (!data_->responseOK());
|
|
|
|
|
|
// Finished with this socket, so close it. This will not generate an
|
|
|
// I/O error, but reset the origin to unknown in case we change this.
|
|
@@ -290,16 +314,16 @@ IOFetch::stop(Result result) {
|
|
|
case TIME_OUT:
|
|
|
if (logger.isDebugEnabled(1)) {
|
|
|
logger.debug(20, ASIO_RECVTMO,
|
|
|
- data_->remote->getAddress().toText().c_str(),
|
|
|
- static_cast<int>(data_->remote->getPort()));
|
|
|
+ data_->remote_snd->getAddress().toText().c_str(),
|
|
|
+ static_cast<int>(data_->remote_snd->getPort()));
|
|
|
}
|
|
|
break;
|
|
|
|
|
|
case SUCCESS:
|
|
|
if (logger.isDebugEnabled(50)) {
|
|
|
logger.debug(30, ASIO_FETCHCOMP,
|
|
|
- data_->remote->getAddress().toText().c_str(),
|
|
|
- static_cast<int>(data_->remote->getPort()));
|
|
|
+ data_->remote_rcv->getAddress().toText().c_str(),
|
|
|
+ static_cast<int>(data_->remote_rcv->getPort()));
|
|
|
}
|
|
|
break;
|
|
|
|
|
@@ -308,14 +332,14 @@ IOFetch::stop(Result result) {
|
|
|
// allowed but as it is unusual it is logged, but with a lower
|
|
|
// debug level than a timeout (which is totally normal).
|
|
|
logger.debug(1, ASIO_FETCHSTOP,
|
|
|
- data_->remote->getAddress().toText().c_str(),
|
|
|
- static_cast<int>(data_->remote->getPort()));
|
|
|
+ data_->remote_snd->getAddress().toText().c_str(),
|
|
|
+ static_cast<int>(data_->remote_snd->getPort()));
|
|
|
break;
|
|
|
|
|
|
default:
|
|
|
logger.error(ASIO_UNKRESULT, static_cast<int>(result),
|
|
|
- data_->remote->getAddress().toText().c_str(),
|
|
|
- static_cast<int>(data_->remote->getPort()));
|
|
|
+ data_->remote_snd->getAddress().toText().c_str(),
|
|
|
+ static_cast<int>(data_->remote_snd->getPort()));
|
|
|
}
|
|
|
|
|
|
// Stop requested, cancel and I/O's on the socket and shut it down,
|
|
@@ -345,10 +369,10 @@ void IOFetch::logIOFailure(asio::error_code ec) {
|
|
|
static const char* PROTOCOL[2] = {"TCP", "UDP"};
|
|
|
logger.error(data_->origin,
|
|
|
ec.value(),
|
|
|
- ((data_->remote->getProtocol() == IPPROTO_TCP) ?
|
|
|
+ ((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
|
|
|
PROTOCOL[0] : PROTOCOL[1]),
|
|
|
- data_->remote->getAddress().toText().c_str(),
|
|
|
- static_cast<int>(data_->remote->getPort()));
|
|
|
+ data_->remote_snd->getAddress().toText().c_str(),
|
|
|
+ static_cast<int>(data_->remote_snd->getPort()));
|
|
|
}
|
|
|
|
|
|
} // namespace asiolink
|