strand_service.hpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. //
  2. // strand_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_STRAND_SERVICE_HPP
  11. #define BOOST_ASIO_DETAIL_STRAND_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/push_options.hpp>
  16. #include <boost/asio/detail/push_options.hpp>
  17. #include <boost/aligned_storage.hpp>
  18. #include <boost/assert.hpp>
  19. #include <boost/detail/atomic_count.hpp>
  20. #include <boost/intrusive_ptr.hpp>
  21. #include <boost/asio/detail/pop_options.hpp>
  22. #include <boost/asio/io_service.hpp>
  23. #include <boost/asio/detail/bind_handler.hpp>
  24. #include <boost/asio/detail/call_stack.hpp>
  25. #include <boost/asio/detail/handler_alloc_helpers.hpp>
  26. #include <boost/asio/detail/handler_invoke_helpers.hpp>
  27. #include <boost/asio/detail/mutex.hpp>
  28. #include <boost/asio/detail/noncopyable.hpp>
  29. #include <boost/asio/detail/service_base.hpp>
  30. namespace boost {
  31. namespace asio {
  32. namespace detail {
  33. // Default service implementation for a strand.
  34. class strand_service
  35. : public boost::asio::detail::service_base<strand_service>
  36. {
  37. public:
  38. class handler_base;
  39. class invoke_current_handler;
  40. class post_next_waiter_on_exit;
  41. // The underlying implementation of a strand.
  42. class strand_impl
  43. {
  44. #if defined (__BORLANDC__)
  45. public:
  46. #else
  47. private:
  48. #endif
  49. void add_ref()
  50. {
  51. ++ref_count_;
  52. }
  53. void release()
  54. {
  55. if (--ref_count_ == 0)
  56. delete this;
  57. }
  58. private:
  59. // Only this service will have access to the internal values.
  60. friend class strand_service;
  61. friend class post_next_waiter_on_exit;
  62. friend class invoke_current_handler;
  63. strand_impl(strand_service& owner)
  64. : owner_(owner),
  65. current_handler_(0),
  66. first_waiter_(0),
  67. last_waiter_(0),
  68. ref_count_(0)
  69. {
  70. // Insert implementation into linked list of all implementations.
  71. boost::asio::detail::mutex::scoped_lock lock(owner_.mutex_);
  72. next_ = owner_.impl_list_;
  73. prev_ = 0;
  74. if (owner_.impl_list_)
  75. owner_.impl_list_->prev_ = this;
  76. owner_.impl_list_ = this;
  77. }
  78. ~strand_impl()
  79. {
  80. // Remove implementation from linked list of all implementations.
  81. boost::asio::detail::mutex::scoped_lock lock(owner_.mutex_);
  82. if (owner_.impl_list_ == this)
  83. owner_.impl_list_ = next_;
  84. if (prev_)
  85. prev_->next_ = next_;
  86. if (next_)
  87. next_->prev_= prev_;
  88. next_ = 0;
  89. prev_ = 0;
  90. lock.unlock();
  91. if (current_handler_)
  92. {
  93. current_handler_->destroy();
  94. }
  95. while (first_waiter_)
  96. {
  97. handler_base* next = first_waiter_->next_;
  98. first_waiter_->destroy();
  99. first_waiter_ = next;
  100. }
  101. }
  102. // Mutex to protect access to internal data.
  103. boost::asio::detail::mutex mutex_;
  104. // The service that owns this implementation.
  105. strand_service& owner_;
  106. // The handler that is ready to execute. If this pointer is non-null then it
  107. // indicates that a handler holds the lock.
  108. handler_base* current_handler_;
  109. // The start of the list of waiting handlers for the strand.
  110. handler_base* first_waiter_;
  111. // The end of the list of waiting handlers for the strand.
  112. handler_base* last_waiter_;
  113. // Storage for posted handlers.
  114. typedef boost::aligned_storage<128> handler_storage_type;
  115. #if defined(__BORLANDC__)
  116. boost::aligned_storage<128> handler_storage_;
  117. #else
  118. handler_storage_type handler_storage_;
  119. #endif
  120. // Pointers to adjacent socket implementations in linked list.
  121. strand_impl* next_;
  122. strand_impl* prev_;
  123. // The reference count on the strand implementation.
  124. boost::detail::atomic_count ref_count_;
  125. #if !defined(__BORLANDC__)
  126. friend void intrusive_ptr_add_ref(strand_impl* p)
  127. {
  128. p->add_ref();
  129. }
  130. friend void intrusive_ptr_release(strand_impl* p)
  131. {
  132. p->release();
  133. }
  134. #endif
  135. };
  136. friend class strand_impl;
  137. typedef boost::intrusive_ptr<strand_impl> implementation_type;
  138. // Base class for all handler types.
  139. class handler_base
  140. {
  141. public:
  142. typedef void (*invoke_func_type)(handler_base*,
  143. strand_service&, implementation_type&);
  144. typedef void (*destroy_func_type)(handler_base*);
  145. handler_base(invoke_func_type invoke_func, destroy_func_type destroy_func)
  146. : next_(0),
  147. invoke_func_(invoke_func),
  148. destroy_func_(destroy_func)
  149. {
  150. }
  151. void invoke(strand_service& service_impl, implementation_type& impl)
  152. {
  153. invoke_func_(this, service_impl, impl);
  154. }
  155. void destroy()
  156. {
  157. destroy_func_(this);
  158. }
  159. protected:
  160. ~handler_base()
  161. {
  162. }
  163. private:
  164. friend class strand_service;
  165. friend class strand_impl;
  166. friend class post_next_waiter_on_exit;
  167. handler_base* next_;
  168. invoke_func_type invoke_func_;
  169. destroy_func_type destroy_func_;
  170. };
  171. // Helper class to allow handlers to be dispatched.
  172. class invoke_current_handler
  173. {
  174. public:
  175. invoke_current_handler(strand_service& service_impl,
  176. const implementation_type& impl)
  177. : service_impl_(service_impl),
  178. impl_(impl)
  179. {
  180. }
  181. void operator()()
  182. {
  183. impl_->current_handler_->invoke(service_impl_, impl_);
  184. }
  185. friend void* asio_handler_allocate(std::size_t size,
  186. invoke_current_handler* this_handler)
  187. {
  188. return this_handler->do_handler_allocate(size);
  189. }
  190. friend void asio_handler_deallocate(void*, std::size_t,
  191. invoke_current_handler*)
  192. {
  193. }
  194. void* do_handler_allocate(std::size_t size)
  195. {
  196. #if defined(__BORLANDC__)
  197. BOOST_ASSERT(size <= boost::aligned_storage<128>::size);
  198. #else
  199. BOOST_ASSERT(size <= strand_impl::handler_storage_type::size);
  200. #endif
  201. (void)size;
  202. return impl_->handler_storage_.address();
  203. }
  204. // The asio_handler_invoke hook is not defined here since the default one
  205. // provides the correct behaviour, and including it here breaks MSVC 7.1
  206. // in some situations.
  207. private:
  208. strand_service& service_impl_;
  209. implementation_type impl_;
  210. };
  211. // Helper class to automatically enqueue next waiter on block exit.
  212. class post_next_waiter_on_exit
  213. {
  214. public:
  215. post_next_waiter_on_exit(strand_service& service_impl,
  216. implementation_type& impl)
  217. : service_impl_(service_impl),
  218. impl_(impl),
  219. cancelled_(false)
  220. {
  221. }
  222. ~post_next_waiter_on_exit()
  223. {
  224. if (!cancelled_)
  225. {
  226. boost::asio::detail::mutex::scoped_lock lock(impl_->mutex_);
  227. impl_->current_handler_ = impl_->first_waiter_;
  228. if (impl_->current_handler_)
  229. {
  230. impl_->first_waiter_ = impl_->first_waiter_->next_;
  231. if (impl_->first_waiter_ == 0)
  232. impl_->last_waiter_ = 0;
  233. lock.unlock();
  234. service_impl_.get_io_service().post(
  235. invoke_current_handler(service_impl_, impl_));
  236. }
  237. }
  238. }
  239. void cancel()
  240. {
  241. cancelled_ = true;
  242. }
  243. private:
  244. strand_service& service_impl_;
  245. implementation_type& impl_;
  246. bool cancelled_;
  247. };
  248. // Class template for a waiter.
  249. template <typename Handler>
  250. class handler_wrapper
  251. : public handler_base
  252. {
  253. public:
  254. handler_wrapper(Handler handler)
  255. : handler_base(&handler_wrapper<Handler>::do_invoke,
  256. &handler_wrapper<Handler>::do_destroy),
  257. handler_(handler)
  258. {
  259. }
  260. static void do_invoke(handler_base* base,
  261. strand_service& service_impl, implementation_type& impl)
  262. {
  263. // Take ownership of the handler object.
  264. typedef handler_wrapper<Handler> this_type;
  265. this_type* h(static_cast<this_type*>(base));
  266. typedef handler_alloc_traits<Handler, this_type> alloc_traits;
  267. handler_ptr<alloc_traits> ptr(h->handler_, h);
  268. post_next_waiter_on_exit p1(service_impl, impl);
  269. // Make a copy of the handler so that the memory can be deallocated before
  270. // the upcall is made.
  271. Handler handler(h->handler_);
  272. // A handler object must still be valid when the next waiter is posted
  273. // since destroying the last handler might cause the strand object to be
  274. // destroyed. Therefore we create a second post_next_waiter_on_exit object
  275. // that will be destroyed before the handler object.
  276. p1.cancel();
  277. post_next_waiter_on_exit p2(service_impl, impl);
  278. // Free the memory associated with the handler.
  279. ptr.reset();
  280. // Indicate that this strand is executing on the current thread.
  281. call_stack<strand_impl>::context ctx(impl.get());
  282. // Make the upcall.
  283. boost_asio_handler_invoke_helpers::invoke(handler, &handler);
  284. }
  285. static void do_destroy(handler_base* base)
  286. {
  287. // Take ownership of the handler object.
  288. typedef handler_wrapper<Handler> this_type;
  289. this_type* h(static_cast<this_type*>(base));
  290. typedef handler_alloc_traits<Handler, this_type> alloc_traits;
  291. handler_ptr<alloc_traits> ptr(h->handler_, h);
  292. // A sub-object of the handler may be the true owner of the memory
  293. // associated with the handler. Consequently, a local copy of the handler
  294. // is required to ensure that any owning sub-object remains valid until
  295. // after we have deallocated the memory here.
  296. Handler handler(h->handler_);
  297. (void)handler;
  298. // Free the memory associated with the handler.
  299. ptr.reset();
  300. }
  301. private:
  302. Handler handler_;
  303. };
  304. // Construct a new strand service for the specified io_service.
  305. explicit strand_service(boost::asio::io_service& io_service)
  306. : boost::asio::detail::service_base<strand_service>(io_service),
  307. mutex_(),
  308. impl_list_(0)
  309. {
  310. }
  311. // Destroy all user-defined handler objects owned by the service.
  312. void shutdown_service()
  313. {
  314. // Construct a list of all handlers to be destroyed.
  315. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  316. strand_impl* impl = impl_list_;
  317. handler_base* first_handler = 0;
  318. while (impl)
  319. {
  320. if (impl->current_handler_)
  321. {
  322. impl->current_handler_->next_ = first_handler;
  323. first_handler = impl->current_handler_;
  324. impl->current_handler_ = 0;
  325. }
  326. if (impl->first_waiter_)
  327. {
  328. impl->last_waiter_->next_ = first_handler;
  329. first_handler = impl->first_waiter_;
  330. impl->first_waiter_ = 0;
  331. impl->last_waiter_ = 0;
  332. }
  333. impl = impl->next_;
  334. }
  335. // Destroy all handlers without holding the lock.
  336. lock.unlock();
  337. while (first_handler)
  338. {
  339. handler_base* next = first_handler->next_;
  340. first_handler->destroy();
  341. first_handler = next;
  342. }
  343. }
  344. // Construct a new strand implementation.
  345. void construct(implementation_type& impl)
  346. {
  347. impl = implementation_type(new strand_impl(*this));
  348. }
  349. // Destroy a strand implementation.
  350. void destroy(implementation_type& impl)
  351. {
  352. implementation_type().swap(impl);
  353. }
  354. // Request the io_service to invoke the given handler.
  355. template <typename Handler>
  356. void dispatch(implementation_type& impl, Handler handler)
  357. {
  358. if (call_stack<strand_impl>::contains(impl.get()))
  359. {
  360. boost_asio_handler_invoke_helpers::invoke(handler, &handler);
  361. }
  362. else
  363. {
  364. // Allocate and construct an object to wrap the handler.
  365. typedef handler_wrapper<Handler> value_type;
  366. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  367. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  368. handler_ptr<alloc_traits> ptr(raw_ptr, handler);
  369. boost::asio::detail::mutex::scoped_lock lock(impl->mutex_);
  370. if (impl->current_handler_ == 0)
  371. {
  372. // This handler now has the lock, so can be dispatched immediately.
  373. impl->current_handler_ = ptr.release();
  374. lock.unlock();
  375. this->get_io_service().dispatch(invoke_current_handler(*this, impl));
  376. }
  377. else
  378. {
  379. // Another handler already holds the lock, so this handler must join
  380. // the list of waiters. The handler will be posted automatically when
  381. // its turn comes.
  382. if (impl->last_waiter_)
  383. {
  384. impl->last_waiter_->next_ = ptr.get();
  385. impl->last_waiter_ = impl->last_waiter_->next_;
  386. }
  387. else
  388. {
  389. impl->first_waiter_ = ptr.get();
  390. impl->last_waiter_ = ptr.get();
  391. }
  392. ptr.release();
  393. }
  394. }
  395. }
  396. // Request the io_service to invoke the given handler and return immediately.
  397. template <typename Handler>
  398. void post(implementation_type& impl, Handler handler)
  399. {
  400. // Allocate and construct an object to wrap the handler.
  401. typedef handler_wrapper<Handler> value_type;
  402. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  403. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  404. handler_ptr<alloc_traits> ptr(raw_ptr, handler);
  405. boost::asio::detail::mutex::scoped_lock lock(impl->mutex_);
  406. if (impl->current_handler_ == 0)
  407. {
  408. // This handler now has the lock, so can be dispatched immediately.
  409. impl->current_handler_ = ptr.release();
  410. lock.unlock();
  411. this->get_io_service().post(invoke_current_handler(*this, impl));
  412. }
  413. else
  414. {
  415. // Another handler already holds the lock, so this handler must join the
  416. // list of waiters. The handler will be posted automatically when its turn
  417. // comes.
  418. if (impl->last_waiter_)
  419. {
  420. impl->last_waiter_->next_ = ptr.get();
  421. impl->last_waiter_ = impl->last_waiter_->next_;
  422. }
  423. else
  424. {
  425. impl->first_waiter_ = ptr.get();
  426. impl->last_waiter_ = ptr.get();
  427. }
  428. ptr.release();
  429. }
  430. }
  431. private:
  432. // Mutex to protect access to the linked list of implementations.
  433. boost::asio::detail::mutex mutex_;
  434. // The head of a linked list of all implementations.
  435. strand_impl* impl_list_;
  436. };
  437. } // namespace detail
  438. } // namespace asio
  439. } // namespace boost
  440. #if defined(__BORLANDC__)
  441. namespace boost {
  442. inline void intrusive_ptr_add_ref(
  443. boost::asio::detail::strand_service::strand_impl* p)
  444. {
  445. p->add_ref();
  446. }
  447. inline void intrusive_ptr_release(
  448. boost::asio::detail::strand_service::strand_impl* p)
  449. {
  450. p->release();
  451. }
  452. } // namespace boost
  453. #endif // defined(__BORLANDC__)
  454. #include <boost/asio/detail/pop_options.hpp>
  455. #endif // BOOST_ASIO_DETAIL_STRAND_SERVICE_HPP