task_io_service.hpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. //
  2. // task_io_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_TASK_IO_SERVICE_HPP
  11. #define BOOST_ASIO_DETAIL_TASK_IO_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #if defined(BOOST_ASIO_ENABLE_TWO_LOCK_QUEUE)
  16. #include <boost/asio/detail/task_io_service_2lock.hpp>
  17. #else // defined(BOOST_ASIO_ENABLE_TWO_LOCK_QUEUE)
  18. #include <boost/asio/detail/push_options.hpp>
  19. #include <boost/asio/io_service.hpp>
  20. #include <boost/asio/detail/call_stack.hpp>
  21. #include <boost/asio/detail/event.hpp>
  22. #include <boost/asio/detail/handler_alloc_helpers.hpp>
  23. #include <boost/asio/detail/handler_invoke_helpers.hpp>
  24. #include <boost/asio/detail/handler_queue.hpp>
  25. #include <boost/asio/detail/mutex.hpp>
  26. #include <boost/asio/detail/service_base.hpp>
  27. #include <boost/asio/detail/task_io_service_fwd.hpp>
  28. namespace boost {
  29. namespace asio {
  30. namespace detail {
  31. template <typename Task>
  32. class task_io_service
  33. : public boost::asio::detail::service_base<task_io_service<Task> >
  34. {
  35. public:
  36. // Constructor.
  37. task_io_service(boost::asio::io_service& io_service)
  38. : boost::asio::detail::service_base<task_io_service<Task> >(io_service),
  39. mutex_(),
  40. task_(0),
  41. task_interrupted_(true),
  42. outstanding_work_(0),
  43. stopped_(false),
  44. shutdown_(false),
  45. first_idle_thread_(0)
  46. {
  47. }
  48. void init(size_t /*concurrency_hint*/)
  49. {
  50. }
  51. // Destroy all user-defined handler objects owned by the service.
  52. void shutdown_service()
  53. {
  54. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  55. shutdown_ = true;
  56. lock.unlock();
  57. // Destroy handler objects.
  58. while (!handler_queue_.empty())
  59. {
  60. handler_queue::handler* h = handler_queue_.front();
  61. handler_queue_.pop();
  62. if (h != &task_handler_)
  63. h->destroy();
  64. }
  65. // Reset to initial state.
  66. task_ = 0;
  67. }
  68. // Initialise the task, if required.
  69. void init_task()
  70. {
  71. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  72. if (!shutdown_ && !task_)
  73. {
  74. task_ = &use_service<Task>(this->get_io_service());
  75. handler_queue_.push(&task_handler_);
  76. interrupt_one_idle_thread(lock);
  77. }
  78. }
  79. // Run the event loop until interrupted or no more work.
  80. size_t run(boost::system::error_code& ec)
  81. {
  82. typename call_stack<task_io_service>::context ctx(this);
  83. idle_thread_info this_idle_thread;
  84. this_idle_thread.next = 0;
  85. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  86. size_t n = 0;
  87. while (do_one(lock, &this_idle_thread, ec))
  88. if (n != (std::numeric_limits<size_t>::max)())
  89. ++n;
  90. return n;
  91. }
  92. // Run until interrupted or one operation is performed.
  93. size_t run_one(boost::system::error_code& ec)
  94. {
  95. typename call_stack<task_io_service>::context ctx(this);
  96. idle_thread_info this_idle_thread;
  97. this_idle_thread.next = 0;
  98. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  99. return do_one(lock, &this_idle_thread, ec);
  100. }
  101. // Poll for operations without blocking.
  102. size_t poll(boost::system::error_code& ec)
  103. {
  104. typename call_stack<task_io_service>::context ctx(this);
  105. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  106. size_t n = 0;
  107. while (do_one(lock, 0, ec))
  108. if (n != (std::numeric_limits<size_t>::max)())
  109. ++n;
  110. return n;
  111. }
  112. // Poll for one operation without blocking.
  113. size_t poll_one(boost::system::error_code& ec)
  114. {
  115. typename call_stack<task_io_service>::context ctx(this);
  116. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  117. return do_one(lock, 0, ec);
  118. }
  119. // Interrupt the event processing loop.
  120. void stop()
  121. {
  122. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  123. stop_all_threads(lock);
  124. }
  125. // Reset in preparation for a subsequent run invocation.
  126. void reset()
  127. {
  128. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  129. stopped_ = false;
  130. }
  131. // Notify that some work has started.
  132. void work_started()
  133. {
  134. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  135. ++outstanding_work_;
  136. }
  137. // Notify that some work has finished.
  138. void work_finished()
  139. {
  140. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  141. if (--outstanding_work_ == 0)
  142. stop_all_threads(lock);
  143. }
  144. // Request invocation of the given handler.
  145. template <typename Handler>
  146. void dispatch(Handler handler)
  147. {
  148. if (call_stack<task_io_service>::contains(this))
  149. boost_asio_handler_invoke_helpers::invoke(handler, &handler);
  150. else
  151. post(handler);
  152. }
  153. // Request invocation of the given handler and return immediately.
  154. template <typename Handler>
  155. void post(Handler handler)
  156. {
  157. // Allocate and construct an operation to wrap the handler.
  158. handler_queue::scoped_ptr ptr(handler_queue::wrap(handler));
  159. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  160. // If the service has been shut down we silently discard the handler.
  161. if (shutdown_)
  162. return;
  163. // Add the handler to the end of the queue.
  164. handler_queue_.push(ptr.get());
  165. ptr.release();
  166. // An undelivered handler is treated as unfinished work.
  167. ++outstanding_work_;
  168. // Wake up a thread to execute the handler.
  169. if (!interrupt_one_idle_thread(lock))
  170. {
  171. if (!task_interrupted_ && task_)
  172. {
  173. task_interrupted_ = true;
  174. task_->interrupt();
  175. }
  176. }
  177. }
  178. private:
  179. struct idle_thread_info;
  180. size_t do_one(boost::asio::detail::mutex::scoped_lock& lock,
  181. idle_thread_info* this_idle_thread, boost::system::error_code& ec)
  182. {
  183. if (outstanding_work_ == 0 && !stopped_)
  184. {
  185. stop_all_threads(lock);
  186. ec = boost::system::error_code();
  187. return 0;
  188. }
  189. bool polling = !this_idle_thread;
  190. bool task_has_run = false;
  191. while (!stopped_)
  192. {
  193. if (!handler_queue_.empty())
  194. {
  195. // Prepare to execute first handler from queue.
  196. handler_queue::handler* h = handler_queue_.front();
  197. handler_queue_.pop();
  198. if (h == &task_handler_)
  199. {
  200. bool more_handlers = (!handler_queue_.empty());
  201. task_interrupted_ = more_handlers || polling;
  202. // If the task has already run and we're polling then we're done.
  203. if (task_has_run && polling)
  204. {
  205. task_interrupted_ = true;
  206. handler_queue_.push(&task_handler_);
  207. ec = boost::system::error_code();
  208. return 0;
  209. }
  210. task_has_run = true;
  211. lock.unlock();
  212. task_cleanup c(lock, *this);
  213. // Run the task. May throw an exception. Only block if the handler
  214. // queue is empty and we have an idle_thread_info object, otherwise
  215. // we want to return as soon as possible.
  216. task_->run(!more_handlers && !polling);
  217. }
  218. else
  219. {
  220. lock.unlock();
  221. handler_cleanup c(lock, *this);
  222. // Invoke the handler. May throw an exception.
  223. h->invoke(); // invoke() deletes the handler object
  224. ec = boost::system::error_code();
  225. return 1;
  226. }
  227. }
  228. else if (this_idle_thread)
  229. {
  230. // Nothing to run right now, so just wait for work to do.
  231. this_idle_thread->next = first_idle_thread_;
  232. first_idle_thread_ = this_idle_thread;
  233. this_idle_thread->wakeup_event.clear(lock);
  234. this_idle_thread->wakeup_event.wait(lock);
  235. }
  236. else
  237. {
  238. ec = boost::system::error_code();
  239. return 0;
  240. }
  241. }
  242. ec = boost::system::error_code();
  243. return 0;
  244. }
  245. // Stop the task and all idle threads.
  246. void stop_all_threads(
  247. boost::asio::detail::mutex::scoped_lock& lock)
  248. {
  249. stopped_ = true;
  250. interrupt_all_idle_threads(lock);
  251. if (!task_interrupted_ && task_)
  252. {
  253. task_interrupted_ = true;
  254. task_->interrupt();
  255. }
  256. }
  257. // Interrupt a single idle thread. Returns true if a thread was interrupted,
  258. // false if no running thread could be found to interrupt.
  259. bool interrupt_one_idle_thread(
  260. boost::asio::detail::mutex::scoped_lock& lock)
  261. {
  262. if (first_idle_thread_)
  263. {
  264. idle_thread_info* idle_thread = first_idle_thread_;
  265. first_idle_thread_ = idle_thread->next;
  266. idle_thread->next = 0;
  267. idle_thread->wakeup_event.signal(lock);
  268. return true;
  269. }
  270. return false;
  271. }
  272. // Interrupt all idle threads.
  273. void interrupt_all_idle_threads(
  274. boost::asio::detail::mutex::scoped_lock& lock)
  275. {
  276. while (first_idle_thread_)
  277. {
  278. idle_thread_info* idle_thread = first_idle_thread_;
  279. first_idle_thread_ = idle_thread->next;
  280. idle_thread->next = 0;
  281. idle_thread->wakeup_event.signal(lock);
  282. }
  283. }
  284. // Helper class to perform task-related operations on block exit.
  285. class task_cleanup;
  286. friend class task_cleanup;
  287. class task_cleanup
  288. {
  289. public:
  290. task_cleanup(boost::asio::detail::mutex::scoped_lock& lock,
  291. task_io_service& task_io_svc)
  292. : lock_(lock),
  293. task_io_service_(task_io_svc)
  294. {
  295. }
  296. ~task_cleanup()
  297. {
  298. // Reinsert the task at the end of the handler queue.
  299. lock_.lock();
  300. task_io_service_.task_interrupted_ = true;
  301. task_io_service_.handler_queue_.push(&task_io_service_.task_handler_);
  302. }
  303. private:
  304. boost::asio::detail::mutex::scoped_lock& lock_;
  305. task_io_service& task_io_service_;
  306. };
  307. // Helper class to perform handler-related operations on block exit.
  308. class handler_cleanup;
  309. friend class handler_cleanup;
  310. class handler_cleanup
  311. {
  312. public:
  313. handler_cleanup(boost::asio::detail::mutex::scoped_lock& lock,
  314. task_io_service& task_io_svc)
  315. : lock_(lock),
  316. task_io_service_(task_io_svc)
  317. {
  318. }
  319. ~handler_cleanup()
  320. {
  321. lock_.lock();
  322. if (--task_io_service_.outstanding_work_ == 0)
  323. task_io_service_.stop_all_threads(lock_);
  324. }
  325. private:
  326. boost::asio::detail::mutex::scoped_lock& lock_;
  327. task_io_service& task_io_service_;
  328. };
  329. // Mutex to protect access to internal data.
  330. boost::asio::detail::mutex mutex_;
  331. // The task to be run by this service.
  332. Task* task_;
  333. // Handler object to represent the position of the task in the queue.
  334. class task_handler
  335. : public handler_queue::handler
  336. {
  337. public:
  338. task_handler()
  339. : handler_queue::handler(0, 0)
  340. {
  341. }
  342. } task_handler_;
  343. // Whether the task has been interrupted.
  344. bool task_interrupted_;
  345. // The count of unfinished work.
  346. int outstanding_work_;
  347. // The queue of handlers that are ready to be delivered.
  348. handler_queue handler_queue_;
  349. // Flag to indicate that the dispatcher has been stopped.
  350. bool stopped_;
  351. // Flag to indicate that the dispatcher has been shut down.
  352. bool shutdown_;
  353. // Structure containing information about an idle thread.
  354. struct idle_thread_info
  355. {
  356. event wakeup_event;
  357. idle_thread_info* next;
  358. };
  359. // The number of threads that are currently idle.
  360. idle_thread_info* first_idle_thread_;
  361. };
  362. } // namespace detail
  363. } // namespace asio
  364. } // namespace boost
  365. #include <boost/system/error_code.hpp>
  366. #include <boost/asio/detail/pop_options.hpp>
  367. #endif // defined(BOOST_ASIO_ENABLE_TWO_LOCK_QUEUE)
  368. #endif // BOOST_ASIO_DETAIL_TASK_IO_SERVICE_HPP