select_reactor.hpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. //
  2. // select_reactor.hpp
  3. // ~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP
  11. #define BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/push_options.hpp>
  16. #include <boost/asio/detail/socket_types.hpp> // Must come before posix_time.
  17. #include <boost/asio/detail/push_options.hpp>
  18. #include <cstddef>
  19. #include <boost/config.hpp>
  20. #include <boost/date_time/posix_time/posix_time_types.hpp>
  21. #include <boost/shared_ptr.hpp>
  22. #include <vector>
  23. #include <boost/asio/detail/pop_options.hpp>
  24. #include <boost/asio/io_service.hpp>
  25. #include <boost/asio/detail/bind_handler.hpp>
  26. #include <boost/asio/detail/fd_set_adapter.hpp>
  27. #include <boost/asio/detail/mutex.hpp>
  28. #include <boost/asio/detail/noncopyable.hpp>
  29. #include <boost/asio/detail/reactor_op_queue.hpp>
  30. #include <boost/asio/detail/select_interrupter.hpp>
  31. #include <boost/asio/detail/select_reactor_fwd.hpp>
  32. #include <boost/asio/detail/service_base.hpp>
  33. #include <boost/asio/detail/signal_blocker.hpp>
  34. #include <boost/asio/detail/socket_ops.hpp>
  35. #include <boost/asio/detail/socket_types.hpp>
  36. #include <boost/asio/detail/task_io_service.hpp>
  37. #include <boost/asio/detail/thread.hpp>
  38. #include <boost/asio/detail/timer_queue.hpp>
  39. namespace boost {
  40. namespace asio {
  41. namespace detail {
  42. template <bool Own_Thread>
  43. class select_reactor
  44. : public boost::asio::detail::service_base<select_reactor<Own_Thread> >
  45. {
  46. public:
  // Per-descriptor data. The type has no members: the member functions of
  // this reactor that accept it take it as an unnamed parameter and never
  // read or write it.
  struct per_descriptor_data {};
  51. // Constructor.
  52. select_reactor(boost::asio::io_service& io_service)
  53. : boost::asio::detail::service_base<
  54. select_reactor<Own_Thread> >(io_service),
  55. mutex_(),
  56. select_in_progress_(false),
  57. interrupter_(),
  58. read_op_queue_(),
  59. write_op_queue_(),
  60. except_op_queue_(),
  61. pending_cancellations_(),
  62. stop_thread_(false),
  63. thread_(0),
  64. shutdown_(false)
  65. {
  66. if (Own_Thread)
  67. {
  68. boost::asio::detail::signal_blocker sb;
  69. thread_ = new boost::asio::detail::thread(
  70. bind_handler(&select_reactor::call_run_thread, this));
  71. }
  72. }
  73. // Destructor.
  74. ~select_reactor()
  75. {
  76. shutdown_service();
  77. }
  78. // Destroy all user-defined handler objects owned by the service.
  79. void shutdown_service()
  80. {
  81. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  82. shutdown_ = true;
  83. stop_thread_ = true;
  84. lock.unlock();
  85. if (thread_)
  86. {
  87. interrupter_.interrupt();
  88. thread_->join();
  89. delete thread_;
  90. thread_ = 0;
  91. }
  92. read_op_queue_.destroy_operations();
  93. write_op_queue_.destroy_operations();
  94. except_op_queue_.destroy_operations();
  95. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  96. timer_queues_[i]->destroy_timers();
  97. timer_queues_.clear();
  98. }
  99. // Initialise the task, but only if the reactor is not in its own thread.
  100. void init_task()
  101. {
  102. if (!Own_Thread)
  103. {
  104. typedef task_io_service<select_reactor<Own_Thread> > task_io_service_type;
  105. use_service<task_io_service_type>(this->get_io_service()).init_task();
  106. }
  107. }
  108. // Register a socket with the reactor. Returns 0 on success, system error
  109. // code on failure.
  110. int register_descriptor(socket_type, per_descriptor_data&)
  111. {
  112. return 0;
  113. }
  114. // Start a new read operation. The handler object will be invoked when the
  115. // given descriptor is ready to be read, or an error has occurred.
  116. template <typename Handler>
  117. void start_read_op(socket_type descriptor, per_descriptor_data&,
  118. Handler handler, bool /*allow_speculative_read*/ = true)
  119. {
  120. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  121. if (!shutdown_)
  122. if (read_op_queue_.enqueue_operation(descriptor, handler))
  123. interrupter_.interrupt();
  124. }
  125. // Start a new write operation. The handler object will be invoked when the
  126. // given descriptor is ready to be written, or an error has occurred.
  127. template <typename Handler>
  128. void start_write_op(socket_type descriptor, per_descriptor_data&,
  129. Handler handler, bool /*allow_speculative_write*/ = true)
  130. {
  131. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  132. if (!shutdown_)
  133. if (write_op_queue_.enqueue_operation(descriptor, handler))
  134. interrupter_.interrupt();
  135. }
  136. // Start a new exception operation. The handler object will be invoked when
  137. // the given descriptor has exception information, or an error has occurred.
  138. template <typename Handler>
  139. void start_except_op(socket_type descriptor,
  140. per_descriptor_data&, Handler handler)
  141. {
  142. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  143. if (!shutdown_)
  144. if (except_op_queue_.enqueue_operation(descriptor, handler))
  145. interrupter_.interrupt();
  146. }
  147. // Wrapper for connect handlers to enable the handler object to be placed
  148. // in both the write and the except operation queues, but ensure that only
  149. // one of the handlers is called.
  150. template <typename Handler>
  151. class connect_handler_wrapper
  152. {
  153. public:
  154. connect_handler_wrapper(socket_type descriptor,
  155. boost::shared_ptr<bool> completed,
  156. select_reactor<Own_Thread>& reactor, Handler handler)
  157. : descriptor_(descriptor),
  158. completed_(completed),
  159. reactor_(reactor),
  160. handler_(handler)
  161. {
  162. }
  163. bool perform(boost::system::error_code& ec,
  164. std::size_t& bytes_transferred)
  165. {
  166. // Check whether one of the handlers has already been called. If it has,
  167. // then we don't want to do anything in this handler.
  168. if (*completed_)
  169. {
  170. completed_.reset(); // Indicate that this handler should not complete.
  171. return true;
  172. }
  173. // Cancel the other reactor operation for the connection.
  174. *completed_ = true;
  175. reactor_.enqueue_cancel_ops_unlocked(descriptor_);
  176. // Call the contained handler.
  177. return handler_.perform(ec, bytes_transferred);
  178. }
  179. void complete(const boost::system::error_code& ec,
  180. std::size_t bytes_transferred)
  181. {
  182. if (completed_.get())
  183. handler_.complete(ec, bytes_transferred);
  184. }
  185. private:
  186. socket_type descriptor_;
  187. boost::shared_ptr<bool> completed_;
  188. select_reactor<Own_Thread>& reactor_;
  189. Handler handler_;
  190. };
  191. // Start new write and exception operations. The handler object will be
  192. // invoked when the given descriptor is ready for writing or has exception
  193. // information available, or an error has occurred. The handler will be called
  194. // only once.
  195. template <typename Handler>
  196. void start_connect_op(socket_type descriptor,
  197. per_descriptor_data&, Handler handler)
  198. {
  199. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  200. if (!shutdown_)
  201. {
  202. boost::shared_ptr<bool> completed(new bool(false));
  203. connect_handler_wrapper<Handler> wrapped_handler(
  204. descriptor, completed, *this, handler);
  205. bool interrupt = write_op_queue_.enqueue_operation(
  206. descriptor, wrapped_handler);
  207. interrupt = except_op_queue_.enqueue_operation(
  208. descriptor, wrapped_handler) || interrupt;
  209. if (interrupt)
  210. interrupter_.interrupt();
  211. }
  212. }
  213. // Cancel all operations associated with the given descriptor. The
  214. // handlers associated with the descriptor will be invoked with the
  215. // operation_aborted error.
  216. void cancel_ops(socket_type descriptor, per_descriptor_data&)
  217. {
  218. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  219. cancel_ops_unlocked(descriptor);
  220. }
  221. // Enqueue cancellation of all operations associated with the given
  222. // descriptor. The handlers associated with the descriptor will be invoked
  223. // with the operation_aborted error. This function does not acquire the
  224. // select_reactor's mutex, and so should only be used when the reactor lock is
  225. // already held.
  226. void enqueue_cancel_ops_unlocked(socket_type descriptor)
  227. {
  228. pending_cancellations_.push_back(descriptor);
  229. }
  230. // Cancel any operations that are running against the descriptor and remove
  231. // its registration from the reactor.
  232. void close_descriptor(socket_type descriptor, per_descriptor_data&)
  233. {
  234. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  235. cancel_ops_unlocked(descriptor);
  236. }
  237. // Add a new timer queue to the reactor.
  238. template <typename Time_Traits>
  239. void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
  240. {
  241. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  242. timer_queues_.push_back(&timer_queue);
  243. }
  244. // Remove a timer queue from the reactor.
  245. template <typename Time_Traits>
  246. void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
  247. {
  248. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  249. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  250. {
  251. if (timer_queues_[i] == &timer_queue)
  252. {
  253. timer_queues_.erase(timer_queues_.begin() + i);
  254. return;
  255. }
  256. }
  257. }
  258. // Schedule a timer in the given timer queue to expire at the specified
  259. // absolute time. The handler object will be invoked when the timer expires.
  260. template <typename Time_Traits, typename Handler>
  261. void schedule_timer(timer_queue<Time_Traits>& timer_queue,
  262. const typename Time_Traits::time_type& time, Handler handler, void* token)
  263. {
  264. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  265. if (!shutdown_)
  266. if (timer_queue.enqueue_timer(time, handler, token))
  267. interrupter_.interrupt();
  268. }
  269. // Cancel the timer associated with the given token. Returns the number of
  270. // handlers that have been posted or dispatched.
  271. template <typename Time_Traits>
  272. std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
  273. {
  274. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  275. std::size_t n = timer_queue.cancel_timer(token);
  276. if (n > 0)
  277. interrupter_.interrupt();
  278. return n;
  279. }
  280. private:
  281. friend class task_io_service<select_reactor<Own_Thread> >;
  282. // Run select once until interrupted or events are ready to be dispatched.
  283. void run(bool block)
  284. {
  285. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  286. // Dispatch any operation cancellations that were made while the select
  287. // loop was not running.
  288. read_op_queue_.perform_cancellations();
  289. write_op_queue_.perform_cancellations();
  290. except_op_queue_.perform_cancellations();
  291. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  292. timer_queues_[i]->dispatch_cancellations();
  293. // Check if the thread is supposed to stop.
  294. if (stop_thread_)
  295. {
  296. complete_operations_and_timers(lock);
  297. return;
  298. }
  299. // We can return immediately if there's no work to do and the reactor is
  300. // not supposed to block.
  301. if (!block && read_op_queue_.empty() && write_op_queue_.empty()
  302. && except_op_queue_.empty() && all_timer_queues_are_empty())
  303. {
  304. complete_operations_and_timers(lock);
  305. return;
  306. }
  307. // Set up the descriptor sets.
  308. fd_set_adapter read_fds;
  309. read_fds.set(interrupter_.read_descriptor());
  310. read_op_queue_.get_descriptors(read_fds);
  311. fd_set_adapter write_fds;
  312. write_op_queue_.get_descriptors(write_fds);
  313. fd_set_adapter except_fds;
  314. except_op_queue_.get_descriptors(except_fds);
  315. socket_type max_fd = read_fds.max_descriptor();
  316. if (write_fds.max_descriptor() > max_fd)
  317. max_fd = write_fds.max_descriptor();
  318. if (except_fds.max_descriptor() > max_fd)
  319. max_fd = except_fds.max_descriptor();
  320. // Block on the select call without holding the lock so that new
  321. // operations can be started while the call is executing.
  322. timeval tv_buf = { 0, 0 };
  323. timeval* tv = block ? get_timeout(tv_buf) : &tv_buf;
  324. select_in_progress_ = true;
  325. lock.unlock();
  326. boost::system::error_code ec;
  327. int retval = socket_ops::select(static_cast<int>(max_fd + 1),
  328. read_fds, write_fds, except_fds, tv, ec);
  329. lock.lock();
  330. select_in_progress_ = false;
  331. // Block signals while dispatching operations.
  332. boost::asio::detail::signal_blocker sb;
  333. // Reset the interrupter.
  334. if (retval > 0 && read_fds.is_set(interrupter_.read_descriptor()))
  335. interrupter_.reset();
  336. // Dispatch all ready operations.
  337. if (retval > 0)
  338. {
  339. // Exception operations must be processed first to ensure that any
  340. // out-of-band data is read before normal data.
  341. except_op_queue_.perform_operations_for_descriptors(
  342. except_fds, boost::system::error_code());
  343. read_op_queue_.perform_operations_for_descriptors(
  344. read_fds, boost::system::error_code());
  345. write_op_queue_.perform_operations_for_descriptors(
  346. write_fds, boost::system::error_code());
  347. except_op_queue_.perform_cancellations();
  348. read_op_queue_.perform_cancellations();
  349. write_op_queue_.perform_cancellations();
  350. }
  351. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  352. {
  353. timer_queues_[i]->dispatch_timers();
  354. timer_queues_[i]->dispatch_cancellations();
  355. }
  356. // Issue any pending cancellations.
  357. for (size_t i = 0; i < pending_cancellations_.size(); ++i)
  358. cancel_ops_unlocked(pending_cancellations_[i]);
  359. pending_cancellations_.clear();
  360. complete_operations_and_timers(lock);
  361. }
  362. // Run the select loop in the thread.
  363. void run_thread()
  364. {
  365. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  366. while (!stop_thread_)
  367. {
  368. lock.unlock();
  369. run(true);
  370. lock.lock();
  371. }
  372. }
  373. // Entry point for the select loop thread.
  374. static void call_run_thread(select_reactor* reactor)
  375. {
  376. reactor->run_thread();
  377. }
  378. // Interrupt the select loop.
  379. void interrupt()
  380. {
  381. interrupter_.interrupt();
  382. }
  383. // Check if all timer queues are empty.
  384. bool all_timer_queues_are_empty() const
  385. {
  386. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  387. if (!timer_queues_[i]->empty())
  388. return false;
  389. return true;
  390. }
  391. // Get the timeout value for the select call.
  392. timeval* get_timeout(timeval& tv)
  393. {
  394. if (all_timer_queues_are_empty())
  395. return 0;
  396. // By default we will wait no longer than 5 minutes. This will ensure that
  397. // any changes to the system clock are detected after no longer than this.
  398. boost::posix_time::time_duration minimum_wait_duration
  399. = boost::posix_time::minutes(5);
  400. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  401. {
  402. boost::posix_time::time_duration wait_duration
  403. = timer_queues_[i]->wait_duration();
  404. if (wait_duration < minimum_wait_duration)
  405. minimum_wait_duration = wait_duration;
  406. }
  407. if (minimum_wait_duration > boost::posix_time::time_duration())
  408. {
  409. tv.tv_sec = minimum_wait_duration.total_seconds();
  410. tv.tv_usec = minimum_wait_duration.total_microseconds() % 1000000;
  411. }
  412. else
  413. {
  414. tv.tv_sec = 0;
  415. tv.tv_usec = 0;
  416. }
  417. return &tv;
  418. }
  419. // Cancel all operations associated with the given descriptor. The do_cancel
  420. // function of the handler objects will be invoked. This function does not
  421. // acquire the select_reactor's mutex.
  422. void cancel_ops_unlocked(socket_type descriptor)
  423. {
  424. bool interrupt = read_op_queue_.cancel_operations(descriptor);
  425. interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
  426. interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
  427. if (interrupt)
  428. interrupter_.interrupt();
  429. }
  430. // Clean up operations and timers. We must not hold the lock since the
  431. // destructors may make calls back into this reactor. We make a copy of the
  432. // vector of timer queues since the original may be modified while the lock
  433. // is not held.
  434. void complete_operations_and_timers(
  435. boost::asio::detail::mutex::scoped_lock& lock)
  436. {
  437. timer_queues_for_cleanup_ = timer_queues_;
  438. lock.unlock();
  439. read_op_queue_.complete_operations();
  440. write_op_queue_.complete_operations();
  441. except_op_queue_.complete_operations();
  442. for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
  443. timer_queues_for_cleanup_[i]->complete_timers();
  444. }
  // NOTE: the constructor's initialiser list names these members in
  // declaration order; keep the two in step when adding or moving members.

  // Mutex to protect access to internal data.
  boost::asio::detail::mutex mutex_;

  // Whether the select loop is currently running or not. Set to true by
  // run() just before the select call and back to false just after it; no
  // code in this class reads it.
  bool select_in_progress_;

  // The interrupter is used to break a blocking select call. Its read
  // descriptor is added to the read set on every call to run().
  select_interrupter interrupter_;

  // The queue of read operations.
  reactor_op_queue<socket_type> read_op_queue_;

  // The queue of write operations.
  reactor_op_queue<socket_type> write_op_queue_;

  // The queue of exception operations.
  reactor_op_queue<socket_type> except_op_queue_;

  // The timer queues. Only pointers are stored; entries are added and
  // removed through add_timer_queue() and remove_timer_queue().
  std::vector<timer_queue_base*> timer_queues_;

  // A copy of the timer queues, used when cleaning up timers. The copy is
  // stored as a class data member to avoid unnecessary memory allocation.
  std::vector<timer_queue_base*> timer_queues_for_cleanup_;

  // The descriptors that are pending cancellation. Filled by
  // enqueue_cancel_ops_unlocked() and drained at the end of run().
  std::vector<socket_type> pending_cancellations_;

  // Does the reactor loop thread need to stop. Set by shutdown_service() and
  // checked by run() and run_thread().
  bool stop_thread_;

  // The thread that is running the reactor loop. Null unless the reactor was
  // instantiated with Own_Thread == true; deleted by shutdown_service().
  boost::asio::detail::thread* thread_;

  // Whether the service has been shut down. Once set, the start_*_op and
  // schedule_timer functions ignore new requests.
  bool shutdown_;
  470. };
  471. } // namespace detail
  472. } // namespace asio
  473. } // namespace boost
  474. #include <boost/asio/detail/pop_options.hpp>
  475. #endif // BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP