|
@@ -173,20 +173,33 @@ UDPServer::resume(const bool done) {
|
|
|
io_.post(*this);
|
|
|
}
|
|
|
|
|
|
-struct UDPQuery::Priv {
|
|
|
+// Private UDPQuery data (see internal/udpdns.h for reasons)
|
|
|
+struct UDPQuery::PrivateData {
|
|
|
+ // Socket we send query to and expect reply from there
|
|
|
udp::socket socket;
|
|
|
+ // Where was the query sent
|
|
|
udp::endpoint remote;
|
|
|
+ // What we ask the server
|
|
|
Question question;
|
|
|
+ // We will store the answer here
|
|
|
OutputBufferPtr buffer;
|
|
|
OutputBufferPtr msgbuf;
|
|
|
+ // Temporary buffer for answer
|
|
|
boost::shared_array<char> data;
|
|
|
+ // This will be called when the data arrive or timeouts
|
|
|
Callback* callback;
|
|
|
+ // Did we already stop operating (data arrived, we timed out, someone
|
|
|
+ // called stop). This can be so when we are cleaning up/there are
|
|
|
+ // still pointers to us.
|
|
|
bool stopped;
|
|
|
+ // Timer to measure timeouts.
|
|
|
deadline_timer timer;
|
|
|
+ // How many milliseconds are we willing to wait for answer?
|
|
|
int timeout;
|
|
|
|
|
|
- Priv(io_service& service, const udp::socket::protocol_type& protocol,
|
|
|
- const Question &q, OutputBufferPtr b, Callback *c) :
|
|
|
+ PrivateData(io_service& service,
|
|
|
+ const udp::socket::protocol_type& protocol, const Question &q,
|
|
|
+ OutputBufferPtr b, Callback *c) :
|
|
|
socket(service, protocol),
|
|
|
question(q),
|
|
|
buffer(b),
|
|
@@ -203,19 +216,19 @@ struct UDPQuery::Priv {
|
|
|
UDPQuery::UDPQuery(io_service& io_service,
|
|
|
const Question& q, const IOAddress& addr, uint16_t port,
|
|
|
OutputBufferPtr buffer, Callback *callback, int timeout) :
|
|
|
- priv(new Priv(io_service,
|
|
|
+ data_(new PrivateData(io_service,
|
|
|
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(), q, buffer,
|
|
|
callback))
|
|
|
{
|
|
|
- priv->remote = UDPEndpoint(addr, port).getASIOEndpoint();
|
|
|
- priv->timeout = timeout;
|
|
|
+ data_->remote = UDPEndpoint(addr, port).getASIOEndpoint();
|
|
|
+ data_->timeout = timeout;
|
|
|
}
|
|
|
|
|
|
/// The function operator is implemented with the "stackless coroutine"
|
|
|
/// pattern; see internal/coroutine.h for details.
|
|
|
void
|
|
|
UDPQuery::operator()(error_code ec, size_t length) {
|
|
|
- if (ec || priv->stopped) {
|
|
|
+ if (ec || data_->stopped) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -231,40 +244,40 @@ UDPQuery::operator()(error_code ec, size_t length) {
|
|
|
msg.setOpcode(Opcode::QUERY());
|
|
|
msg.setRcode(Rcode::NOERROR());
|
|
|
msg.setHeaderFlag(MessageFlag::RD());
|
|
|
- msg.addQuestion(priv->question);
|
|
|
- MessageRenderer renderer(*priv->msgbuf);
|
|
|
+ msg.addQuestion(data_->question);
|
|
|
+ MessageRenderer renderer(*data_->msgbuf);
|
|
|
msg.toWire(renderer);
|
|
|
}
|
|
|
|
|
|
// If we timeout, we stop, which will shutdown everything and
|
|
|
// cancel all other attempts to run inside the coroutine
|
|
|
- if (priv->timeout != -1) {
|
|
|
- priv->timer.expires_from_now(boost::posix_time::milliseconds(
|
|
|
- priv->timeout));
|
|
|
- priv->timer.async_wait(boost::bind(&UDPQuery::stop, *this,
|
|
|
+ if (data_->timeout != -1) {
|
|
|
+ data_->timer.expires_from_now(boost::posix_time::milliseconds(
|
|
|
+ data_->timeout));
|
|
|
+ data_->timer.async_wait(boost::bind(&UDPQuery::stop, *this,
|
|
|
TIME_OUT));
|
|
|
}
|
|
|
|
|
|
// Begin an asynchronous send, and then yield. When the
|
|
|
// send completes, we will resume immediately after this point.
|
|
|
- CORO_YIELD priv->socket.async_send_to(buffer(priv->msgbuf->getData(),
|
|
|
- priv->msgbuf->getLength()), priv->remote, *this);
|
|
|
+ CORO_YIELD data_->socket.async_send_to(buffer(data_->msgbuf->getData(),
|
|
|
+ data_->msgbuf->getLength()), data_->remote, *this);
|
|
|
|
|
|
/// Allocate space for the response. (XXX: This should be
|
|
|
/// optimized by maintaining a free list of pre-allocated blocks)
|
|
|
- priv->data.reset(new char[MAX_LENGTH]);
|
|
|
+ data_->data.reset(new char[MAX_LENGTH]);
|
|
|
|
|
|
/// Begin an asynchronous receive, and yield. When the receive
|
|
|
/// completes, we will resume immediately after this point.
|
|
|
- CORO_YIELD priv->socket.async_receive_from(buffer(priv->data.get(),
|
|
|
- MAX_LENGTH), priv->remote, *this);
|
|
|
+ CORO_YIELD data_->socket.async_receive_from(buffer(data_->data.get(),
|
|
|
+ MAX_LENGTH), data_->remote, *this);
|
|
|
|
|
|
/// Copy the answer into the response buffer. (XXX: If the
|
|
|
/// OutputBuffer object were made to meet the requirements of
|
|
|
/// a MutableBufferSequence, then it could be written to directly
|
|
|
/// by async_recieve_from() and this additional copy step would
|
|
|
/// be unnecessary.)
|
|
|
- priv->buffer->writeData(priv->data.get(), length);
|
|
|
+ data_->buffer->writeData(data_->data.get(), length);
|
|
|
|
|
|
/// We are done
|
|
|
stop(SUCCESS);
|
|
@@ -273,13 +286,13 @@ UDPQuery::operator()(error_code ec, size_t length) {
|
|
|
|
|
|
void
|
|
|
UDPQuery::stop(Result result) {
|
|
|
- if (!priv->stopped) {
|
|
|
- priv->stopped = true;
|
|
|
- priv->socket.cancel();
|
|
|
- priv->socket.close();
|
|
|
- priv->timer.cancel();
|
|
|
- if (priv->callback) {
|
|
|
- (*priv->callback)(result);
|
|
|
+ if (!data_->stopped) {
|
|
|
+ data_->stopped = true;
|
|
|
+ data_->socket.cancel();
|
|
|
+ data_->socket.close();
|
|
|
+ data_->timer.cancel();
|
|
|
+ if (data_->callback) {
|
|
|
+ (*data_->callback)(result);
|
|
|
}
|
|
|
}
|
|
|
}
|