asio_link.cc 20 KB


  1. // Copyright (C) 2010 Internet Systems Consortium, Inc. ("ISC")
  2. //
  3. // Permission to use, copy, modify, and/or distribute this software for any
  4. // purpose with or without fee is hereby granted, provided that the above
  5. // copyright notice and this permission notice appear in all copies.
  6. //
  7. // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
  8. // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
  9. // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
  10. // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
  11. // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
  12. // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
  13. // PERFORMANCE OF THIS SOFTWARE.
  14. // $Id$
  15. #include <config.h>
  16. #include <unistd.h> // for some IPC/network system calls
  17. #include <sys/socket.h>
  18. #include <netinet/in.h>
  19. #include <asio.hpp>
  20. #include <boost/lexical_cast.hpp>
  21. #include <boost/bind.hpp>
  22. #include <boost/shared_ptr.hpp>
  23. #include <dns/buffer.h>
  24. #include <dns/message.h>
  25. #include <dns/messagerenderer.h>
  26. #include <asio_link.h>
  27. #include <auth/auth_srv.h>
  28. #include "common.h"
  29. using namespace asio;
  30. using asio::ip::udp;
  31. using asio::ip::tcp;
  32. using namespace std;
  33. using namespace isc::dns;
  34. namespace asio_link {
  35. IOAddress::IOAddress(const string& address_str)
  36. // XXX: we cannot simply construct the address in the initialization list
  37. // because we'd like to throw our own exception on failure.
  38. {
  39. error_code err;
  40. asio_address_ = ip::address::from_string(address_str, err);
  41. if (err) {
  42. isc_throw(IOError, "Failed to convert string to address '"
  43. << address_str << "': " << err.message());
  44. }
  45. }
  46. IOAddress::IOAddress(const ip::address& asio_address) :
  47. asio_address_(asio_address)
  48. {}
  49. string
  50. IOAddress::toText() const {
  51. return (asio_address_.to_string());
  52. }
  53. class TCPEndpoint : public IOEndpoint {
  54. public:
  55. TCPEndpoint(const IOAddress& address, const unsigned short port) :
  56. asio_endpoint_placeholder_(
  57. new tcp::endpoint(ip::address::from_string(address.toText()),
  58. port)),
  59. asio_endpoint_(*asio_endpoint_placeholder_)
  60. {}
  61. TCPEndpoint(const tcp::endpoint& asio_endpoint) :
  62. asio_endpoint_placeholder_(NULL), asio_endpoint_(asio_endpoint)
  63. {}
  64. ~TCPEndpoint() { delete asio_endpoint_placeholder_; }
  65. virtual IOAddress getAddress() const {
  66. return (asio_endpoint_.address());
  67. }
  68. private:
  69. const tcp::endpoint* asio_endpoint_placeholder_;
  70. const tcp::endpoint& asio_endpoint_;
  71. };
  72. class UDPEndpoint : public IOEndpoint {
  73. public:
  74. UDPEndpoint(const IOAddress& address, const unsigned short port) :
  75. asio_endpoint_placeholder_(
  76. new udp::endpoint(ip::address::from_string(address.toText()),
  77. port)),
  78. asio_endpoint_(*asio_endpoint_placeholder_)
  79. {}
  80. UDPEndpoint(const udp::endpoint& asio_endpoint) :
  81. asio_endpoint_placeholder_(NULL), asio_endpoint_(asio_endpoint)
  82. {}
  83. ~UDPEndpoint() { delete asio_endpoint_placeholder_; }
  84. virtual IOAddress getAddress() const {
  85. return (asio_endpoint_.address());
  86. }
  87. private:
  88. const udp::endpoint* asio_endpoint_placeholder_;
  89. const udp::endpoint& asio_endpoint_;
  90. };
  91. const IOEndpoint*
  92. IOEndpoint::create(const int protocol, const IOAddress& address,
  93. const unsigned short port)
  94. {
  95. if (protocol == IPPROTO_UDP) {
  96. return (new UDPEndpoint(address, port));
  97. } else if (protocol == IPPROTO_TCP) {
  98. return (new TCPEndpoint(address, port));
  99. }
  100. isc_throw(IOError,
  101. "IOEndpoint creation attempt for unsupported protocol: " <<
  102. protocol);
  103. }
  104. class TCPSocket : public IOSocket {
  105. private:
  106. TCPSocket(const TCPSocket& source);
  107. TCPSocket& operator=(const TCPSocket& source);
  108. public:
  109. TCPSocket(tcp::socket& socket) : socket_(socket) {}
  110. virtual int getNative() const { return (socket_.native()); }
  111. virtual int getProtocol() const { return (IPPROTO_TCP); }
  112. private:
  113. tcp::socket& socket_;
  114. };
  115. class UDPSocket : public IOSocket {
  116. private:
  117. UDPSocket(const UDPSocket& source);
  118. UDPSocket& operator=(const UDPSocket& source);
  119. public:
  120. UDPSocket(udp::socket& socket) : socket_(socket) {}
  121. virtual int getNative() const { return (socket_.native()); }
  122. virtual int getProtocol() const { return (IPPROTO_UDP); }
  123. private:
  124. udp::socket& socket_;
  125. };
  126. class DummySocket : public IOSocket {
  127. private:
  128. DummySocket(const DummySocket& source);
  129. DummySocket& operator=(const DummySocket& source);
  130. public:
  131. DummySocket(const int protocol) : protocol_(protocol) {}
  132. virtual int getNative() const { return (-1); }
  133. virtual int getProtocol() const { return (protocol_); }
  134. private:
  135. const int protocol_;
  136. };
  137. IOSocket&
  138. IOSocket::getDummyUDPSocket() {
  139. static DummySocket socket(IPPROTO_UDP);
  140. return (socket);
  141. }
  142. IOSocket&
  143. IOSocket::getDummyTCPSocket() {
  144. static DummySocket socket(IPPROTO_TCP);
  145. return (socket);
  146. }
  147. IOMessage::IOMessage(const void* data, const size_t data_size,
  148. IOSocket& io_socket, const IOEndpoint& remote_endpoint) :
  149. data_(data), data_size_(data_size), io_socket_(io_socket),
  150. remote_endpoint_(remote_endpoint)
  151. {}
  152. //
  153. // Helper classes for asynchronous I/O using asio
  154. //
  155. class TCPClient {
  156. public:
  157. TCPClient(AuthSrv* auth_server, io_service& io_service) :
  158. auth_server_(auth_server),
  159. socket_(io_service),
  160. io_socket_(socket_),
  161. response_buffer_(0),
  162. responselen_buffer_(TCP_MESSAGE_LENGTHSIZE),
  163. response_renderer_(response_buffer_),
  164. dns_message_(Message::PARSE),
  165. custom_callback_(NULL)
  166. {}
  167. void start() {
  168. // Check for queued configuration commands
  169. if (auth_server_ != NULL &&
  170. auth_server_->configSession()->hasQueuedMsgs()) {
  171. auth_server_->configSession()->checkCommand();
  172. }
  173. async_read(socket_, asio::buffer(data_, TCP_MESSAGE_LENGTHSIZE),
  174. boost::bind(&TCPClient::headerRead, this,
  175. placeholders::error,
  176. placeholders::bytes_transferred));
  177. }
  178. ip::tcp::socket& getSocket() { return (socket_); }
  179. void headerRead(const asio::error_code& error,
  180. size_t bytes_transferred)
  181. {
  182. if (!error) {
  183. InputBuffer dnsbuffer(data_, bytes_transferred);
  184. uint16_t msglen = dnsbuffer.readUint16();
  185. async_read(socket_, asio::buffer(data_, msglen),
  186. boost::bind(&TCPClient::requestRead, this,
  187. placeholders::error,
  188. placeholders::bytes_transferred));
  189. } else {
  190. delete this;
  191. }
  192. }
  193. void requestRead(const asio::error_code& error,
  194. size_t bytes_transferred)
  195. {
  196. if (!error) {
  197. const TCPEndpoint remote_endpoint(socket_.remote_endpoint());
  198. const IOMessage io_message(data_, bytes_transferred, io_socket_,
  199. remote_endpoint);
  200. // currently, for testing purpose only
  201. if (custom_callback_ != NULL) {
  202. (*custom_callback_)(io_message);
  203. start();
  204. return;
  205. }
  206. if (auth_server_->processMessage(io_message, dns_message_,
  207. response_renderer_)) {
  208. responselen_buffer_.writeUint16(
  209. response_buffer_.getLength());
  210. async_write(socket_,
  211. asio::buffer(
  212. responselen_buffer_.getData(),
  213. responselen_buffer_.getLength()),
  214. boost::bind(&TCPClient::responseWrite, this,
  215. placeholders::error));
  216. } else {
  217. delete this;
  218. }
  219. } else {
  220. delete this;
  221. }
  222. }
  223. void responseWrite(const asio::error_code& error) {
  224. if (!error) {
  225. async_write(socket_,
  226. asio::buffer(response_buffer_.getData(),
  227. response_buffer_.getLength()),
  228. boost::bind(&TCPClient::handleWrite, this,
  229. placeholders::error));
  230. } else {
  231. delete this;
  232. }
  233. }
  234. void handleWrite(const asio::error_code& error) {
  235. if (!error) {
  236. start(); // handle next request, if any.
  237. } else {
  238. delete this;
  239. }
  240. }
  241. // Currently this is for tests only
  242. void setCallBack(const IOService::IOCallBack* callback) {
  243. custom_callback_ = callback;
  244. }
  245. private:
  246. AuthSrv* auth_server_;
  247. tcp::socket socket_;
  248. TCPSocket io_socket_;
  249. OutputBuffer response_buffer_;
  250. OutputBuffer responselen_buffer_;
  251. MessageRenderer response_renderer_;
  252. Message dns_message_;
  253. enum { MAX_LENGTH = 65535 };
  254. static const size_t TCP_MESSAGE_LENGTHSIZE = 2;
  255. char data_[MAX_LENGTH];
  256. // currently, for testing purpose only.
  257. const IOService::IOCallBack* custom_callback_;
  258. };
  259. class TCPServer {
  260. public:
  261. TCPServer(AuthSrv* auth_server, io_service& io_service,
  262. int af, uint16_t port) :
  263. auth_server_(auth_server), io_service_(io_service),
  264. acceptor_(io_service_), listening_(new TCPClient(auth_server_,
  265. io_service_)),
  266. custom_callback_(NULL)
  267. {
  268. tcp::endpoint endpoint(af == AF_INET6 ? tcp::v6() : tcp::v4(), port);
  269. acceptor_.open(endpoint.protocol());
  270. // Set v6-only (we use a different instantiation for v4,
  271. // otherwise asio will bind to both v4 and v6
  272. if (af == AF_INET6) {
  273. acceptor_.set_option(ip::v6_only(true));
  274. }
  275. acceptor_.set_option(tcp::acceptor::reuse_address(true));
  276. acceptor_.bind(endpoint);
  277. acceptor_.listen();
  278. acceptor_.async_accept(listening_->getSocket(),
  279. boost::bind(&TCPServer::handleAccept, this,
  280. listening_, placeholders::error));
  281. }
  282. TCPServer(AuthSrv* auth_server, io_service& io_service,
  283. asio::ip::address addr, uint16_t port) :
  284. auth_server_(auth_server),
  285. io_service_(io_service), acceptor_(io_service_),
  286. listening_(new TCPClient(auth_server, io_service_))
  287. {
  288. tcp::endpoint endpoint(addr, port);
  289. acceptor_.open(endpoint.protocol());
  290. acceptor_.set_option(tcp::acceptor::reuse_address(true));
  291. acceptor_.bind(endpoint);
  292. acceptor_.listen();
  293. acceptor_.async_accept(listening_->getSocket(),
  294. boost::bind(&TCPServer::handleAccept, this,
  295. listening_, placeholders::error));
  296. }
  297. ~TCPServer() { delete listening_; }
  298. void handleAccept(TCPClient* new_client,
  299. const asio::error_code& error)
  300. {
  301. if (!error) {
  302. assert(new_client == listening_);
  303. new_client->setCallBack(custom_callback_);
  304. new_client->start();
  305. listening_ = new TCPClient(auth_server_, io_service_);
  306. acceptor_.async_accept(listening_->getSocket(),
  307. boost::bind(&TCPServer::handleAccept,
  308. this, listening_,
  309. placeholders::error));
  310. } else {
  311. delete new_client;
  312. }
  313. }
  314. // Currently this is for tests only
  315. void setCallBack(const IOService::IOCallBack* callback) {
  316. custom_callback_ = callback;
  317. }
  318. private:
  319. AuthSrv* auth_server_;
  320. io_service& io_service_;
  321. tcp::acceptor acceptor_;
  322. TCPClient* listening_;
  323. // currently, for testing purpose only.
  324. const IOService::IOCallBack* custom_callback_;
  325. };
  326. class UDPServer {
  327. public:
  328. UDPServer(AuthSrv* auth_server, io_service& io_service,
  329. int af, uint16_t port) :
  330. auth_server_(auth_server),
  331. io_service_(io_service),
  332. socket_(io_service, af == AF_INET6 ? udp::v6() : udp::v4()),
  333. io_socket_(socket_),
  334. response_buffer_(0),
  335. response_renderer_(response_buffer_),
  336. dns_message_(Message::PARSE),
  337. custom_callback_(NULL)
  338. {
  339. // Set v6-only (we use a different instantiation for v4,
  340. // otherwise asio will bind to both v4 and v6
  341. if (af == AF_INET6) {
  342. socket_.set_option(asio::ip::v6_only(true));
  343. socket_.bind(udp::endpoint(udp::v6(), port));
  344. } else {
  345. socket_.bind(udp::endpoint(udp::v4(), port));
  346. }
  347. startReceive();
  348. }
  349. UDPServer(AuthSrv* auth_server, io_service& io_service,
  350. asio::ip::address addr, uint16_t port) :
  351. auth_server_(auth_server), io_service_(io_service),
  352. socket_(io_service, addr.is_v6() ? udp::v6() : udp::v4()),
  353. io_socket_(socket_),
  354. response_buffer_(0),
  355. response_renderer_(response_buffer_),
  356. dns_message_(Message::PARSE),
  357. custom_callback_(NULL)
  358. {
  359. socket_.bind(udp::endpoint(addr, port));
  360. startReceive();
  361. }
  362. void handleRequest(const asio::error_code& error,
  363. size_t bytes_recvd)
  364. {
  365. // Check for queued configuration commands
  366. if (auth_server_ != NULL &&
  367. auth_server_->configSession()->hasQueuedMsgs()) {
  368. auth_server_->configSession()->checkCommand();
  369. }
  370. if (!error && bytes_recvd > 0) {
  371. const UDPEndpoint remote_endpoint(sender_endpoint_);
  372. const IOMessage io_message(data_, bytes_recvd, io_socket_,
  373. remote_endpoint);
  374. // currently, for testing purpose only
  375. if (custom_callback_ != NULL) {
  376. (*custom_callback_)(io_message);
  377. startReceive();
  378. return;
  379. }
  380. dns_message_.clear(Message::PARSE);
  381. response_renderer_.clear();
  382. if (auth_server_->processMessage(io_message, dns_message_,
  383. response_renderer_)) {
  384. socket_.async_send_to(
  385. asio::buffer(response_buffer_.getData(),
  386. response_buffer_.getLength()),
  387. sender_endpoint_,
  388. boost::bind(&UDPServer::sendCompleted,
  389. this,
  390. placeholders::error,
  391. placeholders::bytes_transferred));
  392. } else {
  393. startReceive();
  394. }
  395. } else {
  396. startReceive();
  397. }
  398. }
  399. void sendCompleted(const asio::error_code& error UNUSED_PARAM,
  400. size_t bytes_sent UNUSED_PARAM)
  401. {
  402. // Even if error occurred there's nothing to do. Simply handle
  403. // the next request.
  404. startReceive();
  405. }
  406. // Currently this is for tests only
  407. void setCallBack(const IOService::IOCallBack* callback) {
  408. custom_callback_ = callback;
  409. }
  410. private:
  411. void startReceive() {
  412. socket_.async_receive_from(
  413. asio::buffer(data_, MAX_LENGTH), sender_endpoint_,
  414. boost::bind(&UDPServer::handleRequest, this,
  415. placeholders::error,
  416. placeholders::bytes_transferred));
  417. }
  418. private:
  419. AuthSrv* auth_server_;
  420. io_service& io_service_;
  421. udp::socket socket_;
  422. UDPSocket io_socket_;
  423. OutputBuffer response_buffer_;
  424. MessageRenderer response_renderer_;
  425. Message dns_message_;
  426. udp::endpoint sender_endpoint_;
  427. enum { MAX_LENGTH = 4096 };
  428. char data_[MAX_LENGTH];
  429. // currently, for testing purpose only.
  430. const IOService::IOCallBack* custom_callback_;
  431. };
  432. class IOServiceImpl {
  433. public:
  434. IOServiceImpl(AuthSrv* auth_server, const char* address, const char* port,
  435. const bool use_ipv4, const bool use_ipv6);
  436. asio::io_service io_service_;
  437. AuthSrv* auth_server_;
  438. typedef boost::shared_ptr<UDPServer> UDPServerPtr;
  439. typedef boost::shared_ptr<TCPServer> TCPServerPtr;
  440. UDPServerPtr udp4_server_;
  441. UDPServerPtr udp6_server_;
  442. TCPServerPtr tcp4_server_;
  443. TCPServerPtr tcp6_server_;
  444. // This member is used only for testing at the moment.
  445. IOService::IOCallBack callback_;
  446. };
  447. IOServiceImpl::IOServiceImpl(AuthSrv* auth_server, const char* const address,
  448. const char* const port, const bool use_ipv4,
  449. const bool use_ipv6) :
  450. auth_server_(auth_server),
  451. udp4_server_(UDPServerPtr()), udp6_server_(UDPServerPtr()),
  452. tcp4_server_(TCPServerPtr()), tcp6_server_(TCPServerPtr())
  453. {
  454. uint16_t portnum;
  455. try {
  456. portnum = boost::lexical_cast<uint16_t>(port);
  457. } catch (const boost::bad_lexical_cast& ex) {
  458. isc_throw(IOError, "[b10-auth] Invalid port number '" << port << "'");
  459. }
  460. if (address != NULL) {
  461. asio::ip::address addr = asio::ip::address::from_string(address);
  462. if (addr.is_v6() && !use_ipv6) {
  463. isc_throw(FatalError,
  464. "[b10-auth] Error: -4 conflicts with " << addr);
  465. }
  466. if (addr.is_v4() && !use_ipv4) {
  467. isc_throw(FatalError,
  468. "[b10-auth] Error: -6 conflicts with " << addr);
  469. }
  470. if (addr.is_v4()) {
  471. udp4_server_ = UDPServerPtr(new UDPServer(auth_server, io_service_,
  472. addr, portnum));
  473. tcp4_server_ = TCPServerPtr(new TCPServer(auth_server, io_service_,
  474. addr, portnum));
  475. } else {
  476. udp6_server_ = UDPServerPtr(new UDPServer(auth_server, io_service_,
  477. addr, portnum));
  478. tcp6_server_ = TCPServerPtr(new TCPServer(auth_server, io_service_,
  479. addr, portnum));
  480. }
  481. } else {
  482. if (use_ipv4) {
  483. udp4_server_ = UDPServerPtr(new UDPServer(auth_server, io_service_,
  484. AF_INET, portnum));
  485. tcp4_server_ = TCPServerPtr(new TCPServer(auth_server, io_service_,
  486. AF_INET, portnum));
  487. }
  488. if (use_ipv6) {
  489. udp6_server_ = UDPServerPtr(new UDPServer(auth_server, io_service_,
  490. AF_INET6, portnum));
  491. tcp6_server_ = TCPServerPtr(new TCPServer(auth_server, io_service_,
  492. AF_INET6, portnum));
  493. }
  494. }
  495. }
  496. IOService::IOService(AuthSrv* auth_server, const char* const address,
  497. const char* const port, const bool use_ipv4,
  498. const bool use_ipv6) {
  499. impl_ = new IOServiceImpl(auth_server, address, port, use_ipv4, use_ipv6);
  500. }
  501. IOService::~IOService() {
  502. delete impl_;
  503. }
  504. void
  505. IOService::run() {
  506. impl_->io_service_.run();
  507. }
  508. void
  509. IOService::stop() {
  510. impl_->io_service_.stop();
  511. }
  512. asio::io_service&
  513. IOService::get_io_service() {
  514. return impl_->io_service_;
  515. }
  516. void
  517. IOService::setCallBack(const IOCallBack callback) {
  518. impl_->callback_ = callback;
  519. if (impl_->udp4_server_ != NULL) {
  520. impl_->udp4_server_->setCallBack(&impl_->callback_);
  521. }
  522. if (impl_->udp6_server_ != NULL) {
  523. impl_->udp6_server_->setCallBack(&impl_->callback_);
  524. }
  525. if (impl_->tcp4_server_ != NULL) {
  526. impl_->tcp4_server_->setCallBack(&impl_->callback_);
  527. }
  528. if (impl_->tcp6_server_ != NULL) {
  529. impl_->tcp6_server_->setCallBack(&impl_->callback_);
  530. }
  531. }
  532. }