epoll_reactor.hpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. //
  2. // epoll_reactor.hpp
  3. // ~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_DETAIL_EPOLL_REACTOR_HPP
  11. #define ASIO_DETAIL_EPOLL_REACTOR_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/push_options.hpp"
  16. #include "asio/detail/epoll_reactor_fwd.hpp"
  17. #if defined(ASIO_HAS_EPOLL)
#include "asio/detail/push_options.hpp"
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <sys/epoll.h>
#include <boost/config.hpp>
#include <boost/throw_exception.hpp>
#include "asio/detail/pop_options.hpp"
  24. #include "asio/error.hpp"
  25. #include "asio/io_service.hpp"
  26. #include "asio/system_error.hpp"
  27. #include "asio/detail/hash_map.hpp"
  28. #include "asio/detail/mutex.hpp"
  29. #include "asio/detail/op_queue.hpp"
  30. #include "asio/detail/reactor_op.hpp"
  31. #include "asio/detail/select_interrupter.hpp"
  32. #include "asio/detail/service_base.hpp"
  33. #include "asio/detail/socket_types.hpp"
  34. #include "asio/detail/timer_op.hpp"
  35. #include "asio/detail/timer_queue_base.hpp"
  36. #include "asio/detail/timer_queue_fwd.hpp"
  37. #include "asio/detail/timer_queue_set.hpp"
  38. #if (__GLIBC__ > 2) || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 8)
  39. # define ASIO_HAS_TIMERFD 1
  40. #endif // (__GLIBC__ > 2) || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 8)
  41. #if defined(ASIO_HAS_TIMERFD)
  42. # include "asio/detail/push_options.hpp"
  43. # include <sys/timerfd.h>
  44. # include "asio/detail/pop_options.hpp"
  45. #endif // defined(ASIO_HAS_TIMERFD)
  46. namespace asio {
  47. namespace detail {
  48. class epoll_reactor
  49. : public asio::detail::service_base<epoll_reactor>
  50. {
  51. public:
  52. enum { read_op = 0, write_op = 1,
  53. connect_op = 1, except_op = 2, max_ops = 3 };
  54. // Per-descriptor queues.
  55. struct descriptor_state
  56. {
  57. descriptor_state() {}
  58. descriptor_state(const descriptor_state&) {}
  59. void operator=(const descriptor_state&) {}
  60. mutex mutex_;
  61. op_queue<reactor_op> op_queue_[max_ops];
  62. bool shutdown_;
  63. };
  64. // Per-descriptor data.
  65. typedef descriptor_state* per_descriptor_data;
  66. // Constructor.
  67. epoll_reactor(asio::io_service& io_service)
  68. : asio::detail::service_base<epoll_reactor>(io_service),
  69. io_service_(use_service<io_service_impl>(io_service)),
  70. mutex_(),
  71. epoll_fd_(do_epoll_create()),
  72. #if defined(ASIO_HAS_TIMERFD)
  73. timer_fd_(timerfd_create(CLOCK_MONOTONIC, 0)),
  74. #else // defined(ASIO_HAS_TIMERFD)
  75. timer_fd_(-1),
  76. #endif // defined(ASIO_HAS_TIMERFD)
  77. interrupter_(),
  78. shutdown_(false)
  79. {
  80. // Add the interrupter's descriptor to epoll.
  81. epoll_event ev = { 0, { 0 } };
  82. ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  83. ev.data.ptr = &interrupter_;
  84. epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
  85. interrupter_.interrupt();
  86. // Add the timer descriptor to epoll.
  87. if (timer_fd_ != -1)
  88. {
  89. ev.events = EPOLLIN | EPOLLERR;
  90. ev.data.ptr = &timer_fd_;
  91. epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
  92. }
  93. }
  94. // Destructor.
  95. ~epoll_reactor()
  96. {
  97. close(epoll_fd_);
  98. if (timer_fd_ != -1)
  99. close(timer_fd_);
  100. }
  101. // Destroy all user-defined handler objects owned by the service.
  102. void shutdown_service()
  103. {
  104. mutex::scoped_lock lock(mutex_);
  105. shutdown_ = true;
  106. lock.unlock();
  107. op_queue<operation> ops;
  108. descriptor_map::iterator iter = registered_descriptors_.begin();
  109. descriptor_map::iterator end = registered_descriptors_.end();
  110. while (iter != end)
  111. {
  112. for (int i = 0; i < max_ops; ++i)
  113. ops.push(iter->second.op_queue_[i]);
  114. iter->second.shutdown_ = true;
  115. ++iter;
  116. }
  117. timer_queues_.get_all_timers(ops);
  118. }
  119. // Initialise the task.
  120. void init_task()
  121. {
  122. io_service_.init_task();
  123. }
  124. // Register a socket with the reactor. Returns 0 on success, system error
  125. // code on failure.
  126. int register_descriptor(socket_type descriptor,
  127. per_descriptor_data& descriptor_data)
  128. {
  129. mutex::scoped_lock lock(registered_descriptors_mutex_);
  130. descriptor_map::iterator new_entry = registered_descriptors_.insert(
  131. std::make_pair(descriptor, descriptor_state())).first;
  132. descriptor_data = &new_entry->second;
  133. epoll_event ev = { 0, { 0 } };
  134. ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET;
  135. ev.data.ptr = descriptor_data;
  136. int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  137. if (result != 0)
  138. return errno;
  139. descriptor_data->shutdown_ = false;
  140. return 0;
  141. }
  142. // Start a new operation. The reactor operation will be performed when the
  143. // given descriptor is flagged as ready, or an error has occurred.
  144. void start_op(int op_type, socket_type descriptor,
  145. per_descriptor_data& descriptor_data,
  146. reactor_op* op, bool allow_speculative)
  147. {
  148. mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
  149. if (descriptor_data->shutdown_)
  150. return;
  151. if (descriptor_data->op_queue_[op_type].empty())
  152. {
  153. if (allow_speculative
  154. && (op_type != read_op
  155. || descriptor_data->op_queue_[except_op].empty()))
  156. {
  157. if (op->perform())
  158. {
  159. descriptor_lock.unlock();
  160. io_service_.post_immediate_completion(op);
  161. return;
  162. }
  163. }
  164. else
  165. {
  166. epoll_event ev = { 0, { 0 } };
  167. ev.events = EPOLLIN | EPOLLERR | EPOLLHUP
  168. | EPOLLOUT | EPOLLPRI | EPOLLET;
  169. ev.data.ptr = descriptor_data;
  170. epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
  171. }
  172. }
  173. descriptor_data->op_queue_[op_type].push(op);
  174. io_service_.work_started();
  175. }
  176. // Cancel all operations associated with the given descriptor. The
  177. // handlers associated with the descriptor will be invoked with the
  178. // operation_aborted error.
  179. void cancel_ops(socket_type descriptor, per_descriptor_data& descriptor_data)
  180. {
  181. mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
  182. op_queue<operation> ops;
  183. for (int i = 0; i < max_ops; ++i)
  184. {
  185. while (reactor_op* op = descriptor_data->op_queue_[i].front())
  186. {
  187. op->ec_ = asio::error::operation_aborted;
  188. descriptor_data->op_queue_[i].pop();
  189. ops.push(op);
  190. }
  191. }
  192. descriptor_lock.unlock();
  193. io_service_.post_deferred_completions(ops);
  194. }
  195. // Cancel any operations that are running against the descriptor and remove
  196. // its registration from the reactor.
  197. void close_descriptor(socket_type descriptor,
  198. per_descriptor_data& descriptor_data)
  199. {
  200. mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
  201. mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  202. // Remove the descriptor from the set of known descriptors. The descriptor
  203. // will be automatically removed from the epoll set when it is closed.
  204. descriptor_data->shutdown_ = true;
  205. op_queue<operation> ops;
  206. for (int i = 0; i < max_ops; ++i)
  207. {
  208. while (reactor_op* op = descriptor_data->op_queue_[i].front())
  209. {
  210. op->ec_ = asio::error::operation_aborted;
  211. descriptor_data->op_queue_[i].pop();
  212. ops.push(op);
  213. }
  214. }
  215. descriptor_lock.unlock();
  216. registered_descriptors_.erase(descriptor);
  217. descriptors_lock.unlock();
  218. io_service_.post_deferred_completions(ops);
  219. }
  220. // Add a new timer queue to the reactor.
  221. template <typename Time_Traits>
  222. void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
  223. {
  224. mutex::scoped_lock lock(mutex_);
  225. timer_queues_.insert(&timer_queue);
  226. }
  227. // Remove a timer queue from the reactor.
  228. template <typename Time_Traits>
  229. void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
  230. {
  231. mutex::scoped_lock lock(mutex_);
  232. timer_queues_.erase(&timer_queue);
  233. }
  234. // Schedule a new operation in the given timer queue to expire at the
  235. // specified absolute time.
  236. template <typename Time_Traits>
  237. void schedule_timer(timer_queue<Time_Traits>& timer_queue,
  238. const typename Time_Traits::time_type& time, timer_op* op, void* token)
  239. {
  240. mutex::scoped_lock lock(mutex_);
  241. if (!shutdown_)
  242. {
  243. bool earliest = timer_queue.enqueue_timer(time, op, token);
  244. io_service_.work_started();
  245. if (earliest)
  246. {
  247. #if defined(ASIO_HAS_TIMERFD)
  248. if (timer_fd_ != -1)
  249. {
  250. itimerspec new_timeout;
  251. itimerspec old_timeout;
  252. int flags = get_timeout(new_timeout);
  253. timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
  254. return;
  255. }
  256. #endif // defined(ASIO_HAS_TIMERFD)
  257. interrupter_.interrupt();
  258. }
  259. }
  260. }
  261. // Cancel the timer operations associated with the given token. Returns the
  262. // number of operations that have been posted or dispatched.
  263. template <typename Time_Traits>
  264. std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
  265. {
  266. mutex::scoped_lock lock(mutex_);
  267. op_queue<operation> ops;
  268. std::size_t n = timer_queue.cancel_timer(token, ops);
  269. lock.unlock();
  270. io_service_.post_deferred_completions(ops);
  271. return n;
  272. }
  273. // Run epoll once until interrupted or events are ready to be dispatched.
  274. void run(bool block, op_queue<operation>& ops)
  275. {
  276. // Calculate a timeout only if timerfd is not used.
  277. int timeout;
  278. if (timer_fd_ != -1)
  279. timeout = block ? -1 : 0;
  280. else
  281. {
  282. mutex::scoped_lock lock(mutex_);
  283. timeout = block ? get_timeout() : 0;
  284. }
  285. // Block on the epoll descriptor.
  286. epoll_event events[128];
  287. int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
  288. #if defined(ASIO_HAS_TIMERFD)
  289. bool check_timers = (timer_fd_ == -1);
  290. #else // defined(ASIO_HAS_TIMERFD)
  291. bool check_timers = true;
  292. #endif // defined(ASIO_HAS_TIMERFD)
  293. // Dispatch the waiting events.
  294. for (int i = 0; i < num_events; ++i)
  295. {
  296. void* ptr = events[i].data.ptr;
  297. if (ptr == &interrupter_)
  298. {
  299. // No need to reset the interrupter since we're leaving the descriptor
  300. // in a ready-to-read state and relying on edge-triggered notifications
  301. // to make it so that we only get woken up when the descriptor's epoll
  302. // registration is updated.
  303. #if defined(ASIO_HAS_TIMERFD)
  304. if (timer_fd_ == -1)
  305. check_timers = true;
  306. #else // defined(ASIO_HAS_TIMERFD)
  307. check_timers = true;
  308. #endif // defined(ASIO_HAS_TIMERFD)
  309. }
  310. #if defined(ASIO_HAS_TIMERFD)
  311. else if (ptr == &timer_fd_)
  312. {
  313. check_timers = true;
  314. }
  315. #endif // defined(ASIO_HAS_TIMERFD)
  316. else
  317. {
  318. descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
  319. mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
  320. // Exception operations must be processed first to ensure that any
  321. // out-of-band data is read before normal data.
  322. static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
  323. for (int j = max_ops - 1; j >= 0; --j)
  324. {
  325. if (events[i].events & (flag[j] | EPOLLERR | EPOLLHUP))
  326. {
  327. while (reactor_op* op = descriptor_data->op_queue_[j].front())
  328. {
  329. if (op->perform())
  330. {
  331. descriptor_data->op_queue_[j].pop();
  332. ops.push(op);
  333. }
  334. else
  335. break;
  336. }
  337. }
  338. }
  339. }
  340. }
  341. if (check_timers)
  342. {
  343. mutex::scoped_lock common_lock(mutex_);
  344. timer_queues_.get_ready_timers(ops);
  345. #if defined(ASIO_HAS_TIMERFD)
  346. if (timer_fd_ != -1)
  347. {
  348. itimerspec new_timeout;
  349. itimerspec old_timeout;
  350. int flags = get_timeout(new_timeout);
  351. timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
  352. }
  353. #endif // defined(ASIO_HAS_TIMERFD)
  354. }
  355. }
  356. // Interrupt the select loop.
  357. void interrupt()
  358. {
  359. epoll_event ev = { 0, { 0 } };
  360. ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  361. ev.data.ptr = &interrupter_;
  362. epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
  363. }
  364. private:
  365. // The hint to pass to epoll_create to size its data structures.
  366. enum { epoll_size = 20000 };
  367. // Create the epoll file descriptor. Throws an exception if the descriptor
  368. // cannot be created.
  369. static int do_epoll_create()
  370. {
  371. int fd = epoll_create(epoll_size);
  372. if (fd == -1)
  373. {
  374. boost::throw_exception(
  375. asio::system_error(
  376. asio::error_code(errno,
  377. asio::error::get_system_category()),
  378. "epoll"));
  379. }
  380. return fd;
  381. }
  382. // Get the timeout value for the epoll_wait call. The timeout value is
  383. // returned as a number of milliseconds. A return value of -1 indicates
  384. // that epoll_wait should block indefinitely.
  385. int get_timeout()
  386. {
  387. // By default we will wait no longer than 5 minutes. This will ensure that
  388. // any changes to the system clock are detected after no longer than this.
  389. return timer_queues_.wait_duration_msec(5 * 60 * 1000);
  390. }
  391. #if defined(ASIO_HAS_TIMERFD)
  392. // Get the timeout value for the timer descriptor. The return value is the
  393. // flag argument to be used when calling timerfd_settime.
  394. int get_timeout(itimerspec& ts)
  395. {
  396. ts.it_interval.tv_sec = 0;
  397. ts.it_interval.tv_nsec = 0;
  398. long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  399. ts.it_value.tv_sec = usec / 1000000;
  400. ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
  401. return usec ? 0 : TFD_TIMER_ABSTIME;
  402. }
  403. #endif // defined(ASIO_HAS_TIMERFD)
  404. // The io_service implementation used to post completions.
  405. io_service_impl& io_service_;
  406. // Mutex to protect access to internal data.
  407. mutex mutex_;
  408. // The epoll file descriptor.
  409. int epoll_fd_;
  410. // The timer file descriptor.
  411. int timer_fd_;
  412. // The interrupter is used to break a blocking epoll_wait call.
  413. select_interrupter interrupter_;
  414. // The timer queues.
  415. timer_queue_set timer_queues_;
  416. // Whether the service has been shut down.
  417. bool shutdown_;
  418. // Mutex to protect access to the registered descriptors.
  419. mutex registered_descriptors_mutex_;
  420. // Keep track of all registered descriptors. This code relies on the fact that
  421. // the hash_map implementation pools deleted nodes, meaning that we can assume
  422. // our descriptor_state pointer remains valid even after the entry is removed.
  423. // Technically this is not true for C++98, as that standard says that spliced
  424. // elements in a list are invalidated. However, C++0x fixes this shortcoming
  425. // so we'll just assume that C++98 std::list implementations will do the right
  426. // thing anyway.
  427. typedef detail::hash_map<socket_type, descriptor_state> descriptor_map;
  428. descriptor_map registered_descriptors_;
  429. };
  430. } // namespace detail
  431. } // namespace asio
  432. #endif // defined(ASIO_HAS_EPOLL)
  433. #include "asio/detail/pop_options.hpp"
  434. #endif // ASIO_DETAIL_EPOLL_REACTOR_HPP