kqueue_reactor.hpp 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714
  1. //
  2. // kqueue_reactor.hpp
  3. // ~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. // Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
  7. //
  8. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  9. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  10. //
  11. #ifndef BOOST_ASIO_DETAIL_KQUEUE_REACTOR_HPP
  12. #define BOOST_ASIO_DETAIL_KQUEUE_REACTOR_HPP
  13. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  14. # pragma once
  15. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  16. #include <boost/asio/detail/push_options.hpp>
  17. #include <boost/asio/detail/kqueue_reactor_fwd.hpp>
  18. #if defined(BOOST_ASIO_HAS_KQUEUE)
  19. #include <boost/asio/detail/push_options.hpp>
  20. #include <cstddef>
  21. #include <vector>
  22. #include <sys/types.h>
  23. #include <sys/event.h>
  24. #include <sys/time.h>
  25. #include <boost/config.hpp>
  26. #include <boost/date_time/posix_time/posix_time_types.hpp>
  27. #include <boost/throw_exception.hpp>
  28. #include <boost/system/system_error.hpp>
  29. #include <boost/asio/detail/pop_options.hpp>
  30. #include <boost/asio/error.hpp>
  31. #include <boost/asio/io_service.hpp>
  32. #include <boost/asio/detail/bind_handler.hpp>
  33. #include <boost/asio/detail/mutex.hpp>
  34. #include <boost/asio/detail/task_io_service.hpp>
  35. #include <boost/asio/detail/thread.hpp>
  36. #include <boost/asio/detail/reactor_op_queue.hpp>
  37. #include <boost/asio/detail/select_interrupter.hpp>
  38. #include <boost/asio/detail/service_base.hpp>
  39. #include <boost/asio/detail/signal_blocker.hpp>
  40. #include <boost/asio/detail/socket_types.hpp>
  41. #include <boost/asio/detail/timer_queue.hpp>
  42. // Older versions of Mac OS X may not define EV_OOBAND.
  43. #if !defined(EV_OOBAND)
  44. # define EV_OOBAND EV_FLAG1
  45. #endif // !defined(EV_OOBAND)
  46. namespace boost {
  47. namespace asio {
  48. namespace detail {
  49. template <bool Own_Thread>
  50. class kqueue_reactor
  51. : public boost::asio::detail::service_base<kqueue_reactor<Own_Thread> >
  52. {
  53. public:
  // State kept by the caller for each descriptor it uses with the reactor.
  // register_descriptor() sets both flags to true; the start_*_op functions
  // clear a flag once an operation of that kind has been queued.
  struct per_descriptor_data
  {
    // Consulted by start_read_op() before it tries the read immediately.
    bool allow_speculative_read;

    // Consulted by start_write_op() before it tries the write immediately.
    bool allow_speculative_write;
  };
  60. // Constructor.
  61. kqueue_reactor(boost::asio::io_service& io_service)
  62. : boost::asio::detail::service_base<
  63. kqueue_reactor<Own_Thread> >(io_service),
  64. mutex_(),
  65. kqueue_fd_(do_kqueue_create()),
  66. wait_in_progress_(false),
  67. interrupter_(),
  68. read_op_queue_(),
  69. write_op_queue_(),
  70. except_op_queue_(),
  71. pending_cancellations_(),
  72. stop_thread_(false),
  73. thread_(0),
  74. shutdown_(false),
  75. need_kqueue_wait_(true)
  76. {
  77. // Start the reactor's internal thread only if needed.
  78. if (Own_Thread)
  79. {
  80. boost::asio::detail::signal_blocker sb;
  81. thread_ = new boost::asio::detail::thread(
  82. bind_handler(&kqueue_reactor::call_run_thread, this));
  83. }
  84. // Add the interrupter's descriptor to the kqueue.
  85. struct kevent event;
  86. EV_SET(&event, interrupter_.read_descriptor(),
  87. EVFILT_READ, EV_ADD, 0, 0, 0);
  88. ::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
  89. }
  90. // Destructor.
  91. ~kqueue_reactor()
  92. {
  93. shutdown_service();
  94. close(kqueue_fd_);
  95. }
  96. // Destroy all user-defined handler objects owned by the service.
  97. void shutdown_service()
  98. {
  99. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  100. shutdown_ = true;
  101. stop_thread_ = true;
  102. lock.unlock();
  103. if (thread_)
  104. {
  105. interrupter_.interrupt();
  106. thread_->join();
  107. delete thread_;
  108. thread_ = 0;
  109. }
  110. read_op_queue_.destroy_operations();
  111. write_op_queue_.destroy_operations();
  112. except_op_queue_.destroy_operations();
  113. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  114. timer_queues_[i]->destroy_timers();
  115. timer_queues_.clear();
  116. }
  117. // Initialise the task, but only if the reactor is not in its own thread.
  118. void init_task()
  119. {
  120. if (!Own_Thread)
  121. {
  122. typedef task_io_service<kqueue_reactor<Own_Thread> > task_io_service_type;
  123. use_service<task_io_service_type>(this->get_io_service()).init_task();
  124. }
  125. }
  126. // Register a socket with the reactor. Returns 0 on success, system error
  127. // code on failure.
  128. int register_descriptor(socket_type, per_descriptor_data& descriptor_data)
  129. {
  130. descriptor_data.allow_speculative_read = true;
  131. descriptor_data.allow_speculative_write = true;
  132. return 0;
  133. }
  134. // Start a new read operation. The handler object will be invoked when the
  135. // given descriptor is ready to be read, or an error has occurred.
  136. template <typename Handler>
  137. void start_read_op(socket_type descriptor,
  138. per_descriptor_data& descriptor_data, Handler handler,
  139. bool allow_speculative_read = true)
  140. {
  141. if (allow_speculative_read && descriptor_data.allow_speculative_read)
  142. {
  143. boost::system::error_code ec;
  144. std::size_t bytes_transferred = 0;
  145. if (handler.perform(ec, bytes_transferred))
  146. {
  147. handler.complete(ec, bytes_transferred);
  148. return;
  149. }
  150. // We only get one shot at a speculative read in this function.
  151. allow_speculative_read = false;
  152. }
  153. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  154. if (shutdown_)
  155. return;
  156. if (!allow_speculative_read)
  157. need_kqueue_wait_ = true;
  158. else if (!read_op_queue_.has_operation(descriptor))
  159. {
  160. // Speculative reads are ok as there are no queued read operations.
  161. descriptor_data.allow_speculative_read = true;
  162. boost::system::error_code ec;
  163. std::size_t bytes_transferred = 0;
  164. if (handler.perform(ec, bytes_transferred))
  165. {
  166. handler.complete(ec, bytes_transferred);
  167. return;
  168. }
  169. }
  170. // Speculative reads are not ok as there will be queued read operations.
  171. descriptor_data.allow_speculative_read = false;
  172. if (read_op_queue_.enqueue_operation(descriptor, handler))
  173. {
  174. struct kevent event;
  175. EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
  176. if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
  177. {
  178. boost::system::error_code ec(errno,
  179. boost::asio::error::get_system_category());
  180. read_op_queue_.perform_all_operations(descriptor, ec);
  181. }
  182. }
  183. }
  184. // Start a new write operation. The handler object will be invoked when the
  185. // given descriptor is ready to be written, or an error has occurred.
  186. template <typename Handler>
  187. void start_write_op(socket_type descriptor,
  188. per_descriptor_data& descriptor_data, Handler handler,
  189. bool allow_speculative_write = true)
  190. {
  191. if (allow_speculative_write && descriptor_data.allow_speculative_write)
  192. {
  193. boost::system::error_code ec;
  194. std::size_t bytes_transferred = 0;
  195. if (handler.perform(ec, bytes_transferred))
  196. {
  197. handler.complete(ec, bytes_transferred);
  198. return;
  199. }
  200. // We only get one shot at a speculative write in this function.
  201. allow_speculative_write = false;
  202. }
  203. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  204. if (shutdown_)
  205. return;
  206. if (!allow_speculative_write)
  207. need_kqueue_wait_ = true;
  208. else if (!write_op_queue_.has_operation(descriptor))
  209. {
  210. // Speculative writes are ok as there are no queued write operations.
  211. descriptor_data.allow_speculative_write = true;
  212. boost::system::error_code ec;
  213. std::size_t bytes_transferred = 0;
  214. if (handler.perform(ec, bytes_transferred))
  215. {
  216. handler.complete(ec, bytes_transferred);
  217. return;
  218. }
  219. }
  220. // Speculative writes are not ok as there will be queued write operations.
  221. descriptor_data.allow_speculative_write = false;
  222. if (write_op_queue_.enqueue_operation(descriptor, handler))
  223. {
  224. struct kevent event;
  225. EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
  226. if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
  227. {
  228. boost::system::error_code ec(errno,
  229. boost::asio::error::get_system_category());
  230. write_op_queue_.perform_all_operations(descriptor, ec);
  231. }
  232. }
  233. }
  234. // Start a new exception operation. The handler object will be invoked when
  235. // the given descriptor has exception information, or an error has occurred.
  236. template <typename Handler>
  237. void start_except_op(socket_type descriptor,
  238. per_descriptor_data&, Handler handler)
  239. {
  240. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  241. if (shutdown_)
  242. return;
  243. if (except_op_queue_.enqueue_operation(descriptor, handler))
  244. {
  245. struct kevent event;
  246. if (read_op_queue_.has_operation(descriptor))
  247. EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
  248. else
  249. EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
  250. if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
  251. {
  252. boost::system::error_code ec(errno,
  253. boost::asio::error::get_system_category());
  254. except_op_queue_.perform_all_operations(descriptor, ec);
  255. }
  256. }
  257. }
  258. // Start a new write operation. The handler object will be invoked when the
  259. // given descriptor is ready to be written, or an error has occurred.
  260. template <typename Handler>
  261. void start_connect_op(socket_type descriptor,
  262. per_descriptor_data& descriptor_data, Handler handler)
  263. {
  264. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  265. if (shutdown_)
  266. return;
  267. // Speculative writes are not ok as there will be queued write operations.
  268. descriptor_data.allow_speculative_write = false;
  269. if (write_op_queue_.enqueue_operation(descriptor, handler))
  270. {
  271. struct kevent event;
  272. EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
  273. if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
  274. {
  275. boost::system::error_code ec(errno,
  276. boost::asio::error::get_system_category());
  277. write_op_queue_.perform_all_operations(descriptor, ec);
  278. }
  279. }
  280. }
  281. // Cancel all operations associated with the given descriptor. The
  282. // handlers associated with the descriptor will be invoked with the
  283. // operation_aborted error.
  284. void cancel_ops(socket_type descriptor, per_descriptor_data&)
  285. {
  286. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  287. cancel_ops_unlocked(descriptor);
  288. }
  289. // Cancel any operations that are running against the descriptor and remove
  290. // its registration from the reactor.
  291. void close_descriptor(socket_type descriptor, per_descriptor_data&)
  292. {
  293. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  294. // Remove the descriptor from kqueue.
  295. struct kevent event[2];
  296. EV_SET(&event[0], descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0);
  297. EV_SET(&event[1], descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
  298. ::kevent(kqueue_fd_, event, 2, 0, 0, 0);
  299. // Cancel any outstanding operations associated with the descriptor.
  300. cancel_ops_unlocked(descriptor);
  301. }
  302. // Add a new timer queue to the reactor.
  303. template <typename Time_Traits>
  304. void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
  305. {
  306. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  307. timer_queues_.push_back(&timer_queue);
  308. }
  309. // Remove a timer queue from the reactor.
  310. template <typename Time_Traits>
  311. void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
  312. {
  313. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  314. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  315. {
  316. if (timer_queues_[i] == &timer_queue)
  317. {
  318. timer_queues_.erase(timer_queues_.begin() + i);
  319. return;
  320. }
  321. }
  322. }
  323. // Schedule a timer in the given timer queue to expire at the specified
  324. // absolute time. The handler object will be invoked when the timer expires.
  325. template <typename Time_Traits, typename Handler>
  326. void schedule_timer(timer_queue<Time_Traits>& timer_queue,
  327. const typename Time_Traits::time_type& time, Handler handler, void* token)
  328. {
  329. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  330. if (!shutdown_)
  331. if (timer_queue.enqueue_timer(time, handler, token))
  332. interrupter_.interrupt();
  333. }
  334. // Cancel the timer associated with the given token. Returns the number of
  335. // handlers that have been posted or dispatched.
  336. template <typename Time_Traits>
  337. std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
  338. {
  339. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  340. std::size_t n = timer_queue.cancel_timer(token);
  341. if (n > 0)
  342. interrupter_.interrupt();
  343. return n;
  344. }
  345. private:
  346. friend class task_io_service<kqueue_reactor<Own_Thread> >;
  347. // Run the kqueue loop.
  348. void run(bool block)
  349. {
  350. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  351. // Dispatch any operation cancellations that were made while the select
  352. // loop was not running.
  353. read_op_queue_.perform_cancellations();
  354. write_op_queue_.perform_cancellations();
  355. except_op_queue_.perform_cancellations();
  356. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  357. timer_queues_[i]->dispatch_cancellations();
  358. // Check if the thread is supposed to stop.
  359. if (stop_thread_)
  360. {
  361. complete_operations_and_timers(lock);
  362. return;
  363. }
  364. // We can return immediately if there's no work to do and the reactor is
  365. // not supposed to block.
  366. if (!block && read_op_queue_.empty() && write_op_queue_.empty()
  367. && except_op_queue_.empty() && all_timer_queues_are_empty())
  368. {
  369. complete_operations_and_timers(lock);
  370. return;
  371. }
  372. // Determine how long to block while waiting for events.
  373. timespec timeout_buf = { 0, 0 };
  374. timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;
  375. wait_in_progress_ = true;
  376. lock.unlock();
  377. // Block on the kqueue descriptor.
  378. struct kevent events[128];
  379. int num_events = (block || need_kqueue_wait_)
  380. ? kevent(kqueue_fd_, 0, 0, events, 128, timeout)
  381. : 0;
  382. lock.lock();
  383. wait_in_progress_ = false;
  384. // Block signals while performing operations.
  385. boost::asio::detail::signal_blocker sb;
  386. // Dispatch the waiting events.
  387. for (int i = 0; i < num_events; ++i)
  388. {
  389. int descriptor = events[i].ident;
  390. if (descriptor == interrupter_.read_descriptor())
  391. {
  392. interrupter_.reset();
  393. }
  394. else if (events[i].filter == EVFILT_READ)
  395. {
  396. // Dispatch operations associated with the descriptor.
  397. bool more_reads = false;
  398. bool more_except = false;
  399. if (events[i].flags & EV_ERROR)
  400. {
  401. boost::system::error_code error(
  402. events[i].data, boost::asio::error::get_system_category());
  403. except_op_queue_.perform_all_operations(descriptor, error);
  404. read_op_queue_.perform_all_operations(descriptor, error);
  405. }
  406. else if (events[i].flags & EV_OOBAND)
  407. {
  408. boost::system::error_code error;
  409. more_except = except_op_queue_.perform_operation(descriptor, error);
  410. if (events[i].data > 0)
  411. more_reads = read_op_queue_.perform_operation(descriptor, error);
  412. else
  413. more_reads = read_op_queue_.has_operation(descriptor);
  414. }
  415. else
  416. {
  417. boost::system::error_code error;
  418. more_reads = read_op_queue_.perform_operation(descriptor, error);
  419. more_except = except_op_queue_.has_operation(descriptor);
  420. }
  421. // Update the descriptor in the kqueue.
  422. struct kevent event;
  423. if (more_reads)
  424. EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
  425. else if (more_except)
  426. EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
  427. else
  428. EV_SET(&event, descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0);
  429. if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
  430. {
  431. boost::system::error_code error(errno,
  432. boost::asio::error::get_system_category());
  433. except_op_queue_.perform_all_operations(descriptor, error);
  434. read_op_queue_.perform_all_operations(descriptor, error);
  435. }
  436. }
  437. else if (events[i].filter == EVFILT_WRITE)
  438. {
  439. // Dispatch operations associated with the descriptor.
  440. bool more_writes = false;
  441. if (events[i].flags & EV_ERROR)
  442. {
  443. boost::system::error_code error(
  444. events[i].data, boost::asio::error::get_system_category());
  445. write_op_queue_.perform_all_operations(descriptor, error);
  446. }
  447. else
  448. {
  449. boost::system::error_code error;
  450. more_writes = write_op_queue_.perform_operation(descriptor, error);
  451. }
  452. // Update the descriptor in the kqueue.
  453. struct kevent event;
  454. if (more_writes)
  455. EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
  456. else
  457. EV_SET(&event, descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
  458. if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
  459. {
  460. boost::system::error_code error(errno,
  461. boost::asio::error::get_system_category());
  462. write_op_queue_.perform_all_operations(descriptor, error);
  463. }
  464. }
  465. }
  466. read_op_queue_.perform_cancellations();
  467. write_op_queue_.perform_cancellations();
  468. except_op_queue_.perform_cancellations();
  469. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  470. {
  471. timer_queues_[i]->dispatch_timers();
  472. timer_queues_[i]->dispatch_cancellations();
  473. }
  474. // Issue any pending cancellations.
  475. for (std::size_t i = 0; i < pending_cancellations_.size(); ++i)
  476. cancel_ops_unlocked(pending_cancellations_[i]);
  477. pending_cancellations_.clear();
  478. // Determine whether kqueue needs to be called next time the reactor is run.
  479. need_kqueue_wait_ = !read_op_queue_.empty()
  480. || !write_op_queue_.empty() || !except_op_queue_.empty();
  481. complete_operations_and_timers(lock);
  482. }
  483. // Run the select loop in the thread.
  484. void run_thread()
  485. {
  486. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  487. while (!stop_thread_)
  488. {
  489. lock.unlock();
  490. run(true);
  491. lock.lock();
  492. }
  493. }
  494. // Entry point for the select loop thread.
  495. static void call_run_thread(kqueue_reactor* reactor)
  496. {
  497. reactor->run_thread();
  498. }
  499. // Interrupt the select loop.
  500. void interrupt()
  501. {
  502. interrupter_.interrupt();
  503. }
  504. // Create the kqueue file descriptor. Throws an exception if the descriptor
  505. // cannot be created.
  506. static int do_kqueue_create()
  507. {
  508. int fd = kqueue();
  509. if (fd == -1)
  510. {
  511. boost::throw_exception(
  512. boost::system::system_error(
  513. boost::system::error_code(errno,
  514. boost::asio::error::get_system_category()),
  515. "kqueue"));
  516. }
  517. return fd;
  518. }
  519. // Check if all timer queues are empty.
  520. bool all_timer_queues_are_empty() const
  521. {
  522. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  523. if (!timer_queues_[i]->empty())
  524. return false;
  525. return true;
  526. }
  527. // Get the timeout value for the kevent call.
  528. timespec* get_timeout(timespec& ts)
  529. {
  530. if (all_timer_queues_are_empty())
  531. return 0;
  532. // By default we will wait no longer than 5 minutes. This will ensure that
  533. // any changes to the system clock are detected after no longer than this.
  534. boost::posix_time::time_duration minimum_wait_duration
  535. = boost::posix_time::minutes(5);
  536. for (std::size_t i = 0; i < timer_queues_.size(); ++i)
  537. {
  538. boost::posix_time::time_duration wait_duration
  539. = timer_queues_[i]->wait_duration();
  540. if (wait_duration < minimum_wait_duration)
  541. minimum_wait_duration = wait_duration;
  542. }
  543. if (minimum_wait_duration > boost::posix_time::time_duration())
  544. {
  545. ts.tv_sec = minimum_wait_duration.total_seconds();
  546. ts.tv_nsec = minimum_wait_duration.total_nanoseconds() % 1000000000;
  547. }
  548. else
  549. {
  550. ts.tv_sec = 0;
  551. ts.tv_nsec = 0;
  552. }
  553. return &ts;
  554. }
  555. // Cancel all operations associated with the given descriptor. The do_cancel
  556. // function of the handler objects will be invoked. This function does not
  557. // acquire the kqueue_reactor's mutex.
  558. void cancel_ops_unlocked(socket_type descriptor)
  559. {
  560. bool interrupt = read_op_queue_.cancel_operations(descriptor);
  561. interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
  562. interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
  563. if (interrupt)
  564. interrupter_.interrupt();
  565. }
  566. // Clean up operations and timers. We must not hold the lock since the
  567. // destructors may make calls back into this reactor. We make a copy of the
  568. // vector of timer queues since the original may be modified while the lock
  569. // is not held.
  570. void complete_operations_and_timers(
  571. boost::asio::detail::mutex::scoped_lock& lock)
  572. {
  573. timer_queues_for_cleanup_ = timer_queues_;
  574. lock.unlock();
  575. read_op_queue_.complete_operations();
  576. write_op_queue_.complete_operations();
  577. except_op_queue_.complete_operations();
  578. for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
  579. timer_queues_for_cleanup_[i]->complete_timers();
  580. }
  581. // Mutex to protect access to internal data.
  582. boost::asio::detail::mutex mutex_;
  583. // The kqueue file descriptor.
  584. int kqueue_fd_;
  585. // Whether the kqueue wait call is currently in progress
  586. bool wait_in_progress_;
  587. // The interrupter is used to break a blocking kevent call.
  588. select_interrupter interrupter_;
  589. // The queue of read operations.
  590. reactor_op_queue<socket_type> read_op_queue_;
  591. // The queue of write operations.
  592. reactor_op_queue<socket_type> write_op_queue_;
  593. // The queue of except operations.
  594. reactor_op_queue<socket_type> except_op_queue_;
  595. // The timer queues.
  596. std::vector<timer_queue_base*> timer_queues_;
  597. // A copy of the timer queues, used when cleaning up timers. The copy is
  598. // stored as a class data member to avoid unnecessary memory allocation.
  599. std::vector<timer_queue_base*> timer_queues_for_cleanup_;
  600. // The descriptors that are pending cancellation.
  601. std::vector<socket_type> pending_cancellations_;
  602. // Does the reactor loop thread need to stop.
  603. bool stop_thread_;
  604. // The thread that is running the reactor loop.
  605. boost::asio::detail::thread* thread_;
  606. // Whether the service has been shut down.
  607. bool shutdown_;
  608. // Whether we need to call kqueue the next time the reactor is run.
  609. bool need_kqueue_wait_;
  610. };
  611. } // namespace detail
  612. } // namespace asio
  613. } // namespace boost
  614. #endif // defined(BOOST_ASIO_HAS_KQUEUE)
  615. #include <boost/asio/detail/pop_options.hpp>
  616. #endif // BOOST_ASIO_DETAIL_KQUEUE_REACTOR_HPP