select_reactor.hpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. //
  2. // select_reactor.hpp
  3. // ~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_DETAIL_SELECT_REACTOR_HPP
  11. #define ASIO_DETAIL_SELECT_REACTOR_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/push_options.hpp"
  16. #include "asio/detail/socket_types.hpp" // Must come before posix_time.
  17. #include "asio/detail/push_options.hpp"
  18. #include <cstddef>
  19. #include <boost/config.hpp>
  20. #include "asio/detail/pop_options.hpp"
  21. #include "asio/io_service.hpp"
  22. #include "asio/detail/bind_handler.hpp"
  23. #include "asio/detail/fd_set_adapter.hpp"
  24. #include "asio/detail/mutex.hpp"
  25. #include "asio/detail/noncopyable.hpp"
  26. #include "asio/detail/op_queue.hpp"
  27. #include "asio/detail/reactor_op.hpp"
  28. #include "asio/detail/reactor_op_queue.hpp"
  29. #include "asio/detail/select_interrupter.hpp"
  30. #include "asio/detail/select_reactor_fwd.hpp"
  31. #include "asio/detail/service_base.hpp"
  32. #include "asio/detail/signal_blocker.hpp"
  33. #include "asio/detail/socket_ops.hpp"
  34. #include "asio/detail/socket_types.hpp"
  35. #include "asio/detail/thread.hpp"
  36. #include "asio/detail/timer_op.hpp"
  37. #include "asio/detail/timer_queue_base.hpp"
  38. #include "asio/detail/timer_queue_fwd.hpp"
  39. #include "asio/detail/timer_queue_set.hpp"
  40. namespace asio {
  41. namespace detail {
  42. template <bool Own_Thread>
  43. class select_reactor
  44. : public asio::detail::service_base<select_reactor<Own_Thread> >
  45. {
  46. public:
  47. #if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
  48. enum { read_op = 0, write_op = 1, except_op = 2,
  49. max_select_ops = 3, connect_op = 3, max_ops = 4 };
  50. #else // defined(BOOST_WINDOWS) || defined(__CYGWIN__)
  51. enum { read_op = 0, write_op = 1, except_op = 2,
  52. max_select_ops = 3, connect_op = 1, max_ops = 3 };
  53. #endif // defined(BOOST_WINDOWS) || defined(__CYGWIN__)
  54. // Per-descriptor data.
  55. struct per_descriptor_data
  56. {
  57. };
  58. // Constructor.
  59. select_reactor(asio::io_service& io_service)
  60. : asio::detail::service_base<
  61. select_reactor<Own_Thread> >(io_service),
  62. io_service_(use_service<io_service_impl>(io_service)),
  63. mutex_(),
  64. interrupter_(),
  65. stop_thread_(false),
  66. thread_(0),
  67. shutdown_(false)
  68. {
  69. if (Own_Thread)
  70. {
  71. asio::detail::signal_blocker sb;
  72. thread_ = new asio::detail::thread(
  73. bind_handler(&select_reactor::call_run_thread, this));
  74. }
  75. }
  76. // Destructor.
  77. ~select_reactor()
  78. {
  79. shutdown_service();
  80. }
  81. // Destroy all user-defined handler objects owned by the service.
  82. void shutdown_service()
  83. {
  84. asio::detail::mutex::scoped_lock lock(mutex_);
  85. shutdown_ = true;
  86. stop_thread_ = true;
  87. lock.unlock();
  88. if (Own_Thread)
  89. {
  90. if (thread_)
  91. {
  92. interrupter_.interrupt();
  93. thread_->join();
  94. delete thread_;
  95. thread_ = 0;
  96. }
  97. }
  98. op_queue<operation> ops;
  99. for (int i = 0; i < max_ops; ++i)
  100. op_queue_[i].get_all_operations(ops);
  101. timer_queues_.get_all_timers(ops);
  102. }
  103. // Initialise the task, but only if the reactor is not in its own thread.
  104. void init_task()
  105. {
  106. io_service_.init_task();
  107. }
  108. // Register a socket with the reactor. Returns 0 on success, system error
  109. // code on failure.
  110. int register_descriptor(socket_type, per_descriptor_data&)
  111. {
  112. return 0;
  113. }
  114. // Start a new operation. The reactor operation will be performed when the
  115. // given descriptor is flagged as ready, or an error has occurred.
  116. void start_op(int op_type, socket_type descriptor,
  117. per_descriptor_data&, reactor_op* op, bool)
  118. {
  119. asio::detail::mutex::scoped_lock lock(mutex_);
  120. if (!shutdown_)
  121. {
  122. bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
  123. io_service_.work_started();
  124. if (first)
  125. interrupter_.interrupt();
  126. }
  127. }
  128. // Cancel all operations associated with the given descriptor. The
  129. // handlers associated with the descriptor will be invoked with the
  130. // operation_aborted error.
  131. void cancel_ops(socket_type descriptor, per_descriptor_data&)
  132. {
  133. asio::detail::mutex::scoped_lock lock(mutex_);
  134. cancel_ops_unlocked(descriptor, asio::error::operation_aborted);
  135. }
  136. // Cancel any operations that are running against the descriptor and remove
  137. // its registration from the reactor.
  138. void close_descriptor(socket_type descriptor, per_descriptor_data&)
  139. {
  140. asio::detail::mutex::scoped_lock lock(mutex_);
  141. cancel_ops_unlocked(descriptor, asio::error::operation_aborted);
  142. }
  143. // Add a new timer queue to the reactor.
  144. template <typename Time_Traits>
  145. void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
  146. {
  147. asio::detail::mutex::scoped_lock lock(mutex_);
  148. timer_queues_.insert(&timer_queue);
  149. }
  150. // Remove a timer queue from the reactor.
  151. template <typename Time_Traits>
  152. void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
  153. {
  154. asio::detail::mutex::scoped_lock lock(mutex_);
  155. timer_queues_.erase(&timer_queue);
  156. }
  157. // Schedule a new operation in the given timer queue to expire at the
  158. // specified absolute time.
  159. template <typename Time_Traits>
  160. void schedule_timer(timer_queue<Time_Traits>& timer_queue,
  161. const typename Time_Traits::time_type& time, timer_op* op, void* token)
  162. {
  163. asio::detail::mutex::scoped_lock lock(mutex_);
  164. if (!shutdown_)
  165. {
  166. bool earliest = timer_queue.enqueue_timer(time, op, token);
  167. io_service_.work_started();
  168. if (earliest)
  169. interrupter_.interrupt();
  170. }
  171. }
  172. // Cancel the timer operations associated with the given token. Returns the
  173. // number of operations that have been posted or dispatched.
  174. template <typename Time_Traits>
  175. std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
  176. {
  177. asio::detail::mutex::scoped_lock lock(mutex_);
  178. op_queue<operation> ops;
  179. std::size_t n = timer_queue.cancel_timer(token, ops);
  180. lock.unlock();
  181. io_service_.post_deferred_completions(ops);
  182. return n;
  183. }
  184. // Run select once until interrupted or events are ready to be dispatched.
  185. void run(bool block, op_queue<operation>& ops)
  186. {
  187. asio::detail::mutex::scoped_lock lock(mutex_);
  188. // Check if the thread is supposed to stop.
  189. if (Own_Thread)
  190. if (stop_thread_)
  191. return;
  192. // Set up the descriptor sets.
  193. fd_set_adapter fds[max_select_ops];
  194. fds[read_op].set(interrupter_.read_descriptor());
  195. socket_type max_fd = 0;
  196. bool have_work_to_do = !timer_queues_.all_empty();
  197. for (int i = 0; i < max_select_ops; ++i)
  198. {
  199. have_work_to_do = have_work_to_do || !op_queue_[i].empty();
  200. op_queue_[i].get_descriptors(fds[i], ops);
  201. if (fds[i].max_descriptor() > max_fd)
  202. max_fd = fds[i].max_descriptor();
  203. }
  204. #if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
  205. // Connection operations on Windows use both except and write fd_sets.
  206. have_work_to_do = have_work_to_do || !op_queue_[connect_op].empty();
  207. op_queue_[connect_op].get_descriptors(fds[write_op], ops);
  208. if (fds[write_op].max_descriptor() > max_fd)
  209. max_fd = fds[write_op].max_descriptor();
  210. op_queue_[connect_op].get_descriptors(fds[except_op], ops);
  211. if (fds[except_op].max_descriptor() > max_fd)
  212. max_fd = fds[except_op].max_descriptor();
  213. #endif // defined(BOOST_WINDOWS) || defined(__CYGWIN__)
  214. // We can return immediately if there's no work to do and the reactor is
  215. // not supposed to block.
  216. if (!block && !have_work_to_do)
  217. return;
  218. // Determine how long to block while waiting for events.
  219. timeval tv_buf = { 0, 0 };
  220. timeval* tv = block ? get_timeout(tv_buf) : &tv_buf;
  221. lock.unlock();
  222. // Block on the select call until descriptors become ready.
  223. asio::error_code ec;
  224. int retval = socket_ops::select(static_cast<int>(max_fd + 1),
  225. fds[read_op], fds[write_op], fds[except_op], tv, ec);
  226. // Reset the interrupter.
  227. if (retval > 0 && fds[read_op].is_set(interrupter_.read_descriptor()))
  228. interrupter_.reset();
  229. lock.lock();
  230. // Dispatch all ready operations.
  231. if (retval > 0)
  232. {
  233. #if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
  234. // Connection operations on Windows use both except and write fd_sets.
  235. op_queue_[connect_op].perform_operations_for_descriptors(
  236. fds[except_op], ops);
  237. op_queue_[connect_op].perform_operations_for_descriptors(
  238. fds[write_op], ops);
  239. #endif // defined(BOOST_WINDOWS) || defined(__CYGWIN__)
  240. // Exception operations must be processed first to ensure that any
  241. // out-of-band data is read before normal data.
  242. for (int i = max_select_ops - 1; i >= 0; --i)
  243. op_queue_[i].perform_operations_for_descriptors(fds[i], ops);
  244. }
  245. timer_queues_.get_ready_timers(ops);
  246. }
  247. // Interrupt the select loop.
  248. void interrupt()
  249. {
  250. interrupter_.interrupt();
  251. }
  252. private:
  253. // Run the select loop in the thread.
  254. void run_thread()
  255. {
  256. if (Own_Thread)
  257. {
  258. asio::detail::mutex::scoped_lock lock(mutex_);
  259. while (!stop_thread_)
  260. {
  261. lock.unlock();
  262. op_queue<operation> ops;
  263. run(true, ops);
  264. io_service_.post_deferred_completions(ops);
  265. lock.lock();
  266. }
  267. }
  268. }
  269. // Entry point for the select loop thread.
  270. static void call_run_thread(select_reactor* reactor)
  271. {
  272. if (Own_Thread)
  273. {
  274. reactor->run_thread();
  275. }
  276. }
  277. // Get the timeout value for the select call.
  278. timeval* get_timeout(timeval& tv)
  279. {
  280. // By default we will wait no longer than 5 minutes. This will ensure that
  281. // any changes to the system clock are detected after no longer than this.
  282. long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  283. tv.tv_sec = usec / 1000000;
  284. tv.tv_usec = usec % 1000000;
  285. return &tv;
  286. }
  287. // Cancel all operations associated with the given descriptor. This function
  288. // does not acquire the select_reactor's mutex.
  289. void cancel_ops_unlocked(socket_type descriptor,
  290. const asio::error_code& ec)
  291. {
  292. bool need_interrupt = false;
  293. op_queue<operation> ops;
  294. for (int i = 0; i < max_ops; ++i)
  295. need_interrupt = op_queue_[i].cancel_operations(
  296. descriptor, ops, ec) || need_interrupt;
  297. io_service_.post_deferred_completions(ops);
  298. if (need_interrupt)
  299. interrupter_.interrupt();
  300. }
  301. // The io_service implementation used to post completions.
  302. io_service_impl& io_service_;
  303. // Mutex to protect access to internal data.
  304. asio::detail::mutex mutex_;
  305. // The interrupter is used to break a blocking select call.
  306. select_interrupter interrupter_;
  307. // The queues of read, write and except operations.
  308. reactor_op_queue<socket_type> op_queue_[max_ops];
  309. // The timer queues.
  310. timer_queue_set timer_queues_;
  311. // Does the reactor loop thread need to stop.
  312. bool stop_thread_;
  313. // The thread that is running the reactor loop.
  314. asio::detail::thread* thread_;
  315. // Whether the service has been shut down.
  316. bool shutdown_;
  317. };
  318. } // namespace detail
  319. } // namespace asio
  320. #include "asio/detail/pop_options.hpp"
  321. #endif // ASIO_DETAIL_SELECT_REACTOR_HPP