task_io_service_2lock.hpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. //
  2. // task_io_service_2lock.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP
  11. #define BOOST_ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/push_options.hpp>
  16. #include <boost/asio/io_service.hpp>
  17. #include <boost/asio/detail/call_stack.hpp>
  18. #include <boost/asio/detail/event.hpp>
  19. #include <boost/asio/detail/handler_alloc_helpers.hpp>
  20. #include <boost/asio/detail/handler_invoke_helpers.hpp>
  21. #include <boost/asio/detail/indirect_handler_queue.hpp>
  22. #include <boost/asio/detail/mutex.hpp>
  23. #include <boost/asio/detail/service_base.hpp>
  24. #include <boost/asio/detail/task_io_service_fwd.hpp>
  25. #include <boost/asio/detail/push_options.hpp>
  26. #include <boost/detail/atomic_count.hpp>
  27. #include <boost/system/error_code.hpp>
  28. #include <boost/asio/detail/pop_options.hpp>
  29. namespace boost {
  30. namespace asio {
  31. namespace detail {
  32. // An alternative task_io_service implementation based on a two-lock queue.
  33. template <typename Task>
  34. class task_io_service
  35. : public boost::asio::detail::service_base<task_io_service<Task> >
  36. {
  37. public:
  38. typedef indirect_handler_queue handler_queue;
  39. // Constructor.
  40. task_io_service(boost::asio::io_service& io_service)
  41. : boost::asio::detail::service_base<task_io_service<Task> >(io_service),
  42. front_mutex_(),
  43. back_mutex_(),
  44. task_(0),
  45. outstanding_work_(0),
  46. front_stopped_(false),
  47. back_stopped_(false),
  48. back_shutdown_(false),
  49. back_first_idle_thread_(0),
  50. back_task_thread_(0)
  51. {
  52. }
  // Initialisation hook. The size_t argument (a concurrency hint) is unnamed
  // and unused; this implementation does nothing here.
  void init(size_t /*concurrency_hint*/)
  {
  }
  56. // Destroy all user-defined handler objects owned by the service.
  57. void shutdown_service()
  58. {
  59. boost::asio::detail::mutex::scoped_lock back_lock(back_mutex_);
  60. back_shutdown_ = true;
  61. back_lock.unlock();
  62. // Destroy handler objects.
  63. while (handler_queue::handler* h = handler_queue_.pop())
  64. if (h != &task_handler_)
  65. h->destroy();
  66. // Reset to initial state.
  67. task_ = 0;
  68. }
  69. // Initialise the task, if required.
  70. void init_task()
  71. {
  72. boost::asio::detail::mutex::scoped_lock back_lock(back_mutex_);
  73. if (!back_shutdown_ && !task_)
  74. {
  75. task_ = &use_service<Task>(this->get_io_service());
  76. handler_queue_.push(&task_handler_);
  77. interrupt_one_idle_thread(back_lock);
  78. }
  79. }
  80. // Run the event loop until interrupted or no more work.
  81. size_t run(boost::system::error_code& ec)
  82. {
  83. if (outstanding_work_ == 0)
  84. {
  85. stop();
  86. ec = boost::system::error_code();
  87. return 0;
  88. }
  89. typename call_stack<task_io_service>::context ctx(this);
  90. idle_thread_info this_idle_thread;
  91. this_idle_thread.next = 0;
  92. size_t n = 0;
  93. while (do_one(&this_idle_thread, ec))
  94. if (n != (std::numeric_limits<size_t>::max)())
  95. ++n;
  96. return n;
  97. }
  98. // Run until interrupted or one operation is performed.
  99. size_t run_one(boost::system::error_code& ec)
  100. {
  101. if (outstanding_work_ == 0)
  102. {
  103. stop();
  104. ec = boost::system::error_code();
  105. return 0;
  106. }
  107. typename call_stack<task_io_service>::context ctx(this);
  108. idle_thread_info this_idle_thread;
  109. this_idle_thread.next = 0;
  110. return do_one(&this_idle_thread, ec);
  111. }
  112. // Poll for operations without blocking.
  113. size_t poll(boost::system::error_code& ec)
  114. {
  115. if (outstanding_work_ == 0)
  116. {
  117. stop();
  118. ec = boost::system::error_code();
  119. return 0;
  120. }
  121. typename call_stack<task_io_service>::context ctx(this);
  122. size_t n = 0;
  123. while (do_one(0, ec))
  124. if (n != (std::numeric_limits<size_t>::max)())
  125. ++n;
  126. return n;
  127. }
  128. // Poll for one operation without blocking.
  129. size_t poll_one(boost::system::error_code& ec)
  130. {
  131. if (outstanding_work_ == 0)
  132. {
  133. stop();
  134. ec = boost::system::error_code();
  135. return 0;
  136. }
  137. typename call_stack<task_io_service>::context ctx(this);
  138. return do_one(0, ec);
  139. }
  140. // Interrupt the event processing loop.
  141. void stop()
  142. {
  143. boost::asio::detail::mutex::scoped_lock front_lock(front_mutex_);
  144. front_stopped_ = true;
  145. front_lock.unlock();
  146. boost::asio::detail::mutex::scoped_lock back_lock(back_mutex_);
  147. back_stopped_ = true;
  148. interrupt_all_idle_threads(back_lock);
  149. }
  150. // Reset in preparation for a subsequent run invocation.
  151. void reset()
  152. {
  153. boost::asio::detail::mutex::scoped_lock front_lock(front_mutex_);
  154. front_stopped_ = false;
  155. front_lock.unlock();
  156. boost::asio::detail::mutex::scoped_lock back_lock(back_mutex_);
  157. back_stopped_ = false;
  158. }
  159. // Notify that some work has started.
  160. void work_started()
  161. {
  162. ++outstanding_work_;
  163. }
  164. // Notify that some work has finished.
  165. void work_finished()
  166. {
  167. if (--outstanding_work_ == 0)
  168. stop();
  169. }
  170. // Request invocation of the given handler.
  171. template <typename Handler>
  172. void dispatch(Handler handler)
  173. {
  174. if (call_stack<task_io_service>::contains(this))
  175. boost_asio_handler_invoke_helpers::invoke(handler, &handler);
  176. else
  177. post(handler);
  178. }
  179. // Request invocation of the given handler and return immediately.
  180. template <typename Handler>
  181. void post(Handler handler)
  182. {
  183. // Allocate and construct an operation to wrap the handler.
  184. handler_queue::scoped_ptr ptr(handler_queue::wrap(handler));
  185. boost::asio::detail::mutex::scoped_lock back_lock(back_mutex_);
  186. // If the service has been shut down we silently discard the handler.
  187. if (back_shutdown_)
  188. return;
  189. // Add the handler to the end of the queue.
  190. handler_queue_.push(ptr.get());
  191. ptr.release();
  192. // An undelivered handler is treated as unfinished work.
  193. ++outstanding_work_;
  194. // Wake up a thread to execute the handler.
  195. interrupt_one_idle_thread(back_lock);
  196. }
  197. private:
  198. struct idle_thread_info;
  199. size_t do_one(idle_thread_info* this_idle_thread,
  200. boost::system::error_code& ec)
  201. {
  202. bool task_has_run = false;
  203. for (;;)
  204. {
  205. // The front lock must be held before we can pop items from the queue.
  206. boost::asio::detail::mutex::scoped_lock front_lock(front_mutex_);
  207. if (front_stopped_)
  208. {
  209. ec = boost::system::error_code();
  210. return 0;
  211. }
  212. if (handler_queue::handler* h = handler_queue_.pop())
  213. {
  214. if (h == &task_handler_)
  215. {
  216. bool more_handlers = handler_queue_.poppable();
  217. unsigned long front_version = handler_queue_.front_version();
  218. front_lock.unlock();
  219. // The task is always added to the back of the queue when we exit
  220. // this block.
  221. task_cleanup c(*this);
  222. // If we're polling and the task has already run then we're done.
  223. bool polling = !this_idle_thread;
  224. if (task_has_run && polling)
  225. {
  226. ec = boost::system::error_code();
  227. return 0;
  228. }
  229. // If we're considering going idle we need to check whether the queue
  230. // is still empty. If it is, add the thread to the list of idle
  231. // threads.
  232. if (!more_handlers && !polling)
  233. {
  234. boost::asio::detail::mutex::scoped_lock back_lock(back_mutex_);
  235. if (back_stopped_)
  236. {
  237. ec = boost::system::error_code();
  238. return 0;
  239. }
  240. else if (front_version == handler_queue_.back_version())
  241. {
  242. back_task_thread_ = this_idle_thread;
  243. }
  244. else
  245. {
  246. more_handlers = true;
  247. }
  248. }
  249. // Run the task. May throw an exception. Only block if the handler
  250. // queue is empty and we're not polling, otherwise we want to return
  251. // as soon as possible.
  252. task_has_run = true;
  253. task_->run(!more_handlers && !polling);
  254. }
  255. else
  256. {
  257. front_lock.unlock();
  258. handler_cleanup c(*this);
  259. // Invoke the handler. May throw an exception.
  260. h->invoke(); // invoke() deletes the handler object
  261. ec = boost::system::error_code();
  262. return 1;
  263. }
  264. }
  265. else if (this_idle_thread)
  266. {
  267. unsigned long front_version = handler_queue_.front_version();
  268. front_lock.unlock();
  269. // If we're considering going idle we need to check whether the queue
  270. // is still empty. If it is, add the thread to the list of idle
  271. // threads.
  272. boost::asio::detail::mutex::scoped_lock back_lock(back_mutex_);
  273. if (back_stopped_)
  274. {
  275. ec = boost::system::error_code();
  276. return 0;
  277. }
  278. else if (front_version == handler_queue_.back_version())
  279. {
  280. this_idle_thread->next = back_first_idle_thread_;
  281. back_first_idle_thread_ = this_idle_thread;
  282. this_idle_thread->wakeup_event.clear(back_lock);
  283. this_idle_thread->wakeup_event.wait(back_lock);
  284. }
  285. }
  286. else
  287. {
  288. ec = boost::system::error_code();
  289. return 0;
  290. }
  291. }
  292. }
  293. // Interrupt a single idle thread.
  294. void interrupt_one_idle_thread(
  295. boost::asio::detail::mutex::scoped_lock& back_lock)
  296. {
  297. if (back_first_idle_thread_)
  298. {
  299. idle_thread_info* idle_thread = back_first_idle_thread_;
  300. back_first_idle_thread_ = idle_thread->next;
  301. idle_thread->next = 0;
  302. idle_thread->wakeup_event.signal(back_lock);
  303. }
  304. else if (back_task_thread_ && task_)
  305. {
  306. back_task_thread_ = 0;
  307. task_->interrupt();
  308. }
  309. }
  310. // Interrupt all idle threads.
  311. void interrupt_all_idle_threads(
  312. boost::asio::detail::mutex::scoped_lock& back_lock)
  313. {
  314. while (back_first_idle_thread_)
  315. {
  316. idle_thread_info* idle_thread = back_first_idle_thread_;
  317. back_first_idle_thread_ = idle_thread->next;
  318. idle_thread->next = 0;
  319. idle_thread->wakeup_event.signal(back_lock);
  320. }
  321. if (back_task_thread_ && task_)
  322. {
  323. back_task_thread_ = 0;
  324. task_->interrupt();
  325. }
  326. }
  327. // Helper class to perform task-related operations on block exit.
  328. class task_cleanup;
  329. friend class task_cleanup;
  330. class task_cleanup
  331. {
  332. public:
  333. task_cleanup(task_io_service& task_io_svc)
  334. : task_io_service_(task_io_svc)
  335. {
  336. }
  337. ~task_cleanup()
  338. {
  339. // Reinsert the task at the end of the handler queue.
  340. boost::asio::detail::mutex::scoped_lock back_lock(
  341. task_io_service_.back_mutex_);
  342. task_io_service_.back_task_thread_ = 0;
  343. task_io_service_.handler_queue_.push(&task_io_service_.task_handler_);
  344. }
  345. private:
  346. task_io_service& task_io_service_;
  347. };
  348. // Helper class to perform handler-related operations on block exit.
  349. class handler_cleanup
  350. {
  351. public:
  352. handler_cleanup(task_io_service& task_io_svc)
  353. : task_io_service_(task_io_svc)
  354. {
  355. }
  356. ~handler_cleanup()
  357. {
  358. task_io_service_.work_finished();
  359. }
  360. private:
  361. task_io_service& task_io_service_;
  362. };
  363. // Mutexes to protect access to internal data. do_one() pops and reads front_stopped_ under front_mutex_; pushes and the back_* members are accessed under back_mutex_.
  364. boost::asio::detail::mutex front_mutex_;
  365. boost::asio::detail::mutex back_mutex_;
  366. // The task to be run by this service. Null until init_task() attaches it and again after shutdown_service().
  367. Task* task_;
  368. // Handler object to represent the position of the task in the queue. It is compared by address and is never invoked or destroyed through the queue.
  369. class task_handler
  370. : public handler_queue::handler
  371. {
  372. public:
  373. task_handler()
  374. : handler_queue::handler(0, 0)
  375. {
  376. }
  377. } task_handler_;
  378. // The count of unfinished work: incremented by work_started() and post(), decremented by work_finished().
  379. boost::detail::atomic_count outstanding_work_;
  380. // The queue of handlers that are ready to be delivered.
  381. handler_queue handler_queue_;
  382. // Flags to indicate that the dispatcher has been stopped. front_stopped_ is accessed under front_mutex_, back_stopped_ under back_mutex_.
  383. bool front_stopped_;
  384. bool back_stopped_;
  385. // Flag to indicate that the dispatcher has been shut down. Accessed under back_mutex_.
  386. bool back_shutdown_;
  387. // Structure containing information about an idle thread.
  388. struct idle_thread_info
  389. {
  390. event wakeup_event; // Signalled to wake the thread after it has been taken off the idle list.
  391. idle_thread_info* next; // Next entry in the idle list, or null.
  392. };
  393. // Head of the singly linked list of threads that are currently idle (linked through idle_thread_info::next); null when no thread is idle.
  394. idle_thread_info* back_first_idle_thread_;
  395. // The thread that is currently blocked on the task, or null. Cleared when the task is interrupted or re-queued.
  396. idle_thread_info* back_task_thread_;
  397. };
  398. } // namespace detail
  399. } // namespace asio
  400. } // namespace boost
  401. #include <boost/asio/detail/pop_options.hpp>
  402. #endif // BOOST_ASIO_DETAIL_TASK_IO_SERVICE_2LOCK_HPP