reactive_socket_service.hpp 53 KB


  1. //
  2. // reactive_socket_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
  11. #define BOOST_ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/push_options.hpp>
  16. #include <boost/asio/detail/push_options.hpp>
  17. #include <boost/shared_ptr.hpp>
  18. #include <boost/asio/detail/pop_options.hpp>
  19. #include <boost/asio/buffer.hpp>
  20. #include <boost/asio/error.hpp>
  21. #include <boost/asio/io_service.hpp>
  22. #include <boost/asio/socket_base.hpp>
  23. #include <boost/asio/detail/bind_handler.hpp>
  24. #include <boost/asio/detail/handler_base_from_member.hpp>
  25. #include <boost/asio/detail/noncopyable.hpp>
  26. #include <boost/asio/detail/service_base.hpp>
  27. #include <boost/asio/detail/socket_holder.hpp>
  28. #include <boost/asio/detail/socket_ops.hpp>
  29. #include <boost/asio/detail/socket_types.hpp>
  30. namespace boost {
  31. namespace asio {
  32. namespace detail {
  33. template <typename Protocol, typename Reactor>
  34. class reactive_socket_service
  35. : public boost::asio::detail::service_base<
  36. reactive_socket_service<Protocol, Reactor> >
  37. {
  38. public:
  39. // The protocol type.
  40. typedef Protocol protocol_type;
  41. // The endpoint type.
  42. typedef typename Protocol::endpoint endpoint_type;
  43. // The native type of a socket.
  44. typedef socket_type native_type;
  45. // The implementation type of the socket.
  46. class implementation_type
  47. : private boost::asio::detail::noncopyable
  48. {
  49. public:
  50. // Default constructor.
  51. implementation_type()
  52. : socket_(invalid_socket),
  53. flags_(0),
  54. protocol_(endpoint_type().protocol())
  55. {
  56. }
  57. private:
  58. // Only this service will have access to the internal values.
  59. friend class reactive_socket_service<Protocol, Reactor>;
  60. // The native socket representation.
  61. socket_type socket_;
  62. enum
  63. {
  64. // The user wants a non-blocking socket.
  65. user_set_non_blocking = 1,
  66. // The implementation wants a non-blocking socket (in order to be able to
  67. // perform asynchronous read and write operations).
  68. internal_non_blocking = 2,
  69. // Helper "flag" used to determine whether the socket is non-blocking.
  70. non_blocking = user_set_non_blocking | internal_non_blocking,
  71. // User wants connection_aborted errors, which are disabled by default.
  72. enable_connection_aborted = 4,
  73. // The user set the linger option. Needs to be checked when closing.
  74. user_set_linger = 8
  75. };
  76. // Flags indicating the current state of the socket.
  77. unsigned char flags_;
  78. // The protocol associated with the socket.
  79. protocol_type protocol_;
  80. // Per-descriptor data used by the reactor.
  81. typename Reactor::per_descriptor_data reactor_data_;
  82. };
  83. // The maximum number of buffers to support in a single operation.
  84. enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
  85. // Constructor.
  86. reactive_socket_service(boost::asio::io_service& io_service)
  87. : boost::asio::detail::service_base<
  88. reactive_socket_service<Protocol, Reactor> >(io_service),
  89. reactor_(boost::asio::use_service<Reactor>(io_service))
  90. {
  91. reactor_.init_task();
  92. }
  93. // Destroy all user-defined handler objects owned by the service.
  // Intentionally empty: this function performs no work.
  void shutdown_service()
  {
  }
  97. // Construct a new socket implementation.
  98. void construct(implementation_type& impl)
  99. {
  100. impl.socket_ = invalid_socket;
  101. impl.flags_ = 0;
  102. }
  103. // Destroy a socket implementation.
  104. void destroy(implementation_type& impl)
  105. {
  106. if (impl.socket_ != invalid_socket)
  107. {
  108. reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
  109. if (impl.flags_ & implementation_type::non_blocking)
  110. {
  111. ioctl_arg_type non_blocking = 0;
  112. boost::system::error_code ignored_ec;
  113. socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
  114. impl.flags_ &= ~implementation_type::non_blocking;
  115. }
  116. if (impl.flags_ & implementation_type::user_set_linger)
  117. {
  118. ::linger opt;
  119. opt.l_onoff = 0;
  120. opt.l_linger = 0;
  121. boost::system::error_code ignored_ec;
  122. socket_ops::setsockopt(impl.socket_,
  123. SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
  124. }
  125. boost::system::error_code ignored_ec;
  126. socket_ops::close(impl.socket_, ignored_ec);
  127. impl.socket_ = invalid_socket;
  128. }
  129. }
  130. // Open a new socket implementation.
  131. boost::system::error_code open(implementation_type& impl,
  132. const protocol_type& protocol, boost::system::error_code& ec)
  133. {
  134. if (is_open(impl))
  135. {
  136. ec = boost::asio::error::already_open;
  137. return ec;
  138. }
  139. socket_holder sock(socket_ops::socket(protocol.family(),
  140. protocol.type(), protocol.protocol(), ec));
  141. if (sock.get() == invalid_socket)
  142. return ec;
  143. if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
  144. {
  145. ec = boost::system::error_code(err,
  146. boost::asio::error::get_system_category());
  147. return ec;
  148. }
  149. impl.socket_ = sock.release();
  150. impl.flags_ = 0;
  151. impl.protocol_ = protocol;
  152. ec = boost::system::error_code();
  153. return ec;
  154. }
  155. // Assign a native socket to a socket implementation.
  156. boost::system::error_code assign(implementation_type& impl,
  157. const protocol_type& protocol, const native_type& native_socket,
  158. boost::system::error_code& ec)
  159. {
  160. if (is_open(impl))
  161. {
  162. ec = boost::asio::error::already_open;
  163. return ec;
  164. }
  165. if (int err = reactor_.register_descriptor(
  166. native_socket, impl.reactor_data_))
  167. {
  168. ec = boost::system::error_code(err,
  169. boost::asio::error::get_system_category());
  170. return ec;
  171. }
  172. impl.socket_ = native_socket;
  173. impl.flags_ = 0;
  174. impl.protocol_ = protocol;
  175. ec = boost::system::error_code();
  176. return ec;
  177. }
  178. // Determine whether the socket is open.
  179. bool is_open(const implementation_type& impl) const
  180. {
  181. return impl.socket_ != invalid_socket;
  182. }
  183. // Destroy a socket implementation.
  184. boost::system::error_code close(implementation_type& impl,
  185. boost::system::error_code& ec)
  186. {
  187. if (is_open(impl))
  188. {
  189. reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
  190. if (impl.flags_ & implementation_type::non_blocking)
  191. {
  192. ioctl_arg_type non_blocking = 0;
  193. boost::system::error_code ignored_ec;
  194. socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
  195. impl.flags_ &= ~implementation_type::non_blocking;
  196. }
  197. if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
  198. return ec;
  199. impl.socket_ = invalid_socket;
  200. }
  201. ec = boost::system::error_code();
  202. return ec;
  203. }
  204. // Get the native socket representation.
  205. native_type native(implementation_type& impl)
  206. {
  207. return impl.socket_;
  208. }
  209. // Cancel all operations associated with the socket.
  210. boost::system::error_code cancel(implementation_type& impl,
  211. boost::system::error_code& ec)
  212. {
  213. if (!is_open(impl))
  214. {
  215. ec = boost::asio::error::bad_descriptor;
  216. return ec;
  217. }
  218. reactor_.cancel_ops(impl.socket_, impl.reactor_data_);
  219. ec = boost::system::error_code();
  220. return ec;
  221. }
  222. // Determine whether the socket is at the out-of-band data mark.
  223. bool at_mark(const implementation_type& impl,
  224. boost::system::error_code& ec) const
  225. {
  226. if (!is_open(impl))
  227. {
  228. ec = boost::asio::error::bad_descriptor;
  229. return false;
  230. }
  231. boost::asio::detail::ioctl_arg_type value = 0;
  232. socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
  233. #if defined(ENOTTY)
  234. if (ec.value() == ENOTTY)
  235. ec = boost::asio::error::not_socket;
  236. #endif // defined(ENOTTY)
  237. return ec ? false : value != 0;
  238. }
  239. // Determine the number of bytes available for reading.
  240. std::size_t available(const implementation_type& impl,
  241. boost::system::error_code& ec) const
  242. {
  243. if (!is_open(impl))
  244. {
  245. ec = boost::asio::error::bad_descriptor;
  246. return 0;
  247. }
  248. boost::asio::detail::ioctl_arg_type value = 0;
  249. socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
  250. #if defined(ENOTTY)
  251. if (ec.value() == ENOTTY)
  252. ec = boost::asio::error::not_socket;
  253. #endif // defined(ENOTTY)
  254. return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
  255. }
  256. // Bind the socket to the specified local endpoint.
  257. boost::system::error_code bind(implementation_type& impl,
  258. const endpoint_type& endpoint, boost::system::error_code& ec)
  259. {
  260. if (!is_open(impl))
  261. {
  262. ec = boost::asio::error::bad_descriptor;
  263. return ec;
  264. }
  265. socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
  266. return ec;
  267. }
  268. // Place the socket into the state where it will listen for new connections.
  269. boost::system::error_code listen(implementation_type& impl, int backlog,
  270. boost::system::error_code& ec)
  271. {
  272. if (!is_open(impl))
  273. {
  274. ec = boost::asio::error::bad_descriptor;
  275. return ec;
  276. }
  277. socket_ops::listen(impl.socket_, backlog, ec);
  278. return ec;
  279. }
  280. // Set a socket option.
  281. template <typename Option>
  282. boost::system::error_code set_option(implementation_type& impl,
  283. const Option& option, boost::system::error_code& ec)
  284. {
  285. if (!is_open(impl))
  286. {
  287. ec = boost::asio::error::bad_descriptor;
  288. return ec;
  289. }
  290. if (option.level(impl.protocol_) == custom_socket_option_level
  291. && option.name(impl.protocol_) == enable_connection_aborted_option)
  292. {
  293. if (option.size(impl.protocol_) != sizeof(int))
  294. {
  295. ec = boost::asio::error::invalid_argument;
  296. }
  297. else
  298. {
  299. if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
  300. impl.flags_ |= implementation_type::enable_connection_aborted;
  301. else
  302. impl.flags_ &= ~implementation_type::enable_connection_aborted;
  303. ec = boost::system::error_code();
  304. }
  305. return ec;
  306. }
  307. else
  308. {
  309. if (option.level(impl.protocol_) == SOL_SOCKET
  310. && option.name(impl.protocol_) == SO_LINGER)
  311. {
  312. impl.flags_ |= implementation_type::user_set_linger;
  313. }
  314. socket_ops::setsockopt(impl.socket_,
  315. option.level(impl.protocol_), option.name(impl.protocol_),
  316. option.data(impl.protocol_), option.size(impl.protocol_), ec);
  317. #if defined(__MACH__) && defined(__APPLE__) \
  318. || defined(__NetBSD__) || defined(__FreeBSD__) || defined(__OpenBSD__)
  319. // To implement portable behaviour for SO_REUSEADDR with UDP sockets we
  320. // need to also set SO_REUSEPORT on BSD-based platforms.
  321. if (!ec && impl.protocol_.type() == SOCK_DGRAM
  322. && option.level(impl.protocol_) == SOL_SOCKET
  323. && option.name(impl.protocol_) == SO_REUSEADDR)
  324. {
  325. boost::system::error_code ignored_ec;
  326. socket_ops::setsockopt(impl.socket_, SOL_SOCKET, SO_REUSEPORT,
  327. option.data(impl.protocol_), option.size(impl.protocol_),
  328. ignored_ec);
  329. }
  330. #endif
  331. return ec;
  332. }
  333. }
  334. // Get a socket option.
  335. template <typename Option>
  336. boost::system::error_code get_option(const implementation_type& impl,
  337. Option& option, boost::system::error_code& ec) const
  338. {
  339. if (!is_open(impl))
  340. {
  341. ec = boost::asio::error::bad_descriptor;
  342. return ec;
  343. }
  344. if (option.level(impl.protocol_) == custom_socket_option_level
  345. && option.name(impl.protocol_) == enable_connection_aborted_option)
  346. {
  347. if (option.size(impl.protocol_) != sizeof(int))
  348. {
  349. ec = boost::asio::error::invalid_argument;
  350. }
  351. else
  352. {
  353. int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
  354. if (impl.flags_ & implementation_type::enable_connection_aborted)
  355. *target = 1;
  356. else
  357. *target = 0;
  358. option.resize(impl.protocol_, sizeof(int));
  359. ec = boost::system::error_code();
  360. }
  361. return ec;
  362. }
  363. else
  364. {
  365. size_t size = option.size(impl.protocol_);
  366. socket_ops::getsockopt(impl.socket_,
  367. option.level(impl.protocol_), option.name(impl.protocol_),
  368. option.data(impl.protocol_), &size, ec);
  369. if (!ec)
  370. option.resize(impl.protocol_, size);
  371. return ec;
  372. }
  373. }
  374. // Perform an IO control command on the socket.
  375. template <typename IO_Control_Command>
  376. boost::system::error_code io_control(implementation_type& impl,
  377. IO_Control_Command& command, boost::system::error_code& ec)
  378. {
  379. if (!is_open(impl))
  380. {
  381. ec = boost::asio::error::bad_descriptor;
  382. return ec;
  383. }
  384. if (command.name() == static_cast<int>(FIONBIO))
  385. {
  386. // Flags are manipulated in a temporary variable so that the socket
  387. // implementation is not updated unless the ioctl operation succeeds.
  388. unsigned char new_flags = impl.flags_;
  389. if (*static_cast<ioctl_arg_type*>(command.data()))
  390. new_flags |= implementation_type::user_set_non_blocking;
  391. else
  392. new_flags &= ~implementation_type::user_set_non_blocking;
  393. // Perform ioctl on socket if the non-blocking state has changed.
  394. if (!(impl.flags_ & implementation_type::non_blocking)
  395. && (new_flags & implementation_type::non_blocking))
  396. {
  397. ioctl_arg_type non_blocking = 1;
  398. socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec);
  399. }
  400. else if ((impl.flags_ & implementation_type::non_blocking)
  401. && !(new_flags & implementation_type::non_blocking))
  402. {
  403. ioctl_arg_type non_blocking = 0;
  404. socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec);
  405. }
  406. else
  407. {
  408. ec = boost::system::error_code();
  409. }
  410. // Update socket implementation's flags only if successful.
  411. if (!ec)
  412. impl.flags_ = new_flags;
  413. }
  414. else
  415. {
  416. socket_ops::ioctl(impl.socket_, command.name(),
  417. static_cast<ioctl_arg_type*>(command.data()), ec);
  418. }
  419. return ec;
  420. }
  421. // Get the local endpoint.
  422. endpoint_type local_endpoint(const implementation_type& impl,
  423. boost::system::error_code& ec) const
  424. {
  425. if (!is_open(impl))
  426. {
  427. ec = boost::asio::error::bad_descriptor;
  428. return endpoint_type();
  429. }
  430. endpoint_type endpoint;
  431. std::size_t addr_len = endpoint.capacity();
  432. if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
  433. return endpoint_type();
  434. endpoint.resize(addr_len);
  435. return endpoint;
  436. }
  437. // Get the remote endpoint.
  438. endpoint_type remote_endpoint(const implementation_type& impl,
  439. boost::system::error_code& ec) const
  440. {
  441. if (!is_open(impl))
  442. {
  443. ec = boost::asio::error::bad_descriptor;
  444. return endpoint_type();
  445. }
  446. endpoint_type endpoint;
  447. std::size_t addr_len = endpoint.capacity();
  448. if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
  449. return endpoint_type();
  450. endpoint.resize(addr_len);
  451. return endpoint;
  452. }
  453. /// Disable sends or receives on the socket.
  454. boost::system::error_code shutdown(implementation_type& impl,
  455. socket_base::shutdown_type what, boost::system::error_code& ec)
  456. {
  457. if (!is_open(impl))
  458. {
  459. ec = boost::asio::error::bad_descriptor;
  460. return ec;
  461. }
  462. socket_ops::shutdown(impl.socket_, what, ec);
  463. return ec;
  464. }
  465. // Send the given data to the peer.
  466. template <typename ConstBufferSequence>
  467. size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
  468. socket_base::message_flags flags, boost::system::error_code& ec)
  469. {
  470. if (!is_open(impl))
  471. {
  472. ec = boost::asio::error::bad_descriptor;
  473. return 0;
  474. }
  475. // Copy buffers into array.
  476. socket_ops::buf bufs[max_buffers];
  477. typename ConstBufferSequence::const_iterator iter = buffers.begin();
  478. typename ConstBufferSequence::const_iterator end = buffers.end();
  479. size_t i = 0;
  480. size_t total_buffer_size = 0;
  481. for (; iter != end && i < max_buffers; ++iter, ++i)
  482. {
  483. boost::asio::const_buffer buffer(*iter);
  484. socket_ops::init_buf(bufs[i],
  485. boost::asio::buffer_cast<const void*>(buffer),
  486. boost::asio::buffer_size(buffer));
  487. total_buffer_size += boost::asio::buffer_size(buffer);
  488. }
  489. // A request to receive 0 bytes on a stream socket is a no-op.
  490. if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
  491. {
  492. ec = boost::system::error_code();
  493. return 0;
  494. }
  495. // Send the data.
  496. for (;;)
  497. {
  498. // Try to complete the operation without blocking.
  499. int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec);
  500. // Check if operation succeeded.
  501. if (bytes_sent >= 0)
  502. return bytes_sent;
  503. // Operation failed.
  504. if ((impl.flags_ & implementation_type::user_set_non_blocking)
  505. || (ec != boost::asio::error::would_block
  506. && ec != boost::asio::error::try_again))
  507. return 0;
  508. // Wait for socket to become ready.
  509. if (socket_ops::poll_write(impl.socket_, ec) < 0)
  510. return 0;
  511. }
  512. }
  513. // Wait until data can be sent without blocking.
  514. size_t send(implementation_type& impl, const null_buffers&,
  515. socket_base::message_flags, boost::system::error_code& ec)
  516. {
  517. if (!is_open(impl))
  518. {
  519. ec = boost::asio::error::bad_descriptor;
  520. return 0;
  521. }
  522. // Wait for socket to become ready.
  523. socket_ops::poll_write(impl.socket_, ec);
  524. return 0;
  525. }
  526. template <typename ConstBufferSequence, typename Handler>
  527. class send_operation :
  528. public handler_base_from_member<Handler>
  529. {
  530. public:
  531. send_operation(socket_type socket, boost::asio::io_service& io_service,
  532. const ConstBufferSequence& buffers, socket_base::message_flags flags,
  533. Handler handler)
  534. : handler_base_from_member<Handler>(handler),
  535. socket_(socket),
  536. io_service_(io_service),
  537. work_(io_service),
  538. buffers_(buffers),
  539. flags_(flags)
  540. {
  541. }
  542. bool perform(boost::system::error_code& ec,
  543. std::size_t& bytes_transferred)
  544. {
  545. // Check whether the operation was successful.
  546. if (ec)
  547. {
  548. bytes_transferred = 0;
  549. return true;
  550. }
  551. // Copy buffers into array.
  552. socket_ops::buf bufs[max_buffers];
  553. typename ConstBufferSequence::const_iterator iter = buffers_.begin();
  554. typename ConstBufferSequence::const_iterator end = buffers_.end();
  555. size_t i = 0;
  556. for (; iter != end && i < max_buffers; ++iter, ++i)
  557. {
  558. boost::asio::const_buffer buffer(*iter);
  559. socket_ops::init_buf(bufs[i],
  560. boost::asio::buffer_cast<const void*>(buffer),
  561. boost::asio::buffer_size(buffer));
  562. }
  563. // Send the data.
  564. int bytes = socket_ops::send(socket_, bufs, i, flags_, ec);
  565. // Check if we need to run the operation again.
  566. if (ec == boost::asio::error::would_block
  567. || ec == boost::asio::error::try_again)
  568. return false;
  569. bytes_transferred = (bytes < 0 ? 0 : bytes);
  570. return true;
  571. }
  572. void complete(const boost::system::error_code& ec,
  573. std::size_t bytes_transferred)
  574. {
  575. io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
  576. }
  577. private:
  578. socket_type socket_;
  579. boost::asio::io_service& io_service_;
  580. boost::asio::io_service::work work_;
  581. ConstBufferSequence buffers_;
  582. socket_base::message_flags flags_;
  583. };
  584. // Start an asynchronous send. The data being sent must be valid for the
  585. // lifetime of the asynchronous operation.
  586. template <typename ConstBufferSequence, typename Handler>
  587. void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
  588. socket_base::message_flags flags, Handler handler)
  589. {
  590. if (!is_open(impl))
  591. {
  592. this->get_io_service().post(bind_handler(handler,
  593. boost::asio::error::bad_descriptor, 0));
  594. }
  595. else
  596. {
  597. if (impl.protocol_.type() == SOCK_STREAM)
  598. {
  599. // Determine total size of buffers.
  600. typename ConstBufferSequence::const_iterator iter = buffers.begin();
  601. typename ConstBufferSequence::const_iterator end = buffers.end();
  602. size_t i = 0;
  603. size_t total_buffer_size = 0;
  604. for (; iter != end && i < max_buffers; ++iter, ++i)
  605. {
  606. boost::asio::const_buffer buffer(*iter);
  607. total_buffer_size += boost::asio::buffer_size(buffer);
  608. }
  609. // A request to receive 0 bytes on a stream socket is a no-op.
  610. if (total_buffer_size == 0)
  611. {
  612. this->get_io_service().post(bind_handler(handler,
  613. boost::system::error_code(), 0));
  614. return;
  615. }
  616. }
  617. // Make socket non-blocking.
  618. if (!(impl.flags_ & implementation_type::internal_non_blocking))
  619. {
  620. if (!(impl.flags_ & implementation_type::non_blocking))
  621. {
  622. ioctl_arg_type non_blocking = 1;
  623. boost::system::error_code ec;
  624. if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
  625. {
  626. this->get_io_service().post(bind_handler(handler, ec, 0));
  627. return;
  628. }
  629. }
  630. impl.flags_ |= implementation_type::internal_non_blocking;
  631. }
  632. reactor_.start_write_op(impl.socket_, impl.reactor_data_,
  633. send_operation<ConstBufferSequence, Handler>(
  634. impl.socket_, this->get_io_service(), buffers, flags, handler));
  635. }
  636. }
  637. template <typename Handler>
  638. class null_buffers_operation :
  639. public handler_base_from_member<Handler>
  640. {
  641. public:
  642. null_buffers_operation(boost::asio::io_service& io_service, Handler handler)
  643. : handler_base_from_member<Handler>(handler),
  644. work_(io_service)
  645. {
  646. }
  647. bool perform(boost::system::error_code&,
  648. std::size_t& bytes_transferred)
  649. {
  650. bytes_transferred = 0;
  651. return true;
  652. }
  653. void complete(const boost::system::error_code& ec,
  654. std::size_t bytes_transferred)
  655. {
  656. work_.get_io_service().post(bind_handler(
  657. this->handler_, ec, bytes_transferred));
  658. }
  659. private:
  660. boost::asio::io_service::work work_;
  661. };
  662. // Start an asynchronous wait until data can be sent without blocking.
  663. template <typename Handler>
  664. void async_send(implementation_type& impl, const null_buffers&,
  665. socket_base::message_flags, Handler handler)
  666. {
  667. if (!is_open(impl))
  668. {
  669. this->get_io_service().post(bind_handler(handler,
  670. boost::asio::error::bad_descriptor, 0));
  671. }
  672. else
  673. {
  674. reactor_.start_write_op(impl.socket_, impl.reactor_data_,
  675. null_buffers_operation<Handler>(this->get_io_service(), handler),
  676. false);
  677. }
  678. }
  679. // Send a datagram to the specified endpoint. Returns the number of bytes
  680. // sent.
  681. template <typename ConstBufferSequence>
  682. size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
  683. const endpoint_type& destination, socket_base::message_flags flags,
  684. boost::system::error_code& ec)
  685. {
  686. if (!is_open(impl))
  687. {
  688. ec = boost::asio::error::bad_descriptor;
  689. return 0;
  690. }
  691. // Copy buffers into array.
  692. socket_ops::buf bufs[max_buffers];
  693. typename ConstBufferSequence::const_iterator iter = buffers.begin();
  694. typename ConstBufferSequence::const_iterator end = buffers.end();
  695. size_t i = 0;
  696. for (; iter != end && i < max_buffers; ++iter, ++i)
  697. {
  698. boost::asio::const_buffer buffer(*iter);
  699. socket_ops::init_buf(bufs[i],
  700. boost::asio::buffer_cast<const void*>(buffer),
  701. boost::asio::buffer_size(buffer));
  702. }
  703. // Send the data.
  704. for (;;)
  705. {
  706. // Try to complete the operation without blocking.
  707. int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags,
  708. destination.data(), destination.size(), ec);
  709. // Check if operation succeeded.
  710. if (bytes_sent >= 0)
  711. return bytes_sent;
  712. // Operation failed.
  713. if ((impl.flags_ & implementation_type::user_set_non_blocking)
  714. || (ec != boost::asio::error::would_block
  715. && ec != boost::asio::error::try_again))
  716. return 0;
  717. // Wait for socket to become ready.
  718. if (socket_ops::poll_write(impl.socket_, ec) < 0)
  719. return 0;
  720. }
  721. }
  722. // Wait until data can be sent without blocking.
  723. size_t send_to(implementation_type& impl, const null_buffers&,
  724. socket_base::message_flags, const endpoint_type&,
  725. boost::system::error_code& ec)
  726. {
  727. if (!is_open(impl))
  728. {
  729. ec = boost::asio::error::bad_descriptor;
  730. return 0;
  731. }
  732. // Wait for socket to become ready.
  733. socket_ops::poll_write(impl.socket_, ec);
  734. return 0;
  735. }
  736. template <typename ConstBufferSequence, typename Handler>
  737. class send_to_operation :
  738. public handler_base_from_member<Handler>
  739. {
  740. public:
  741. send_to_operation(socket_type socket, boost::asio::io_service& io_service,
  742. const ConstBufferSequence& buffers, const endpoint_type& endpoint,
  743. socket_base::message_flags flags, Handler handler)
  744. : handler_base_from_member<Handler>(handler),
  745. socket_(socket),
  746. io_service_(io_service),
  747. work_(io_service),
  748. buffers_(buffers),
  749. destination_(endpoint),
  750. flags_(flags)
  751. {
  752. }
  753. bool perform(boost::system::error_code& ec,
  754. std::size_t& bytes_transferred)
  755. {
  756. // Check whether the operation was successful.
  757. if (ec)
  758. {
  759. bytes_transferred = 0;
  760. return true;
  761. }
  762. // Copy buffers into array.
  763. socket_ops::buf bufs[max_buffers];
  764. typename ConstBufferSequence::const_iterator iter = buffers_.begin();
  765. typename ConstBufferSequence::const_iterator end = buffers_.end();
  766. size_t i = 0;
  767. for (; iter != end && i < max_buffers; ++iter, ++i)
  768. {
  769. boost::asio::const_buffer buffer(*iter);
  770. socket_ops::init_buf(bufs[i],
  771. boost::asio::buffer_cast<const void*>(buffer),
  772. boost::asio::buffer_size(buffer));
  773. }
  774. // Send the data.
  775. int bytes = socket_ops::sendto(socket_, bufs, i, flags_,
  776. destination_.data(), destination_.size(), ec);
  777. // Check if we need to run the operation again.
  778. if (ec == boost::asio::error::would_block
  779. || ec == boost::asio::error::try_again)
  780. return false;
  781. bytes_transferred = (bytes < 0 ? 0 : bytes);
  782. return true;
  783. }
  784. void complete(const boost::system::error_code& ec,
  785. std::size_t bytes_transferred)
  786. {
  787. io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
  788. }
  789. private:
  790. socket_type socket_;
  791. boost::asio::io_service& io_service_;
  792. boost::asio::io_service::work work_;
  793. ConstBufferSequence buffers_;
  794. endpoint_type destination_;
  795. socket_base::message_flags flags_;
  796. };
  797. // Start an asynchronous send. The data being sent must be valid for the
  798. // lifetime of the asynchronous operation.
  799. template <typename ConstBufferSequence, typename Handler>
  800. void async_send_to(implementation_type& impl,
  801. const ConstBufferSequence& buffers,
  802. const endpoint_type& destination, socket_base::message_flags flags,
  803. Handler handler)
  804. {
  805. if (!is_open(impl))
  806. {
  807. this->get_io_service().post(bind_handler(handler,
  808. boost::asio::error::bad_descriptor, 0));
  809. }
  810. else
  811. {
  812. // Make socket non-blocking.
  813. if (!(impl.flags_ & implementation_type::internal_non_blocking))
  814. {
  815. if (!(impl.flags_ & implementation_type::non_blocking))
  816. {
  817. ioctl_arg_type non_blocking = 1;
  818. boost::system::error_code ec;
  819. if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
  820. {
  821. this->get_io_service().post(bind_handler(handler, ec, 0));
  822. return;
  823. }
  824. }
  825. impl.flags_ |= implementation_type::internal_non_blocking;
  826. }
  827. reactor_.start_write_op(impl.socket_, impl.reactor_data_,
  828. send_to_operation<ConstBufferSequence, Handler>(
  829. impl.socket_, this->get_io_service(), buffers,
  830. destination, flags, handler));
  831. }
  832. }
  833. // Start an asynchronous wait until data can be sent without blocking.
  834. template <typename Handler>
  835. void async_send_to(implementation_type& impl, const null_buffers&,
  836. socket_base::message_flags, const endpoint_type&, Handler handler)
  837. {
  838. if (!is_open(impl))
  839. {
  840. this->get_io_service().post(bind_handler(handler,
  841. boost::asio::error::bad_descriptor, 0));
  842. }
  843. else
  844. {
  845. reactor_.start_write_op(impl.socket_, impl.reactor_data_,
  846. null_buffers_operation<Handler>(this->get_io_service(), handler),
  847. false);
  848. }
  849. }
  850. // Receive some data from the peer. Returns the number of bytes received.
  851. template <typename MutableBufferSequence>
  852. size_t receive(implementation_type& impl,
  853. const MutableBufferSequence& buffers,
  854. socket_base::message_flags flags, boost::system::error_code& ec)
  855. {
  856. if (!is_open(impl))
  857. {
  858. ec = boost::asio::error::bad_descriptor;
  859. return 0;
  860. }
  861. // Copy buffers into array.
  862. socket_ops::buf bufs[max_buffers];
  863. typename MutableBufferSequence::const_iterator iter = buffers.begin();
  864. typename MutableBufferSequence::const_iterator end = buffers.end();
  865. size_t i = 0;
  866. size_t total_buffer_size = 0;
  867. for (; iter != end && i < max_buffers; ++iter, ++i)
  868. {
  869. boost::asio::mutable_buffer buffer(*iter);
  870. socket_ops::init_buf(bufs[i],
  871. boost::asio::buffer_cast<void*>(buffer),
  872. boost::asio::buffer_size(buffer));
  873. total_buffer_size += boost::asio::buffer_size(buffer);
  874. }
  875. // A request to receive 0 bytes on a stream socket is a no-op.
  876. if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
  877. {
  878. ec = boost::system::error_code();
  879. return 0;
  880. }
  881. // Receive some data.
  882. for (;;)
  883. {
  884. // Try to complete the operation without blocking.
  885. int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec);
  886. // Check if operation succeeded.
  887. if (bytes_recvd > 0)
  888. return bytes_recvd;
  889. // Check for EOF.
  890. if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM)
  891. {
  892. ec = boost::asio::error::eof;
  893. return 0;
  894. }
  895. // Operation failed.
  896. if ((impl.flags_ & implementation_type::user_set_non_blocking)
  897. || (ec != boost::asio::error::would_block
  898. && ec != boost::asio::error::try_again))
  899. return 0;
  900. // Wait for socket to become ready.
  901. if (socket_ops::poll_read(impl.socket_, ec) < 0)
  902. return 0;
  903. }
  904. }
  905. // Wait until data can be received without blocking.
  906. size_t receive(implementation_type& impl, const null_buffers&,
  907. socket_base::message_flags, boost::system::error_code& ec)
  908. {
  909. if (!is_open(impl))
  910. {
  911. ec = boost::asio::error::bad_descriptor;
  912. return 0;
  913. }
  914. // Wait for socket to become ready.
  915. socket_ops::poll_read(impl.socket_, ec);
  916. return 0;
  917. }
  918. template <typename MutableBufferSequence, typename Handler>
  919. class receive_operation :
  920. public handler_base_from_member<Handler>
  921. {
  922. public:
  923. receive_operation(socket_type socket, int protocol_type,
  924. boost::asio::io_service& io_service,
  925. const MutableBufferSequence& buffers,
  926. socket_base::message_flags flags, Handler handler)
  927. : handler_base_from_member<Handler>(handler),
  928. socket_(socket),
  929. protocol_type_(protocol_type),
  930. io_service_(io_service),
  931. work_(io_service),
  932. buffers_(buffers),
  933. flags_(flags)
  934. {
  935. }
  936. bool perform(boost::system::error_code& ec,
  937. std::size_t& bytes_transferred)
  938. {
  939. // Check whether the operation was successful.
  940. if (ec)
  941. {
  942. bytes_transferred = 0;
  943. return true;
  944. }
  945. // Copy buffers into array.
  946. socket_ops::buf bufs[max_buffers];
  947. typename MutableBufferSequence::const_iterator iter = buffers_.begin();
  948. typename MutableBufferSequence::const_iterator end = buffers_.end();
  949. size_t i = 0;
  950. for (; iter != end && i < max_buffers; ++iter, ++i)
  951. {
  952. boost::asio::mutable_buffer buffer(*iter);
  953. socket_ops::init_buf(bufs[i],
  954. boost::asio::buffer_cast<void*>(buffer),
  955. boost::asio::buffer_size(buffer));
  956. }
  957. // Receive some data.
  958. int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec);
  959. if (bytes == 0 && protocol_type_ == SOCK_STREAM)
  960. ec = boost::asio::error::eof;
  961. // Check if we need to run the operation again.
  962. if (ec == boost::asio::error::would_block
  963. || ec == boost::asio::error::try_again)
  964. return false;
  965. bytes_transferred = (bytes < 0 ? 0 : bytes);
  966. return true;
  967. }
  968. void complete(const boost::system::error_code& ec,
  969. std::size_t bytes_transferred)
  970. {
  971. io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
  972. }
  973. private:
  974. socket_type socket_;
  975. int protocol_type_;
  976. boost::asio::io_service& io_service_;
  977. boost::asio::io_service::work work_;
  978. MutableBufferSequence buffers_;
  979. socket_base::message_flags flags_;
  980. };
  981. // Start an asynchronous receive. The buffer for the data being received
  982. // must be valid for the lifetime of the asynchronous operation.
  983. template <typename MutableBufferSequence, typename Handler>
  984. void async_receive(implementation_type& impl,
  985. const MutableBufferSequence& buffers,
  986. socket_base::message_flags flags, Handler handler)
  987. {
  988. if (!is_open(impl))
  989. {
  990. this->get_io_service().post(bind_handler(handler,
  991. boost::asio::error::bad_descriptor, 0));
  992. }
  993. else
  994. {
  995. if (impl.protocol_.type() == SOCK_STREAM)
  996. {
  997. // Determine total size of buffers.
  998. typename MutableBufferSequence::const_iterator iter = buffers.begin();
  999. typename MutableBufferSequence::const_iterator end = buffers.end();
  1000. size_t i = 0;
  1001. size_t total_buffer_size = 0;
  1002. for (; iter != end && i < max_buffers; ++iter, ++i)
  1003. {
  1004. boost::asio::mutable_buffer buffer(*iter);
  1005. total_buffer_size += boost::asio::buffer_size(buffer);
  1006. }
  1007. // A request to receive 0 bytes on a stream socket is a no-op.
  1008. if (total_buffer_size == 0)
  1009. {
  1010. this->get_io_service().post(bind_handler(handler,
  1011. boost::system::error_code(), 0));
  1012. return;
  1013. }
  1014. }
  1015. // Make socket non-blocking.
  1016. if (!(impl.flags_ & implementation_type::internal_non_blocking))
  1017. {
  1018. if (!(impl.flags_ & implementation_type::non_blocking))
  1019. {
  1020. ioctl_arg_type non_blocking = 1;
  1021. boost::system::error_code ec;
  1022. if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
  1023. {
  1024. this->get_io_service().post(bind_handler(handler, ec, 0));
  1025. return;
  1026. }
  1027. }
  1028. impl.flags_ |= implementation_type::internal_non_blocking;
  1029. }
  1030. if (flags & socket_base::message_out_of_band)
  1031. {
  1032. reactor_.start_except_op(impl.socket_, impl.reactor_data_,
  1033. receive_operation<MutableBufferSequence, Handler>(
  1034. impl.socket_, impl.protocol_.type(),
  1035. this->get_io_service(), buffers, flags, handler));
  1036. }
  1037. else
  1038. {
  1039. reactor_.start_read_op(impl.socket_, impl.reactor_data_,
  1040. receive_operation<MutableBufferSequence, Handler>(
  1041. impl.socket_, impl.protocol_.type(),
  1042. this->get_io_service(), buffers, flags, handler));
  1043. }
  1044. }
  1045. }
  1046. // Wait until data can be received without blocking.
  1047. template <typename Handler>
  1048. void async_receive(implementation_type& impl, const null_buffers&,
  1049. socket_base::message_flags flags, Handler handler)
  1050. {
  1051. if (!is_open(impl))
  1052. {
  1053. this->get_io_service().post(bind_handler(handler,
  1054. boost::asio::error::bad_descriptor, 0));
  1055. }
  1056. else if (flags & socket_base::message_out_of_band)
  1057. {
  1058. reactor_.start_except_op(impl.socket_, impl.reactor_data_,
  1059. null_buffers_operation<Handler>(this->get_io_service(), handler));
  1060. }
  1061. else
  1062. {
  1063. reactor_.start_read_op(impl.socket_, impl.reactor_data_,
  1064. null_buffers_operation<Handler>(this->get_io_service(), handler),
  1065. false);
  1066. }
  1067. }
  1068. // Receive a datagram with the endpoint of the sender. Returns the number of
  1069. // bytes received.
  1070. template <typename MutableBufferSequence>
  1071. size_t receive_from(implementation_type& impl,
  1072. const MutableBufferSequence& buffers,
  1073. endpoint_type& sender_endpoint, socket_base::message_flags flags,
  1074. boost::system::error_code& ec)
  1075. {
  1076. if (!is_open(impl))
  1077. {
  1078. ec = boost::asio::error::bad_descriptor;
  1079. return 0;
  1080. }
  1081. // Copy buffers into array.
  1082. socket_ops::buf bufs[max_buffers];
  1083. typename MutableBufferSequence::const_iterator iter = buffers.begin();
  1084. typename MutableBufferSequence::const_iterator end = buffers.end();
  1085. size_t i = 0;
  1086. for (; iter != end && i < max_buffers; ++iter, ++i)
  1087. {
  1088. boost::asio::mutable_buffer buffer(*iter);
  1089. socket_ops::init_buf(bufs[i],
  1090. boost::asio::buffer_cast<void*>(buffer),
  1091. boost::asio::buffer_size(buffer));
  1092. }
  1093. // Receive some data.
  1094. for (;;)
  1095. {
  1096. // Try to complete the operation without blocking.
  1097. std::size_t addr_len = sender_endpoint.capacity();
  1098. int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags,
  1099. sender_endpoint.data(), &addr_len, ec);
  1100. // Check if operation succeeded.
  1101. if (bytes_recvd > 0)
  1102. {
  1103. sender_endpoint.resize(addr_len);
  1104. return bytes_recvd;
  1105. }
  1106. // Check for EOF.
  1107. if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM)
  1108. {
  1109. ec = boost::asio::error::eof;
  1110. return 0;
  1111. }
  1112. // Operation failed.
  1113. if ((impl.flags_ & implementation_type::user_set_non_blocking)
  1114. || (ec != boost::asio::error::would_block
  1115. && ec != boost::asio::error::try_again))
  1116. return 0;
  1117. // Wait for socket to become ready.
  1118. if (socket_ops::poll_read(impl.socket_, ec) < 0)
  1119. return 0;
  1120. }
  1121. }
  1122. // Wait until data can be received without blocking.
  1123. size_t receive_from(implementation_type& impl, const null_buffers&,
  1124. endpoint_type& sender_endpoint, socket_base::message_flags,
  1125. boost::system::error_code& ec)
  1126. {
  1127. if (!is_open(impl))
  1128. {
  1129. ec = boost::asio::error::bad_descriptor;
  1130. return 0;
  1131. }
  1132. // Wait for socket to become ready.
  1133. socket_ops::poll_read(impl.socket_, ec);
  1134. // Reset endpoint since it can be given no sensible value at this time.
  1135. sender_endpoint = endpoint_type();
  1136. return 0;
  1137. }
  1138. template <typename MutableBufferSequence, typename Handler>
  1139. class receive_from_operation :
  1140. public handler_base_from_member<Handler>
  1141. {
  1142. public:
  1143. receive_from_operation(socket_type socket, int protocol_type,
  1144. boost::asio::io_service& io_service,
  1145. const MutableBufferSequence& buffers, endpoint_type& endpoint,
  1146. socket_base::message_flags flags, Handler handler)
  1147. : handler_base_from_member<Handler>(handler),
  1148. socket_(socket),
  1149. protocol_type_(protocol_type),
  1150. io_service_(io_service),
  1151. work_(io_service),
  1152. buffers_(buffers),
  1153. sender_endpoint_(endpoint),
  1154. flags_(flags)
  1155. {
  1156. }
  1157. bool perform(boost::system::error_code& ec,
  1158. std::size_t& bytes_transferred)
  1159. {
  1160. // Check whether the operation was successful.
  1161. if (ec)
  1162. {
  1163. bytes_transferred = 0;
  1164. return true;
  1165. }
  1166. // Copy buffers into array.
  1167. socket_ops::buf bufs[max_buffers];
  1168. typename MutableBufferSequence::const_iterator iter = buffers_.begin();
  1169. typename MutableBufferSequence::const_iterator end = buffers_.end();
  1170. size_t i = 0;
  1171. for (; iter != end && i < max_buffers; ++iter, ++i)
  1172. {
  1173. boost::asio::mutable_buffer buffer(*iter);
  1174. socket_ops::init_buf(bufs[i],
  1175. boost::asio::buffer_cast<void*>(buffer),
  1176. boost::asio::buffer_size(buffer));
  1177. }
  1178. // Receive some data.
  1179. std::size_t addr_len = sender_endpoint_.capacity();
  1180. int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_,
  1181. sender_endpoint_.data(), &addr_len, ec);
  1182. if (bytes == 0 && protocol_type_ == SOCK_STREAM)
  1183. ec = boost::asio::error::eof;
  1184. // Check if we need to run the operation again.
  1185. if (ec == boost::asio::error::would_block
  1186. || ec == boost::asio::error::try_again)
  1187. return false;
  1188. sender_endpoint_.resize(addr_len);
  1189. bytes_transferred = (bytes < 0 ? 0 : bytes);
  1190. return true;
  1191. }
  1192. void complete(const boost::system::error_code& ec,
  1193. std::size_t bytes_transferred)
  1194. {
  1195. io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
  1196. }
  1197. private:
  1198. socket_type socket_;
  1199. int protocol_type_;
  1200. boost::asio::io_service& io_service_;
  1201. boost::asio::io_service::work work_;
  1202. MutableBufferSequence buffers_;
  1203. endpoint_type& sender_endpoint_;
  1204. socket_base::message_flags flags_;
  1205. };
  1206. // Start an asynchronous receive. The buffer for the data being received and
  1207. // the sender_endpoint object must both be valid for the lifetime of the
  1208. // asynchronous operation.
  1209. template <typename MutableBufferSequence, typename Handler>
  1210. void async_receive_from(implementation_type& impl,
  1211. const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
  1212. socket_base::message_flags flags, Handler handler)
  1213. {
  1214. if (!is_open(impl))
  1215. {
  1216. this->get_io_service().post(bind_handler(handler,
  1217. boost::asio::error::bad_descriptor, 0));
  1218. }
  1219. else
  1220. {
  1221. // Make socket non-blocking.
  1222. if (!(impl.flags_ & implementation_type::internal_non_blocking))
  1223. {
  1224. if (!(impl.flags_ & implementation_type::non_blocking))
  1225. {
  1226. ioctl_arg_type non_blocking = 1;
  1227. boost::system::error_code ec;
  1228. if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
  1229. {
  1230. this->get_io_service().post(bind_handler(handler, ec, 0));
  1231. return;
  1232. }
  1233. }
  1234. impl.flags_ |= implementation_type::internal_non_blocking;
  1235. }
  1236. reactor_.start_read_op(impl.socket_, impl.reactor_data_,
  1237. receive_from_operation<MutableBufferSequence, Handler>(
  1238. impl.socket_, impl.protocol_.type(), this->get_io_service(),
  1239. buffers, sender_endpoint, flags, handler));
  1240. }
  1241. }
  1242. // Wait until data can be received without blocking.
  1243. template <typename Handler>
  1244. void async_receive_from(implementation_type& impl,
  1245. const null_buffers&, endpoint_type& sender_endpoint,
  1246. socket_base::message_flags flags, Handler handler)
  1247. {
  1248. if (!is_open(impl))
  1249. {
  1250. this->get_io_service().post(bind_handler(handler,
  1251. boost::asio::error::bad_descriptor, 0));
  1252. }
  1253. else
  1254. {
  1255. // Reset endpoint since it can be given no sensible value at this time.
  1256. sender_endpoint = endpoint_type();
  1257. if (flags & socket_base::message_out_of_band)
  1258. {
  1259. reactor_.start_except_op(impl.socket_, impl.reactor_data_,
  1260. null_buffers_operation<Handler>(this->get_io_service(), handler));
  1261. }
  1262. else
  1263. {
  1264. reactor_.start_read_op(impl.socket_, impl.reactor_data_,
  1265. null_buffers_operation<Handler>(this->get_io_service(), handler),
  1266. false);
  1267. }
  1268. }
  1269. }
  1270. // Accept a new connection.
  1271. template <typename Socket>
  1272. boost::system::error_code accept(implementation_type& impl,
  1273. Socket& peer, endpoint_type* peer_endpoint, boost::system::error_code& ec)
  1274. {
  1275. if (!is_open(impl))
  1276. {
  1277. ec = boost::asio::error::bad_descriptor;
  1278. return ec;
  1279. }
  1280. // We cannot accept a socket that is already open.
  1281. if (peer.is_open())
  1282. {
  1283. ec = boost::asio::error::already_open;
  1284. return ec;
  1285. }
  1286. // Accept a socket.
  1287. for (;;)
  1288. {
  1289. // Try to complete the operation without blocking.
  1290. socket_holder new_socket;
  1291. std::size_t addr_len = 0;
  1292. if (peer_endpoint)
  1293. {
  1294. addr_len = peer_endpoint->capacity();
  1295. new_socket.reset(socket_ops::accept(impl.socket_,
  1296. peer_endpoint->data(), &addr_len, ec));
  1297. }
  1298. else
  1299. {
  1300. new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
  1301. }
  1302. // Check if operation succeeded.
  1303. if (new_socket.get() >= 0)
  1304. {
  1305. if (peer_endpoint)
  1306. peer_endpoint->resize(addr_len);
  1307. peer.assign(impl.protocol_, new_socket.get(), ec);
  1308. if (!ec)
  1309. new_socket.release();
  1310. return ec;
  1311. }
  1312. // Operation failed.
  1313. if (ec == boost::asio::error::would_block
  1314. || ec == boost::asio::error::try_again)
  1315. {
  1316. if (impl.flags_ & implementation_type::user_set_non_blocking)
  1317. return ec;
  1318. // Fall through to retry operation.
  1319. }
  1320. else if (ec == boost::asio::error::connection_aborted)
  1321. {
  1322. if (impl.flags_ & implementation_type::enable_connection_aborted)
  1323. return ec;
  1324. // Fall through to retry operation.
  1325. }
  1326. #if defined(EPROTO)
  1327. else if (ec.value() == EPROTO)
  1328. {
  1329. if (impl.flags_ & implementation_type::enable_connection_aborted)
  1330. return ec;
  1331. // Fall through to retry operation.
  1332. }
  1333. #endif // defined(EPROTO)
  1334. else
  1335. return ec;
  1336. // Wait for socket to become ready.
  1337. if (socket_ops::poll_read(impl.socket_, ec) < 0)
  1338. return ec;
  1339. }
  1340. }
  1341. template <typename Socket, typename Handler>
  1342. class accept_operation :
  1343. public handler_base_from_member<Handler>
  1344. {
  1345. public:
  1346. accept_operation(socket_type socket, boost::asio::io_service& io_service,
  1347. Socket& peer, const protocol_type& protocol,
  1348. endpoint_type* peer_endpoint, bool enable_connection_aborted,
  1349. Handler handler)
  1350. : handler_base_from_member<Handler>(handler),
  1351. socket_(socket),
  1352. io_service_(io_service),
  1353. work_(io_service),
  1354. peer_(peer),
  1355. protocol_(protocol),
  1356. peer_endpoint_(peer_endpoint),
  1357. enable_connection_aborted_(enable_connection_aborted)
  1358. {
  1359. }
  1360. bool perform(boost::system::error_code& ec, std::size_t&)
  1361. {
  1362. // Check whether the operation was successful.
  1363. if (ec)
  1364. return true;
  1365. // Accept the waiting connection.
  1366. socket_holder new_socket;
  1367. std::size_t addr_len = 0;
  1368. if (peer_endpoint_)
  1369. {
  1370. addr_len = peer_endpoint_->capacity();
  1371. new_socket.reset(socket_ops::accept(socket_,
  1372. peer_endpoint_->data(), &addr_len, ec));
  1373. }
  1374. else
  1375. {
  1376. new_socket.reset(socket_ops::accept(socket_, 0, 0, ec));
  1377. }
  1378. // Check if we need to run the operation again.
  1379. if (ec == boost::asio::error::would_block
  1380. || ec == boost::asio::error::try_again)
  1381. return false;
  1382. if (ec == boost::asio::error::connection_aborted
  1383. && !enable_connection_aborted_)
  1384. return false;
  1385. #if defined(EPROTO)
  1386. if (ec.value() == EPROTO && !enable_connection_aborted_)
  1387. return false;
  1388. #endif // defined(EPROTO)
  1389. // Transfer ownership of the new socket to the peer object.
  1390. if (!ec)
  1391. {
  1392. if (peer_endpoint_)
  1393. peer_endpoint_->resize(addr_len);
  1394. peer_.assign(protocol_, new_socket.get(), ec);
  1395. if (!ec)
  1396. new_socket.release();
  1397. }
  1398. return true;
  1399. }
  1400. void complete(const boost::system::error_code& ec, std::size_t)
  1401. {
  1402. io_service_.post(bind_handler(this->handler_, ec));
  1403. }
  1404. private:
  1405. socket_type socket_;
  1406. boost::asio::io_service& io_service_;
  1407. boost::asio::io_service::work work_;
  1408. Socket& peer_;
  1409. protocol_type protocol_;
  1410. endpoint_type* peer_endpoint_;
  1411. bool enable_connection_aborted_;
  1412. };
  1413. // Start an asynchronous accept. The peer and peer_endpoint objects
  1414. // must be valid until the accept's handler is invoked.
  1415. template <typename Socket, typename Handler>
  1416. void async_accept(implementation_type& impl, Socket& peer,
  1417. endpoint_type* peer_endpoint, Handler handler)
  1418. {
  1419. if (!is_open(impl))
  1420. {
  1421. this->get_io_service().post(bind_handler(handler,
  1422. boost::asio::error::bad_descriptor));
  1423. }
  1424. else if (peer.is_open())
  1425. {
  1426. this->get_io_service().post(bind_handler(handler,
  1427. boost::asio::error::already_open));
  1428. }
  1429. else
  1430. {
  1431. // Make socket non-blocking.
  1432. if (!(impl.flags_ & implementation_type::internal_non_blocking))
  1433. {
  1434. if (!(impl.flags_ & implementation_type::non_blocking))
  1435. {
  1436. ioctl_arg_type non_blocking = 1;
  1437. boost::system::error_code ec;
  1438. if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
  1439. {
  1440. this->get_io_service().post(bind_handler(handler, ec));
  1441. return;
  1442. }
  1443. }
  1444. impl.flags_ |= implementation_type::internal_non_blocking;
  1445. }
  1446. reactor_.start_read_op(impl.socket_, impl.reactor_data_,
  1447. accept_operation<Socket, Handler>(
  1448. impl.socket_, this->get_io_service(),
  1449. peer, impl.protocol_, peer_endpoint,
  1450. (impl.flags_ & implementation_type::enable_connection_aborted) != 0,
  1451. handler));
  1452. }
  1453. }
  1454. // Connect the socket to the specified endpoint.
  1455. boost::system::error_code connect(implementation_type& impl,
  1456. const endpoint_type& peer_endpoint, boost::system::error_code& ec)
  1457. {
  1458. if (!is_open(impl))
  1459. {
  1460. ec = boost::asio::error::bad_descriptor;
  1461. return ec;
  1462. }
  1463. // Perform the connect operation.
  1464. socket_ops::connect(impl.socket_,
  1465. peer_endpoint.data(), peer_endpoint.size(), ec);
  1466. if (ec != boost::asio::error::in_progress
  1467. && ec != boost::asio::error::would_block)
  1468. {
  1469. // The connect operation finished immediately.
  1470. return ec;
  1471. }
  1472. // Wait for socket to become ready.
  1473. if (socket_ops::poll_connect(impl.socket_, ec) < 0)
  1474. return ec;
  1475. // Get the error code from the connect operation.
  1476. int connect_error = 0;
  1477. size_t connect_error_len = sizeof(connect_error);
  1478. if (socket_ops::getsockopt(impl.socket_, SOL_SOCKET, SO_ERROR,
  1479. &connect_error, &connect_error_len, ec) == socket_error_retval)
  1480. return ec;
  1481. // Return the result of the connect operation.
  1482. ec = boost::system::error_code(connect_error,
  1483. boost::asio::error::get_system_category());
  1484. return ec;
  1485. }
  1486. template <typename Handler>
  1487. class connect_operation :
  1488. public handler_base_from_member<Handler>
  1489. {
  1490. public:
  1491. connect_operation(socket_type socket,
  1492. boost::asio::io_service& io_service, Handler handler)
  1493. : handler_base_from_member<Handler>(handler),
  1494. socket_(socket),
  1495. io_service_(io_service),
  1496. work_(io_service)
  1497. {
  1498. }
  1499. bool perform(boost::system::error_code& ec, std::size_t&)
  1500. {
  1501. // Check whether the operation was successful.
  1502. if (ec)
  1503. return true;
  1504. // Get the error code from the connect operation.
  1505. int connect_error = 0;
  1506. size_t connect_error_len = sizeof(connect_error);
  1507. if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
  1508. &connect_error, &connect_error_len, ec) == socket_error_retval)
  1509. return true;
  1510. // The connection failed so the handler will be posted with an error code.
  1511. if (connect_error)
  1512. {
  1513. ec = boost::system::error_code(connect_error,
  1514. boost::asio::error::get_system_category());
  1515. return true;
  1516. }
  1517. return true;
  1518. }
  1519. void complete(const boost::system::error_code& ec, std::size_t)
  1520. {
  1521. io_service_.post(bind_handler(this->handler_, ec));
  1522. }
  1523. private:
  1524. socket_type socket_;
  1525. boost::asio::io_service& io_service_;
  1526. boost::asio::io_service::work work_;
  1527. };
  1528. // Start an asynchronous connect.
  1529. template <typename Handler>
  1530. void async_connect(implementation_type& impl,
  1531. const endpoint_type& peer_endpoint, Handler handler)
  1532. {
  1533. if (!is_open(impl))
  1534. {
  1535. this->get_io_service().post(bind_handler(handler,
  1536. boost::asio::error::bad_descriptor));
  1537. return;
  1538. }
  1539. // Make socket non-blocking.
  1540. if (!(impl.flags_ & implementation_type::internal_non_blocking))
  1541. {
  1542. if (!(impl.flags_ & implementation_type::non_blocking))
  1543. {
  1544. ioctl_arg_type non_blocking = 1;
  1545. boost::system::error_code ec;
  1546. if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
  1547. {
  1548. this->get_io_service().post(bind_handler(handler, ec));
  1549. return;
  1550. }
  1551. }
  1552. impl.flags_ |= implementation_type::internal_non_blocking;
  1553. }
  1554. // Start the connect operation. The socket is already marked as non-blocking
  1555. // so the connection will take place asynchronously.
  1556. boost::system::error_code ec;
  1557. if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
  1558. peer_endpoint.size(), ec) == 0)
  1559. {
  1560. // The connect operation has finished successfully so we need to post the
  1561. // handler immediately.
  1562. this->get_io_service().post(bind_handler(handler,
  1563. boost::system::error_code()));
  1564. }
  1565. else if (ec == boost::asio::error::in_progress
  1566. || ec == boost::asio::error::would_block)
  1567. {
  1568. // The connection is happening in the background, and we need to wait
  1569. // until the socket becomes writeable.
  1570. reactor_.start_connect_op(impl.socket_, impl.reactor_data_,
  1571. connect_operation<Handler>(impl.socket_,
  1572. this->get_io_service(), handler));
  1573. }
  1574. else
  1575. {
  1576. // The connect operation has failed, so post the handler immediately.
  1577. this->get_io_service().post(bind_handler(handler, ec));
  1578. }
  1579. }
  1580. private:
  1581. // The selector that performs event demultiplexing for the service.
  1582. Reactor& reactor_;
  1583. };
  1584. } // namespace detail
  1585. } // namespace asio
  1586. } // namespace boost
  1587. #include <boost/asio/detail/pop_options.hpp>
  1588. #endif // BOOST_ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP