|
@@ -30,10 +30,9 @@
|
|
|
#include <dns/message.h>
|
|
|
#include <dns/messagerenderer.h>
|
|
|
|
|
|
-#include <asiolink.h>
|
|
|
-
|
|
|
-#include "coroutine.h"
|
|
|
-#include "yield.h"
|
|
|
+#include <asiolink/asiolink.h>
|
|
|
+#include <asiolink/internal/tcpdns.h>
|
|
|
+#include <asiolink/internal/udpdns.h>
|
|
|
|
|
|
using namespace asio;
|
|
|
using asio::ip::udp;
|
|
@@ -74,71 +73,6 @@ IOAddress::getFamily() const {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// Note: this implementation is optimized for the case where this object
|
|
|
-// is created from an ASIO endpoint object in a receiving code path
|
|
|
-// by avoiding to make a copy of the base endpoint. For TCP it may not be
|
|
|
-// a big deal, but when we receive UDP packets at a high rate, the copy
|
|
|
-// overhead might be significant.
|
|
|
-class TCPEndpoint : public IOEndpoint {
|
|
|
-public:
|
|
|
- TCPEndpoint(const IOAddress& address, const unsigned short port) :
|
|
|
- asio_endpoint_placeholder_(
|
|
|
- new tcp::endpoint(ip::address::from_string(address.toText()),
|
|
|
- port)),
|
|
|
- asio_endpoint_(*asio_endpoint_placeholder_)
|
|
|
- {}
|
|
|
- TCPEndpoint(const tcp::endpoint& asio_endpoint) :
|
|
|
- asio_endpoint_placeholder_(NULL), asio_endpoint_(asio_endpoint)
|
|
|
- {}
|
|
|
-
|
|
|
- ~TCPEndpoint() { delete asio_endpoint_placeholder_; }
|
|
|
- virtual IOAddress getAddress() const {
|
|
|
- return (asio_endpoint_.address());
|
|
|
- }
|
|
|
- virtual uint16_t getPort() const {
|
|
|
- return (asio_endpoint_.port());
|
|
|
- }
|
|
|
- virtual short getProtocol() const {
|
|
|
- return (asio_endpoint_.protocol().protocol());
|
|
|
- }
|
|
|
- virtual short getFamily() const {
|
|
|
- return (asio_endpoint_.protocol().family());
|
|
|
- }
|
|
|
-private:
|
|
|
- const tcp::endpoint* asio_endpoint_placeholder_;
|
|
|
- const tcp::endpoint& asio_endpoint_;
|
|
|
-};
|
|
|
-
|
|
|
-class UDPEndpoint : public IOEndpoint {
|
|
|
-public:
|
|
|
- UDPEndpoint(const IOAddress& address, const unsigned short port) :
|
|
|
- asio_endpoint_placeholder_(
|
|
|
- new udp::endpoint(ip::address::from_string(address.toText()),
|
|
|
- port)),
|
|
|
- asio_endpoint_(*asio_endpoint_placeholder_)
|
|
|
- {}
|
|
|
- UDPEndpoint(const udp::endpoint& asio_endpoint) :
|
|
|
- asio_endpoint_placeholder_(NULL), asio_endpoint_(asio_endpoint)
|
|
|
- {}
|
|
|
-
|
|
|
- ~UDPEndpoint() { delete asio_endpoint_placeholder_; }
|
|
|
- virtual IOAddress getAddress() const {
|
|
|
- return (asio_endpoint_.address());
|
|
|
- }
|
|
|
- virtual uint16_t getPort() const {
|
|
|
- return (asio_endpoint_.port());
|
|
|
- }
|
|
|
- virtual short getProtocol() const {
|
|
|
- return (asio_endpoint_.protocol().protocol());
|
|
|
- }
|
|
|
- virtual short getFamily() const {
|
|
|
- return (asio_endpoint_.protocol().family());
|
|
|
- }
|
|
|
-private:
|
|
|
- const udp::endpoint* asio_endpoint_placeholder_;
|
|
|
- const udp::endpoint& asio_endpoint_;
|
|
|
-};
|
|
|
-
|
|
|
const IOEndpoint*
|
|
|
IOEndpoint::create(const int protocol, const IOAddress& address,
|
|
|
const unsigned short port)
|
|
@@ -153,261 +87,12 @@ IOEndpoint::create(const int protocol, const IOAddress& address,
|
|
|
protocol);
|
|
|
}
|
|
|
|
|
|
-class TCPSocket : public IOSocket {
|
|
|
-private:
|
|
|
- TCPSocket(const TCPSocket& source);
|
|
|
- TCPSocket& operator=(const TCPSocket& source);
|
|
|
-public:
|
|
|
- TCPSocket(tcp::socket& socket) : socket_(socket) {}
|
|
|
- virtual int getNative() const { return (socket_.native()); }
|
|
|
- virtual int getProtocol() const { return (IPPROTO_TCP); }
|
|
|
-private:
|
|
|
- tcp::socket& socket_;
|
|
|
-};
|
|
|
-
|
|
|
-class UDPSocket : public IOSocket {
|
|
|
-private:
|
|
|
- UDPSocket(const UDPSocket& source);
|
|
|
- UDPSocket& operator=(const UDPSocket& source);
|
|
|
-public:
|
|
|
- UDPSocket(udp::socket& socket) : socket_(socket) {}
|
|
|
- virtual int getNative() const { return (socket_.native()); }
|
|
|
- virtual int getProtocol() const { return (IPPROTO_UDP); }
|
|
|
-private:
|
|
|
- udp::socket& socket_;
|
|
|
-};
|
|
|
-
|
|
|
IOMessage::IOMessage(const void* data, const size_t data_size,
|
|
|
IOSocket& io_socket, const IOEndpoint& remote_endpoint) :
|
|
|
data_(data), data_size_(data_size), io_socket_(io_socket),
|
|
|
remote_endpoint_(remote_endpoint)
|
|
|
{}
|
|
|
|
|
|
-//
|
|
|
-// Asynchronous TCP server coroutine
|
|
|
-//
|
|
|
-class TCPServer : public coroutine {
|
|
|
-public:
|
|
|
- explicit TCPServer(io_service& io_service,
|
|
|
- const ip::address& addr, const uint16_t port,
|
|
|
- CheckinProvider* checkin = NULL,
|
|
|
- DNSProvider* process = NULL) :
|
|
|
- checkin_callback_(checkin), dns_callback_(process)
|
|
|
- {
|
|
|
-
|
|
|
- tcp::endpoint endpoint(addr, port);
|
|
|
- acceptor_.reset(new tcp::acceptor(io_service));
|
|
|
- acceptor_->open(endpoint.protocol());
|
|
|
- // Set v6-only (we use a different instantiation for v4,
|
|
|
- // otherwise asio will bind to both v4 and v6
|
|
|
- if (addr.is_v6()) {
|
|
|
- acceptor_->set_option(ip::v6_only(true));
|
|
|
- }
|
|
|
- acceptor_->set_option(tcp::acceptor::reuse_address(true));
|
|
|
- acceptor_->bind(endpoint);
|
|
|
- acceptor_->listen();
|
|
|
- }
|
|
|
-
|
|
|
- void operator()(error_code ec = error_code(), size_t length = 0) {
|
|
|
- if (ec) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- reenter (this) {
|
|
|
- do {
|
|
|
- socket_.reset(new tcp::socket(acceptor_->get_io_service()));
|
|
|
- yield acceptor_->async_accept(*socket_, *this);
|
|
|
- fork TCPServer(*this)();
|
|
|
- } while (is_parent());
|
|
|
-
|
|
|
- // Perform any necessary operations prior to processing
|
|
|
- // an incoming packet (e.g., checking for queued
|
|
|
- // configuration messages).
|
|
|
- if (checkin_callback_ != NULL) {
|
|
|
- (*checkin_callback_)();
|
|
|
- }
|
|
|
-
|
|
|
- // Instantiate the data buffer that will be used by the
|
|
|
- // asynchronous read calls.
|
|
|
- // data_.reset(new boost::array<char, MAX_LENGTH>);
|
|
|
- data_ = boost::shared_ptr<char>(new char[MAX_LENGTH]);
|
|
|
-
|
|
|
- yield async_read(*socket_,
|
|
|
- asio::buffer(data_.get(), TCP_MESSAGE_LENGTHSIZE),
|
|
|
- *this);
|
|
|
-
|
|
|
- yield {
|
|
|
- InputBuffer dnsbuffer((const void *) data_.get(), length);
|
|
|
- uint16_t msglen = dnsbuffer.readUint16();
|
|
|
- async_read(*socket_, asio::buffer(data_.get(), msglen), *this);
|
|
|
- }
|
|
|
-
|
|
|
- // Stop here if we don't have a DNS callback function
|
|
|
- if (dns_callback_ == NULL) {
|
|
|
- yield return;
|
|
|
- }
|
|
|
-
|
|
|
- // Instantiate the objects that will be used by the
|
|
|
- // asynchronous write calls.
|
|
|
- dns_message_.reset(new Message(Message::PARSE));
|
|
|
- response_.reset(new OutputBuffer(0));
|
|
|
- responselen_buffer_.reset(new OutputBuffer(TCP_MESSAGE_LENGTHSIZE));
|
|
|
- renderer_.reset(new MessageRenderer(*response_));
|
|
|
- io_socket_.reset(new TCPSocket(*socket_));
|
|
|
- io_endpoint_.reset(new TCPEndpoint(socket_->remote_endpoint()));
|
|
|
- io_message_.reset(new IOMessage(data_.get(), length, *io_socket_,
|
|
|
- *io_endpoint_));
|
|
|
-
|
|
|
- // Process the DNS message
|
|
|
- if (! (*dns_callback_)(*io_message_, *dns_message_, *renderer_)) {
|
|
|
- yield return;
|
|
|
- }
|
|
|
-
|
|
|
- responselen_buffer_->writeUint16(response_->getLength());
|
|
|
- yield async_write(*socket_,
|
|
|
- asio::buffer(responselen_buffer_->getData(),
|
|
|
- responselen_buffer_->getLength()),
|
|
|
- *this);
|
|
|
- yield async_write(*socket_,
|
|
|
- asio::buffer(response_->getData(),
|
|
|
- response_->getLength()),
|
|
|
- *this);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-private:
|
|
|
- enum { MAX_LENGTH = 65535 };
|
|
|
- static const size_t TCP_MESSAGE_LENGTHSIZE = 2;
|
|
|
-
|
|
|
- // Class member variables which are dynamic, and changes to which
|
|
|
- // are expected to be accessible from both sides of a coroutine fork,
|
|
|
- // should be declared here as shared pointers and allocated in the
|
|
|
- // constructor or in the coroutine itself. (Forking a new coroutine
|
|
|
- // causes class members to be copied, not referenced, so without using
|
|
|
- // this approach, when a variable is changed by a "parent" coroutine
|
|
|
- // the change might not be visible to the "child". Using shared_ptr<>
|
|
|
- // ensures that when all coroutines using this data are deleted, the
|
|
|
- // memory will be freed.)
|
|
|
- boost::shared_ptr<tcp::acceptor> acceptor_;
|
|
|
- boost::shared_ptr<tcp::socket> socket_;
|
|
|
- boost::shared_ptr<OutputBuffer> response_;
|
|
|
- boost::shared_ptr<OutputBuffer> responselen_buffer_;
|
|
|
- boost::shared_ptr<MessageRenderer> renderer_;
|
|
|
- boost::shared_ptr<Message> dns_message_;
|
|
|
- boost::shared_ptr<IOMessage> io_message_;
|
|
|
- boost::shared_ptr<TCPSocket> io_socket_;
|
|
|
- boost::shared_ptr<TCPEndpoint> io_endpoint_;
|
|
|
- boost::shared_ptr<char> data_;
|
|
|
-
|
|
|
- // Callbacks
|
|
|
- const CheckinProvider* checkin_callback_;
|
|
|
- const DNSProvider* dns_callback_;
|
|
|
-};
|
|
|
-
|
|
|
-//
|
|
|
-// Asynchronous UDP server coroutine
|
|
|
-//
|
|
|
-class UDPServer : public coroutine {
|
|
|
-public:
|
|
|
- explicit UDPServer(io_service& io_service,
|
|
|
- const ip::address& addr, const uint16_t port,
|
|
|
- CheckinProvider* checkin = NULL,
|
|
|
- DNSProvider* process = NULL) :
|
|
|
- checkin_callback_(checkin), dns_callback_(process)
|
|
|
- {
|
|
|
- // Wwe use a different instantiation for v4,
|
|
|
- // otherwise asio will bind to both v4 and v6
|
|
|
- if (addr.is_v6()) {
|
|
|
- socket_.reset(new udp::socket(io_service, udp::v6()));
|
|
|
- socket_->set_option(socket_base::reuse_address(true));
|
|
|
- socket_->set_option(asio::ip::v6_only(true));
|
|
|
- socket_->bind(udp::endpoint(udp::v6(), port));
|
|
|
- } else {
|
|
|
- socket_.reset(new udp::socket(io_service, udp::v4()));
|
|
|
- socket_->set_option(socket_base::reuse_address(true));
|
|
|
- socket_->bind(udp::endpoint(udp::v6(), port));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void operator()(error_code ec = error_code(), size_t length = 0) {
|
|
|
- reenter (this) for (;;) {
|
|
|
- // Instantiate the data buffer that will be used by the
|
|
|
- // asynchronous read calls.
|
|
|
- // data_.reset(new boost::array<char, MAX_LENGTH>);
|
|
|
- data_ = boost::shared_ptr<char>(new char[MAX_LENGTH]);
|
|
|
- sender_.reset(new udp::endpoint());
|
|
|
-
|
|
|
- do {
|
|
|
- yield socket_->async_receive_from(asio::buffer(data_.get(),
|
|
|
- MAX_LENGTH),
|
|
|
- *sender_, *this);
|
|
|
- } while (ec || length == 0);
|
|
|
-
|
|
|
- bytes_ = length;
|
|
|
- fork UDPServer(*this)();
|
|
|
- if (is_parent()) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // Perform any necessary operations prior to processing
|
|
|
- // an incoming packet (e.g., checking for queued
|
|
|
- // configuration messages).
|
|
|
- if (checkin_callback_ != NULL) {
|
|
|
- (*checkin_callback_)();
|
|
|
- }
|
|
|
-
|
|
|
- // Stop here if we don't have a DNS callback function
|
|
|
- if (dns_callback_ == NULL) {
|
|
|
- yield return;
|
|
|
- }
|
|
|
-
|
|
|
- // Instantiate the objects that will be used by the
|
|
|
- // asynchronous write calls.
|
|
|
- dns_message_.reset(new Message(Message::PARSE));
|
|
|
- response_.reset(new OutputBuffer(0));
|
|
|
- renderer_.reset(new MessageRenderer(*response_));
|
|
|
- io_socket_.reset(new UDPSocket(*socket_));
|
|
|
- io_endpoint_.reset(new UDPEndpoint(*sender_));
|
|
|
- io_message_.reset(new IOMessage(data_.get(), bytes_,
|
|
|
- *io_socket_,
|
|
|
- *io_endpoint_));
|
|
|
-
|
|
|
- // Process the DNS message
|
|
|
- if (! (*dns_callback_)(*io_message_, *dns_message_, *renderer_))
|
|
|
- {
|
|
|
- yield return;
|
|
|
- }
|
|
|
-
|
|
|
- yield socket_->async_send_to(asio::buffer(response_->getData(),
|
|
|
- response_->getLength()),
|
|
|
- *sender_, *this);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-private:
|
|
|
- enum { MAX_LENGTH = 4096 };
|
|
|
-
|
|
|
- // As mentioned in the comments to TCPServer, class member variables
|
|
|
- // which are dynamic, and changes to which are expected to be
|
|
|
- // accessible from both sides of a coroutine fork, should be
|
|
|
- // declared here as shared pointers and allocated in the
|
|
|
- // constructor or in the coroutine.
|
|
|
- boost::shared_ptr<udp::socket> socket_;
|
|
|
- boost::shared_ptr<udp::endpoint> sender_;
|
|
|
- boost::shared_ptr<UDPEndpoint> io_endpoint_;
|
|
|
- boost::shared_ptr<OutputBuffer> response_;
|
|
|
- boost::shared_ptr<MessageRenderer> renderer_;
|
|
|
- boost::shared_ptr<Message> dns_message_;
|
|
|
- boost::shared_ptr<IOMessage> io_message_;
|
|
|
- boost::shared_ptr<UDPSocket> io_socket_;
|
|
|
- boost::shared_ptr<char> data_;
|
|
|
- size_t bytes_;
|
|
|
-
|
|
|
- // Callbacks
|
|
|
- const CheckinProvider* checkin_callback_;
|
|
|
- const DNSProvider* dns_callback_;
|
|
|
-};
|
|
|
-
|
|
|
class IOServiceImpl {
|
|
|
public:
|
|
|
IOServiceImpl(const char& port,
|