reactor_op_queue.hpp 13 KB


  1. //
  2. // reactor_op_queue.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_REACTOR_OP_QUEUE_HPP
  11. #define BOOST_ASIO_DETAIL_REACTOR_OP_QUEUE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/push_options.hpp>
  16. #include <boost/asio/detail/push_options.hpp>
  17. #include <memory>
  18. #include <boost/asio/detail/pop_options.hpp>
  19. #include <boost/asio/error.hpp>
  20. #include <boost/asio/detail/handler_alloc_helpers.hpp>
  21. #include <boost/asio/detail/hash_map.hpp>
  22. #include <boost/asio/detail/noncopyable.hpp>
  23. namespace boost {
  24. namespace asio {
  25. namespace detail {
  26. template <typename Descriptor>
  27. class reactor_op_queue
  28. : private noncopyable
  29. {
  30. public:
  31. // Constructor.
  32. reactor_op_queue()
  33. : operations_(),
  34. cancelled_operations_(0),
  35. complete_operations_(0)
  36. {
  37. }
  38. // Add a new operation to the queue. Returns true if this is the only
  39. // operation for the given descriptor, in which case the reactor's event
  40. // demultiplexing function call may need to be interrupted and restarted.
  41. template <typename Operation>
  42. bool enqueue_operation(Descriptor descriptor, Operation operation)
  43. {
  44. // Allocate and construct an object to wrap the handler.
  45. typedef handler_alloc_traits<Operation, op<Operation> > alloc_traits;
  46. raw_handler_ptr<alloc_traits> raw_ptr(operation);
  47. handler_ptr<alloc_traits> ptr(raw_ptr, descriptor, operation);
  48. typedef typename operation_map::iterator iterator;
  49. typedef typename operation_map::value_type value_type;
  50. std::pair<iterator, bool> entry =
  51. operations_.insert(value_type(descriptor, ptr.get()));
  52. if (entry.second)
  53. {
  54. ptr.release();
  55. return true;
  56. }
  57. op_base* current_op = entry.first->second;
  58. while (current_op->next_)
  59. current_op = current_op->next_;
  60. current_op->next_ = ptr.release();
  61. return false;
  62. }
  63. // Cancel all operations associated with the descriptor. Any operations
  64. // pending for the descriptor will be notified that they have been cancelled
  65. // next time perform_cancellations is called. Returns true if any operations
  66. // were cancelled, in which case the reactor's event demultiplexing function
  67. // may need to be interrupted and restarted.
  68. bool cancel_operations(Descriptor descriptor)
  69. {
  70. typename operation_map::iterator i = operations_.find(descriptor);
  71. if (i != operations_.end())
  72. {
  73. op_base* last_op = i->second;
  74. while (last_op->next_)
  75. last_op = last_op->next_;
  76. last_op->next_ = cancelled_operations_;
  77. cancelled_operations_ = i->second;
  78. operations_.erase(i);
  79. return true;
  80. }
  81. return false;
  82. }
  83. // Whether there are no operations in the queue.
  84. bool empty() const
  85. {
  86. return operations_.empty();
  87. }
  88. // Determine whether there are any operations associated with the descriptor.
  89. bool has_operation(Descriptor descriptor) const
  90. {
  91. return operations_.find(descriptor) != operations_.end();
  92. }
  93. // Perform the first operation corresponding to the descriptor. Returns true
  94. // if there are more operations queued for the descriptor.
  95. bool perform_operation(Descriptor descriptor,
  96. const boost::system::error_code& result)
  97. {
  98. typename operation_map::iterator i = operations_.find(descriptor);
  99. if (i != operations_.end())
  100. {
  101. op_base* this_op = i->second;
  102. i->second = this_op->next_;
  103. this_op->next_ = complete_operations_;
  104. complete_operations_ = this_op;
  105. bool done = this_op->perform(result);
  106. if (done)
  107. {
  108. // Operation has finished.
  109. if (i->second)
  110. {
  111. return true;
  112. }
  113. else
  114. {
  115. operations_.erase(i);
  116. return false;
  117. }
  118. }
  119. else
  120. {
  121. // Operation wants to be called again. Leave it at the front of the
  122. // queue for this descriptor, and remove from the completed list.
  123. complete_operations_ = this_op->next_;
  124. this_op->next_ = i->second;
  125. i->second = this_op;
  126. return true;
  127. }
  128. }
  129. return false;
  130. }
  131. // Perform all operations corresponding to the descriptor.
  132. void perform_all_operations(Descriptor descriptor,
  133. const boost::system::error_code& result)
  134. {
  135. typename operation_map::iterator i = operations_.find(descriptor);
  136. if (i != operations_.end())
  137. {
  138. while (i->second)
  139. {
  140. op_base* this_op = i->second;
  141. i->second = this_op->next_;
  142. this_op->next_ = complete_operations_;
  143. complete_operations_ = this_op;
  144. bool done = this_op->perform(result);
  145. if (!done)
  146. {
  147. // Operation has not finished yet, so leave at front of queue, and
  148. // remove from the completed list.
  149. complete_operations_ = this_op->next_;
  150. this_op->next_ = i->second;
  151. i->second = this_op;
  152. return;
  153. }
  154. }
  155. operations_.erase(i);
  156. }
  157. }
  158. // Fill a descriptor set with the descriptors corresponding to each active
  159. // operation.
  160. template <typename Descriptor_Set>
  161. void get_descriptors(Descriptor_Set& descriptors)
  162. {
  163. typename operation_map::iterator i = operations_.begin();
  164. while (i != operations_.end())
  165. {
  166. Descriptor descriptor = i->first;
  167. ++i;
  168. if (!descriptors.set(descriptor))
  169. {
  170. boost::system::error_code ec(error::fd_set_failure);
  171. perform_all_operations(descriptor, ec);
  172. }
  173. }
  174. }
  175. // Perform the operations corresponding to the ready file descriptors
  176. // contained in the given descriptor set.
  177. template <typename Descriptor_Set>
  178. void perform_operations_for_descriptors(const Descriptor_Set& descriptors,
  179. const boost::system::error_code& result)
  180. {
  181. typename operation_map::iterator i = operations_.begin();
  182. while (i != operations_.end())
  183. {
  184. typename operation_map::iterator op_iter = i++;
  185. if (descriptors.is_set(op_iter->first))
  186. {
  187. op_base* this_op = op_iter->second;
  188. op_iter->second = this_op->next_;
  189. this_op->next_ = complete_operations_;
  190. complete_operations_ = this_op;
  191. bool done = this_op->perform(result);
  192. if (done)
  193. {
  194. if (!op_iter->second)
  195. operations_.erase(op_iter);
  196. }
  197. else
  198. {
  199. // Operation has not finished yet, so leave at front of queue, and
  200. // remove from the completed list.
  201. complete_operations_ = this_op->next_;
  202. this_op->next_ = op_iter->second;
  203. op_iter->second = this_op;
  204. }
  205. }
  206. }
  207. }
  208. // Perform any pending cancels for operations.
  209. void perform_cancellations()
  210. {
  211. while (cancelled_operations_)
  212. {
  213. op_base* this_op = cancelled_operations_;
  214. cancelled_operations_ = this_op->next_;
  215. this_op->next_ = complete_operations_;
  216. complete_operations_ = this_op;
  217. this_op->perform(boost::asio::error::operation_aborted);
  218. }
  219. }
  220. // Complete all operations that are waiting to be completed.
  221. void complete_operations()
  222. {
  223. while (complete_operations_)
  224. {
  225. op_base* next_op = complete_operations_->next_;
  226. complete_operations_->next_ = 0;
  227. complete_operations_->complete();
  228. complete_operations_ = next_op;
  229. }
  230. }
  231. // Destroy all operations owned by the queue.
  232. void destroy_operations()
  233. {
  234. while (cancelled_operations_)
  235. {
  236. op_base* next_op = cancelled_operations_->next_;
  237. cancelled_operations_->next_ = 0;
  238. cancelled_operations_->destroy();
  239. cancelled_operations_ = next_op;
  240. }
  241. while (complete_operations_)
  242. {
  243. op_base* next_op = complete_operations_->next_;
  244. complete_operations_->next_ = 0;
  245. complete_operations_->destroy();
  246. complete_operations_ = next_op;
  247. }
  248. typename operation_map::iterator i = operations_.begin();
  249. while (i != operations_.end())
  250. {
  251. typename operation_map::iterator op_iter = i++;
  252. op_base* curr_op = op_iter->second;
  253. operations_.erase(op_iter);
  254. while (curr_op)
  255. {
  256. op_base* next_op = curr_op->next_;
  257. curr_op->next_ = 0;
  258. curr_op->destroy();
  259. curr_op = next_op;
  260. }
  261. }
  262. }
  263. private:
  264. // Base class for reactor operations. A function pointer is used instead of
  265. // virtual functions to avoid the associated overhead.
  266. class op_base
  267. {
  268. public:
  269. // Get the descriptor associated with the operation.
  270. Descriptor descriptor() const
  271. {
  272. return descriptor_;
  273. }
  274. // Perform the operation.
  275. bool perform(const boost::system::error_code& result)
  276. {
  277. result_ = result;
  278. return perform_func_(this, result_, bytes_transferred_);
  279. }
  280. // Destroy the operation and post the handler.
  281. void complete()
  282. {
  283. complete_func_(this, result_, bytes_transferred_);
  284. }
  285. // Destroy the operation.
  286. void destroy()
  287. {
  288. destroy_func_(this);
  289. }
  290. protected:
  291. typedef bool (*perform_func_type)(op_base*,
  292. boost::system::error_code&, std::size_t&);
  293. typedef void (*complete_func_type)(op_base*,
  294. const boost::system::error_code&, std::size_t);
  295. typedef void (*destroy_func_type)(op_base*);
  296. // Construct an operation for the given descriptor.
  297. op_base(perform_func_type perform_func, complete_func_type complete_func,
  298. destroy_func_type destroy_func, Descriptor descriptor)
  299. : perform_func_(perform_func),
  300. complete_func_(complete_func),
  301. destroy_func_(destroy_func),
  302. descriptor_(descriptor),
  303. result_(),
  304. bytes_transferred_(0),
  305. next_(0)
  306. {
  307. }
  308. // Prevent deletion through this type.
  309. ~op_base()
  310. {
  311. }
  312. private:
  313. friend class reactor_op_queue<Descriptor>;
  314. // The function to be called to perform the operation.
  315. perform_func_type perform_func_;
  316. // The function to be called to delete the operation and post the handler.
  317. complete_func_type complete_func_;
  318. // The function to be called to delete the operation.
  319. destroy_func_type destroy_func_;
  320. // The descriptor associated with the operation.
  321. Descriptor descriptor_;
  322. // The result of the operation.
  323. boost::system::error_code result_;
  324. // The number of bytes transferred in the operation.
  325. std::size_t bytes_transferred_;
  326. // The next operation for the same file descriptor.
  327. op_base* next_;
  328. };
  329. // Adaptor class template for operations.
  330. template <typename Operation>
  331. class op
  332. : public op_base
  333. {
  334. public:
  335. // Constructor.
  336. op(Descriptor descriptor, Operation operation)
  337. : op_base(&op<Operation>::do_perform, &op<Operation>::do_complete,
  338. &op<Operation>::do_destroy, descriptor),
  339. operation_(operation)
  340. {
  341. }
  342. // Perform the operation.
  343. static bool do_perform(op_base* base,
  344. boost::system::error_code& result, std::size_t& bytes_transferred)
  345. {
  346. return static_cast<op<Operation>*>(base)->operation_.perform(
  347. result, bytes_transferred);
  348. }
  349. // Destroy the operation and post the handler.
  350. static void do_complete(op_base* base,
  351. const boost::system::error_code& result, std::size_t bytes_transferred)
  352. {
  353. // Take ownership of the operation object.
  354. typedef op<Operation> this_type;
  355. this_type* this_op(static_cast<this_type*>(base));
  356. typedef handler_alloc_traits<Operation, this_type> alloc_traits;
  357. handler_ptr<alloc_traits> ptr(this_op->operation_, this_op);
  358. // Make a copy of the error_code and the operation so that the memory can
  359. // be deallocated before the upcall is made.
  360. boost::system::error_code ec(result);
  361. Operation operation(this_op->operation_);
  362. // Free the memory associated with the operation.
  363. ptr.reset();
  364. // Make the upcall.
  365. operation.complete(ec, bytes_transferred);
  366. }
  367. // Destroy the operation.
  368. static void do_destroy(op_base* base)
  369. {
  370. // Take ownership of the operation object.
  371. typedef op<Operation> this_type;
  372. this_type* this_op(static_cast<this_type*>(base));
  373. typedef handler_alloc_traits<Operation, this_type> alloc_traits;
  374. handler_ptr<alloc_traits> ptr(this_op->operation_, this_op);
  375. // A sub-object of the operation may be the true owner of the memory
  376. // associated with the operation. Consequently, a local copy of the
  377. // operation is required to ensure that any owning sub-object remains
  378. // valid until after we have deallocated the memory here.
  379. Operation operation(this_op->operation_);
  380. (void)operation;
  381. // Free the memory associated with the operation.
  382. ptr.reset();
  383. }
  384. private:
  385. Operation operation_;
  386. };
  387. // The type for a map of operations.
  388. typedef hash_map<Descriptor, op_base*> operation_map;
  389. // The operations that are currently executing asynchronously.
  390. operation_map operations_;
  391. // The list of operations that have been cancelled.
  392. op_base* cancelled_operations_;
  393. // The list of operations waiting to be completed.
  394. op_base* complete_operations_;
  395. };
  396. } // namespace detail
  397. } // namespace asio
  398. } // namespace boost
  399. #include <boost/asio/detail/pop_options.hpp>
  400. #endif // BOOST_ASIO_DETAIL_REACTOR_OP_QUEUE_HPP