//
// reactive_descriptor_service.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP
#define ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/push_options.hpp"

#include "asio/buffer.hpp"
#include "asio/error.hpp"
#include "asio/io_service.hpp"
#include "asio/detail/bind_handler.hpp"
#include "asio/detail/buffer_sequence_adapter.hpp"
#include "asio/detail/descriptor_ops.hpp"
#include "asio/detail/fenced_block.hpp"
#include "asio/detail/noncopyable.hpp"
#include "asio/detail/null_buffers_op.hpp"
#include "asio/detail/reactor.hpp"
#include "asio/detail/reactor_op.hpp"

#if !defined(BOOST_WINDOWS) && !defined(__CYGWIN__)

namespace asio {
namespace detail {
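
// Service used by the POSIX descriptor-based I/O objects (such as
// asio::posix::basic_stream_descriptor) to run synchronous and asynchronous
// operations on a native file descriptor through the reactor.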
class reactive_descriptor_service
{
public:
  // The native type of a descriptor.
  typedef int native_type;

  // The implementation type of the descriptor.
  class implementation_type
    : private asio::detail::noncopyable
  {
  public:
    // Default constructor.
    implementation_type()
      : descriptor_(-1),
        flags_(0)
    {
    }

  private:
    // Only this service will have access to the internal values.
    friend class reactive_descriptor_service;

    // The native descriptor representation.
    int descriptor_;

    enum
    {
      // The user wants a non-blocking descriptor.
      user_set_non_blocking = 1,

      // The descriptor has been set non-blocking.
      internal_non_blocking = 2,

      // Helper "flag" used to determine whether the descriptor is
      // non-blocking.
      non_blocking = user_set_non_blocking | internal_non_blocking
    };

    // Flags indicating the current state of the descriptor.
    unsigned char flags_;

    // Per-descriptor data used by the reactor.
    reactor::per_descriptor_data reactor_data_;
  };

  // The maximum number of buffers to support in a single operation.
  enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };

  // Constructor.
  reactive_descriptor_service(asio::io_service& io_service)
    : io_service_impl_(asio::use_service<io_service_impl>(io_service)),
      reactor_(asio::use_service<reactor>(io_service))
  {
    reactor_.init_task();
  }

  // Destroy all user-defined handler objects owned by the service.
  void shutdown_service()
  {
  }

  // Construct a new descriptor implementation.
  void construct(implementation_type& impl)
  {
    impl.descriptor_ = -1;
    impl.flags_ = 0;
  }

  // Destroy a descriptor implementation.
  void destroy(implementation_type& impl)
  {
    if (impl.descriptor_ != -1)
    {
      reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_);
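
      // Non-blocking mode that was enabled only to support asynchronous
      // operations is switched off again before the descriptor is closed.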
      if (impl.flags_ & implementation_type::internal_non_blocking)
      {
        ioctl_arg_type non_blocking = 0;
        asio::error_code ignored_ec;
        descriptor_ops::ioctl(impl.descriptor_,
            FIONBIO, &non_blocking, ignored_ec);
        impl.flags_ &= ~implementation_type::internal_non_blocking;
      }

      asio::error_code ignored_ec;
      descriptor_ops::close(impl.descriptor_, ignored_ec);

      impl.descriptor_ = -1;
    }
  }

  // Assign a native descriptor to a descriptor implementation.
  asio::error_code assign(implementation_type& impl,
      const native_type& native_descriptor, asio::error_code& ec)
  {
    if (is_open(impl))
    {
      ec = asio::error::already_open;
      return ec;
    }

    if (int err = reactor_.register_descriptor(
          native_descriptor, impl.reactor_data_))
    {
      ec = asio::error_code(err,
          asio::error::get_system_category());
      return ec;
    }

    impl.descriptor_ = native_descriptor;
    impl.flags_ = 0;
    ec = asio::error_code();
    return ec;
  }

  // Determine whether the descriptor is open.
  bool is_open(const implementation_type& impl) const
  {
    return impl.descriptor_ != -1;
  }

  // Close a descriptor implementation.
  asio::error_code close(implementation_type& impl,
      asio::error_code& ec)
  {
    if (is_open(impl))
    {
      reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_);

      if (impl.flags_ & implementation_type::internal_non_blocking)
      {
        ioctl_arg_type non_blocking = 0;
        asio::error_code ignored_ec;
        descriptor_ops::ioctl(impl.descriptor_,
            FIONBIO, &non_blocking, ignored_ec);
        impl.flags_ &= ~implementation_type::internal_non_blocking;
      }

      if (descriptor_ops::close(impl.descriptor_, ec) == -1)
        return ec;

      impl.descriptor_ = -1;
    }

    ec = asio::error_code();
    return ec;
  }

  // Get the native descriptor representation.
  native_type native(const implementation_type& impl) const
  {
    return impl.descriptor_;
  }

  // Cancel all operations associated with the descriptor.
  asio::error_code cancel(implementation_type& impl,
      asio::error_code& ec)
  {
    if (!is_open(impl))
    {
      ec = asio::error::bad_descriptor;
      return ec;
    }

    reactor_.cancel_ops(impl.descriptor_, impl.reactor_data_);
    ec = asio::error_code();
    return ec;
  }

  // Perform an IO control command on the descriptor.
  template <typename IO_Control_Command>
  asio::error_code io_control(implementation_type& impl,
      IO_Control_Command& command, asio::error_code& ec)
  {
    if (!is_open(impl))
    {
      ec = asio::error::bad_descriptor;
      return ec;
    }

    descriptor_ops::ioctl(impl.descriptor_, command.name(),
        static_cast<ioctl_arg_type*>(command.data()), ec);

    // When updating the non-blocking mode we always perform the ioctl
    // syscall, even if the flags would otherwise indicate that the
    // descriptor is already in the correct state. This ensures that the
    // underlying descriptor is put into the state that has been requested
    // by the user. If the ioctl syscall was successful then we need to
    // update the flags to match.
    if (!ec && command.name() == static_cast<int>(FIONBIO))
    {
      if (*static_cast<ioctl_arg_type*>(command.data()))
      {
        impl.flags_ |= implementation_type::user_set_non_blocking;
      }
      else
      {
        // Clearing the non-blocking mode always overrides any internally-set
        // non-blocking flag. Any subsequent asynchronous operations will
        // need to re-enable non-blocking I/O.
        impl.flags_ &= ~(implementation_type::user_set_non_blocking
            | implementation_type::internal_non_blocking);
      }
    }

    return ec;
  }

  // Write some data to the descriptor.
  template <typename ConstBufferSequence>
  size_t write_some(implementation_type& impl,
      const ConstBufferSequence& buffers, asio::error_code& ec)
  {
    if (!is_open(impl))
    {
      ec = asio::error::bad_descriptor;
      return 0;
    }

    buffer_sequence_adapter<asio::const_buffer,
        ConstBufferSequence> bufs(buffers);

    // A request to write 0 bytes on a stream is a no-op.
    if (bufs.all_empty())
    {
      ec = asio::error_code();
      return 0;
    }

    // Send the data.
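    // If the descriptor was put into non-blocking mode only to support
    // earlier asynchronous operations (and the user has not asked for
    // non-blocking behaviour), blocking semantics are emulated below by
    // polling for writability and retrying the write.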
    for (;;)
    {
      // Try to complete the operation without blocking.
      int bytes_sent = descriptor_ops::gather_write(
          impl.descriptor_, bufs.buffers(), bufs.count(), ec);

      // Check if operation succeeded.
      if (bytes_sent >= 0)
        return bytes_sent;

      // Operation failed.
      if ((impl.flags_ & implementation_type::user_set_non_blocking)
          || (ec != asio::error::would_block
            && ec != asio::error::try_again))
        return 0;

      // Wait for descriptor to become ready.
      if (descriptor_ops::poll_write(impl.descriptor_, ec) < 0)
        return 0;
    }
  }

  // Wait until data can be written without blocking.
  size_t write_some(implementation_type& impl,
      const null_buffers&, asio::error_code& ec)
  {
    if (!is_open(impl))
    {
      ec = asio::error::bad_descriptor;
      return 0;
    }

    // Wait for descriptor to become ready.
    descriptor_ops::poll_write(impl.descriptor_, ec);

    return 0;
  }

  template <typename ConstBufferSequence>
  class write_op_base : public reactor_op
  {
  public:
    write_op_base(int descriptor,
        const ConstBufferSequence& buffers, func_type complete_func)
      : reactor_op(&write_op_base::do_perform, complete_func),
        descriptor_(descriptor),
        buffers_(buffers)
    {
    }
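
    // Called by the reactor when the descriptor becomes writable. Returning
    // false means the operation is not yet complete and should remain
    // registered with the reactor; returning true means it has finished
    // (successfully or with an error) and its completion may be posted.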
    static bool do_perform(reactor_op* base)
    {
      write_op_base* o(static_cast<write_op_base*>(base));

      buffer_sequence_adapter<asio::const_buffer,
          ConstBufferSequence> bufs(o->buffers_);

      for (;;)
      {
        // Write the data.
        asio::error_code ec;
        int bytes = descriptor_ops::gather_write(
            o->descriptor_, bufs.buffers(), bufs.count(), ec);

        // Retry operation if interrupted by signal.
        if (ec == asio::error::interrupted)
          continue;

        // Check if we need to run the operation again.
        if (ec == asio::error::would_block
            || ec == asio::error::try_again)
          return false;

        o->ec_ = ec;
        o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
        return true;
      }
    }

  private:
    int descriptor_;
    ConstBufferSequence buffers_;
  };

  template <typename ConstBufferSequence, typename Handler>
  class write_op : public write_op_base<ConstBufferSequence>
  {
  public:
    write_op(int descriptor,
        const ConstBufferSequence& buffers, Handler handler)
      : write_op_base<ConstBufferSequence>(
          descriptor, buffers, &write_op::do_complete),
        handler_(handler)
    {
    }

    static void do_complete(io_service_impl* owner, operation* base,
        asio::error_code /*ec*/, std::size_t /*bytes_transferred*/)
    {
      // Take ownership of the handler object.
      write_op* o(static_cast<write_op*>(base));
      typedef handler_alloc_traits<Handler, write_op> alloc_traits;
      handler_ptr<alloc_traits> ptr(o->handler_, o);
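
      // A null owner indicates that the operation is being destroyed without
      // being run (for example during shutdown), so only the memory
      // associated with the handler is reclaimed and no upcall is made.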
      // Make the upcall if required.
      if (owner)
      {
        // Make a copy of the handler so that the memory can be deallocated
        // before the upcall is made. Even if we're not about to make an
        // upcall, a sub-object of the handler may be the true owner of the
        // memory associated with the handler. Consequently, a local copy of
        // the handler is required to ensure that any owning sub-object
        // remains valid until after we have deallocated the memory here.
        detail::binder2<Handler, asio::error_code, std::size_t>
          handler(o->handler_, o->ec_, o->bytes_transferred_);
        ptr.reset();
        asio::detail::fenced_block b;
        asio_handler_invoke_helpers::invoke(handler, handler);
      }
    }

  private:
    Handler handler_;
  };

  // Start an asynchronous write. The data being sent must be valid for the
  // lifetime of the asynchronous operation.
  template <typename ConstBufferSequence, typename Handler>
  void async_write_some(implementation_type& impl,
      const ConstBufferSequence& buffers, Handler handler)
  {
    // Allocate and construct an operation to wrap the handler.
    typedef write_op<ConstBufferSequence, Handler> value_type;
    typedef handler_alloc_traits<Handler, value_type> alloc_traits;
    raw_handler_ptr<alloc_traits> raw_ptr(handler);
    handler_ptr<alloc_traits> ptr(raw_ptr, impl.descriptor_, buffers, handler);

    start_op(impl, reactor::write_op, ptr.get(), true,
        buffer_sequence_adapter<asio::const_buffer,
          ConstBufferSequence>::all_empty(buffers));
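
    // Ownership of the operation has passed to the reactor (or to the
    // io_service as an immediate completion), so release the smart pointer
    // without deallocating.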
    ptr.release();
  }

  // Start an asynchronous wait until data can be written without blocking.
  template <typename Handler>
  void async_write_some(implementation_type& impl,
      const null_buffers&, Handler handler)
  {
    // Allocate and construct an operation to wrap the handler.
    typedef null_buffers_op<Handler> value_type;
    typedef handler_alloc_traits<Handler, value_type> alloc_traits;
    raw_handler_ptr<alloc_traits> raw_ptr(handler);
    handler_ptr<alloc_traits> ptr(raw_ptr, handler);

    start_op(impl, reactor::write_op, ptr.get(), false, false);
    ptr.release();
  }

  // Read some data from the stream. Returns the number of bytes read.
  template <typename MutableBufferSequence>
  size_t read_some(implementation_type& impl,
      const MutableBufferSequence& buffers, asio::error_code& ec)
  {
    if (!is_open(impl))
    {
      ec = asio::error::bad_descriptor;
      return 0;
    }

    buffer_sequence_adapter<asio::mutable_buffer,
        MutableBufferSequence> bufs(buffers);

    // A request to read 0 bytes on a stream is a no-op.
    if (bufs.all_empty())
    {
      ec = asio::error_code();
      return 0;
    }

    // Read some data.
    for (;;)
    {
      // Try to complete the operation without blocking.
      int bytes_read = descriptor_ops::scatter_read(
          impl.descriptor_, bufs.buffers(), bufs.count(), ec);

      // Check if operation succeeded.
      if (bytes_read > 0)
        return bytes_read;

      // Check for EOF.
      if (bytes_read == 0)
      {
        ec = asio::error::eof;
        return 0;
      }

      // Operation failed.
      if ((impl.flags_ & implementation_type::user_set_non_blocking)
          || (ec != asio::error::would_block
            && ec != asio::error::try_again))
        return 0;

      // Wait for descriptor to become ready.
      if (descriptor_ops::poll_read(impl.descriptor_, ec) < 0)
        return 0;
    }
  }

  // Wait until data can be read without blocking.
  size_t read_some(implementation_type& impl,
      const null_buffers&, asio::error_code& ec)
  {
    if (!is_open(impl))
    {
      ec = asio::error::bad_descriptor;
      return 0;
    }

    // Wait for descriptor to become ready.
    descriptor_ops::poll_read(impl.descriptor_, ec);

    return 0;
  }

  template <typename MutableBufferSequence>
  class read_op_base : public reactor_op
  {
  public:
    read_op_base(int descriptor,
        const MutableBufferSequence& buffers, func_type complete_func)
      : reactor_op(&read_op_base::do_perform, complete_func),
        descriptor_(descriptor),
        buffers_(buffers)
    {
    }
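
    // Called by the reactor when the descriptor becomes readable. As with
    // the write operation, returning false leaves the operation registered
    // with the reactor; a zero-byte read is mapped to asio::error::eof
    // before the operation completes.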
    static bool do_perform(reactor_op* base)
    {
      read_op_base* o(static_cast<read_op_base*>(base));

      buffer_sequence_adapter<asio::mutable_buffer,
          MutableBufferSequence> bufs(o->buffers_);

      for (;;)
      {
        // Read some data.
        asio::error_code ec;
        int bytes = descriptor_ops::scatter_read(
            o->descriptor_, bufs.buffers(), bufs.count(), ec);
        if (bytes == 0)
          ec = asio::error::eof;

        // Retry operation if interrupted by signal.
        if (ec == asio::error::interrupted)
          continue;

        // Check if we need to run the operation again.
        if (ec == asio::error::would_block
            || ec == asio::error::try_again)
          return false;

        o->ec_ = ec;
        o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
        return true;
      }
    }

  private:
    int descriptor_;
    MutableBufferSequence buffers_;
  };

  template <typename MutableBufferSequence, typename Handler>
  class read_op : public read_op_base<MutableBufferSequence>
  {
  public:
    read_op(int descriptor,
        const MutableBufferSequence& buffers, Handler handler)
      : read_op_base<MutableBufferSequence>(
          descriptor, buffers, &read_op::do_complete),
        handler_(handler)
    {
    }

    static void do_complete(io_service_impl* owner, operation* base,
        asio::error_code /*ec*/, std::size_t /*bytes_transferred*/)
    {
      // Take ownership of the handler object.
      read_op* o(static_cast<read_op*>(base));
      typedef handler_alloc_traits<Handler, read_op> alloc_traits;
      handler_ptr<alloc_traits> ptr(o->handler_, o);

      // Make the upcall if required.
      if (owner)
      {
        // Make a copy of the handler so that the memory can be deallocated
        // before the upcall is made. Even if we're not about to make an
        // upcall, a sub-object of the handler may be the true owner of the
        // memory associated with the handler. Consequently, a local copy of
        // the handler is required to ensure that any owning sub-object
        // remains valid until after we have deallocated the memory here.
        detail::binder2<Handler, asio::error_code, std::size_t>
          handler(o->handler_, o->ec_, o->bytes_transferred_);
        ptr.reset();
        asio::detail::fenced_block b;
        asio_handler_invoke_helpers::invoke(handler, handler);
      }
    }

  private:
    Handler handler_;
  };

  // Start an asynchronous read. The buffer for the data being read must be
  // valid for the lifetime of the asynchronous operation.
  template <typename MutableBufferSequence, typename Handler>
  void async_read_some(implementation_type& impl,
      const MutableBufferSequence& buffers, Handler handler)
  {
    // Allocate and construct an operation to wrap the handler.
    typedef read_op<MutableBufferSequence, Handler> value_type;
    typedef handler_alloc_traits<Handler, value_type> alloc_traits;
    raw_handler_ptr<alloc_traits> raw_ptr(handler);
    handler_ptr<alloc_traits> ptr(raw_ptr,
        impl.descriptor_, buffers, handler);

    start_op(impl, reactor::read_op, ptr.get(), true,
        buffer_sequence_adapter<asio::mutable_buffer,
          MutableBufferSequence>::all_empty(buffers));
    ptr.release();
  }

  // Wait until data can be read without blocking.
  template <typename Handler>
  void async_read_some(implementation_type& impl,
      const null_buffers&, Handler handler)
  {
    // Allocate and construct an operation to wrap the handler.
    typedef null_buffers_op<Handler> value_type;
    typedef handler_alloc_traits<Handler, value_type> alloc_traits;
    raw_handler_ptr<alloc_traits> raw_ptr(handler);
    handler_ptr<alloc_traits> ptr(raw_ptr, handler);

    start_op(impl, reactor::read_op, ptr.get(), false, false);
    ptr.release();
  }

private:
  // Start the asynchronous operation.
  void start_op(implementation_type& impl, int op_type,
      reactor_op* op, bool non_blocking, bool noop)
  {
    if (!noop)
    {
      if (is_open(impl))
      {
        if (is_non_blocking(impl) || set_non_blocking(impl, op->ec_))
        {
          reactor_.start_op(op_type, impl.descriptor_,
              impl.reactor_data_, op, non_blocking);
          return;
        }
      }
      else
        op->ec_ = asio::error::bad_descriptor;
    }
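
    // A no-op request, a closed descriptor, or a failure to switch the
    // descriptor into non-blocking mode all fall through to here: the
    // operation is posted for immediate completion instead of being
    // registered with the reactor.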
    io_service_impl_.post_immediate_completion(op);
  }

  // Determine whether the descriptor has been set non-blocking.
  bool is_non_blocking(implementation_type& impl) const
  {
    return (impl.flags_ & implementation_type::non_blocking);
  }

  // Set the internal non-blocking flag.
  bool set_non_blocking(implementation_type& impl,
      asio::error_code& ec)
  {
    ioctl_arg_type non_blocking = 1;
    if (descriptor_ops::ioctl(impl.descriptor_, FIONBIO, &non_blocking, ec))
      return false;
    impl.flags_ |= implementation_type::internal_non_blocking;
    return true;
  }

  // The io_service implementation used to post completions.
  io_service_impl& io_service_impl_;

  // The selector that performs event demultiplexing for the service.
  reactor& reactor_;
};

} // namespace detail
} // namespace asio

#endif // !defined(BOOST_WINDOWS) && !defined(__CYGWIN__)

#include "asio/detail/pop_options.hpp"

#endif // ASIO_DETAIL_REACTIVE_DESCRIPTOR_SERVICE_HPP