reactive_socket_service.hpp 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744
  1. //
  2. // reactive_socket_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
  11. #define ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/push_options.hpp"
  16. #include "asio/buffer.hpp"
  17. #include "asio/error.hpp"
  18. #include "asio/io_service.hpp"
  19. #include "asio/socket_base.hpp"
  20. #include "asio/detail/bind_handler.hpp"
  21. #include "asio/detail/buffer_sequence_adapter.hpp"
  22. #include "asio/detail/fenced_block.hpp"
  23. #include "asio/detail/noncopyable.hpp"
  24. #include "asio/detail/null_buffers_op.hpp"
  25. #include "asio/detail/reactor.hpp"
  26. #include "asio/detail/reactor_op.hpp"
  27. #include "asio/detail/socket_holder.hpp"
  28. #include "asio/detail/socket_ops.hpp"
  29. #include "asio/detail/socket_types.hpp"
  30. namespace asio {
  31. namespace detail {
  32. template <typename Protocol>
  33. class reactive_socket_service
  34. {
  35. public:
  36. // The protocol type.
  37. typedef Protocol protocol_type;
  38. // The endpoint type.
  39. typedef typename Protocol::endpoint endpoint_type;
  40. // The native type of a socket.
  41. typedef socket_type native_type;
  42. // The implementation type of the socket.
  43. class implementation_type
  44. : private asio::detail::noncopyable
  45. {
  46. public:
  47. // Default constructor.
  48. implementation_type()
  49. : socket_(invalid_socket),
  50. flags_(0),
  51. protocol_(endpoint_type().protocol())
  52. {
  53. }
  54. private:
  55. // Only this service will have access to the internal values.
  56. friend class reactive_socket_service<Protocol>;
  57. // The native socket representation.
  58. socket_type socket_;
  59. enum
  60. {
  61. // The user wants a non-blocking socket.
  62. user_set_non_blocking = 1,
  63. // The implementation wants a non-blocking socket (in order to be able to
  64. // perform asynchronous read and write operations).
  65. internal_non_blocking = 2,
  66. // Helper "flag" used to determine whether the socket is non-blocking.
  67. non_blocking = user_set_non_blocking | internal_non_blocking,
  68. // User wants connection_aborted errors, which are disabled by default.
  69. enable_connection_aborted = 4,
  70. // The user set the linger option. Needs to be checked when closing.
  71. user_set_linger = 8
  72. };
  73. // Flags indicating the current state of the socket.
  74. unsigned char flags_;
  75. // The protocol associated with the socket.
  76. protocol_type protocol_;
  77. // Per-descriptor data used by the reactor.
  78. reactor::per_descriptor_data reactor_data_;
  79. };
  80. // Constructor.
  81. reactive_socket_service(asio::io_service& io_service)
  82. : io_service_impl_(use_service<io_service_impl>(io_service)),
  83. reactor_(use_service<reactor>(io_service))
  84. {
  85. reactor_.init_task();
  86. }
  // Service shutdown hook for destroying user-defined handler objects owned
  // by the service. This service has nothing to do here.
  void shutdown_service()
  {
    // Intentionally empty.
  }
  91. // Construct a new socket implementation.
  92. void construct(implementation_type& impl)
  93. {
  94. impl.socket_ = invalid_socket;
  95. impl.flags_ = 0;
  96. }
  97. // Destroy a socket implementation.
  98. void destroy(implementation_type& impl)
  99. {
  100. if (impl.socket_ != invalid_socket)
  101. {
  102. reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
  103. if (impl.flags_ & implementation_type::non_blocking)
  104. {
  105. ioctl_arg_type non_blocking = 0;
  106. asio::error_code ignored_ec;
  107. socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
  108. impl.flags_ &= ~implementation_type::non_blocking;
  109. }
  110. if (impl.flags_ & implementation_type::user_set_linger)
  111. {
  112. ::linger opt;
  113. opt.l_onoff = 0;
  114. opt.l_linger = 0;
  115. asio::error_code ignored_ec;
  116. socket_ops::setsockopt(impl.socket_,
  117. SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
  118. }
  119. asio::error_code ignored_ec;
  120. socket_ops::close(impl.socket_, ignored_ec);
  121. impl.socket_ = invalid_socket;
  122. }
  123. }
  124. // Open a new socket implementation.
  125. asio::error_code open(implementation_type& impl,
  126. const protocol_type& protocol, asio::error_code& ec)
  127. {
  128. if (is_open(impl))
  129. {
  130. ec = asio::error::already_open;
  131. return ec;
  132. }
  133. socket_holder sock(socket_ops::socket(protocol.family(),
  134. protocol.type(), protocol.protocol(), ec));
  135. if (sock.get() == invalid_socket)
  136. return ec;
  137. if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
  138. {
  139. ec = asio::error_code(err,
  140. asio::error::get_system_category());
  141. return ec;
  142. }
  143. impl.socket_ = sock.release();
  144. impl.flags_ = 0;
  145. impl.protocol_ = protocol;
  146. ec = asio::error_code();
  147. return ec;
  148. }
  149. // Assign a native socket to a socket implementation.
  150. asio::error_code assign(implementation_type& impl,
  151. const protocol_type& protocol, const native_type& native_socket,
  152. asio::error_code& ec)
  153. {
  154. if (is_open(impl))
  155. {
  156. ec = asio::error::already_open;
  157. return ec;
  158. }
  159. if (int err = reactor_.register_descriptor(
  160. native_socket, impl.reactor_data_))
  161. {
  162. ec = asio::error_code(err,
  163. asio::error::get_system_category());
  164. return ec;
  165. }
  166. impl.socket_ = native_socket;
  167. impl.flags_ = 0;
  168. impl.protocol_ = protocol;
  169. ec = asio::error_code();
  170. return ec;
  171. }
  172. // Determine whether the socket is open.
  173. bool is_open(const implementation_type& impl) const
  174. {
  175. return impl.socket_ != invalid_socket;
  176. }
  177. // Destroy a socket implementation.
  178. asio::error_code close(implementation_type& impl,
  179. asio::error_code& ec)
  180. {
  181. if (is_open(impl))
  182. {
  183. reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
  184. if (impl.flags_ & implementation_type::non_blocking)
  185. {
  186. ioctl_arg_type non_blocking = 0;
  187. asio::error_code ignored_ec;
  188. socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
  189. impl.flags_ &= ~implementation_type::non_blocking;
  190. }
  191. if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
  192. return ec;
  193. impl.socket_ = invalid_socket;
  194. }
  195. ec = asio::error_code();
  196. return ec;
  197. }
  198. // Get the native socket representation.
  199. native_type native(implementation_type& impl)
  200. {
  201. return impl.socket_;
  202. }
  203. // Cancel all operations associated with the socket.
  204. asio::error_code cancel(implementation_type& impl,
  205. asio::error_code& ec)
  206. {
  207. if (!is_open(impl))
  208. {
  209. ec = asio::error::bad_descriptor;
  210. return ec;
  211. }
  212. reactor_.cancel_ops(impl.socket_, impl.reactor_data_);
  213. ec = asio::error_code();
  214. return ec;
  215. }
  216. // Determine whether the socket is at the out-of-band data mark.
  217. bool at_mark(const implementation_type& impl,
  218. asio::error_code& ec) const
  219. {
  220. if (!is_open(impl))
  221. {
  222. ec = asio::error::bad_descriptor;
  223. return false;
  224. }
  225. #if defined(SIOCATMARK)
  226. asio::detail::ioctl_arg_type value = 0;
  227. socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
  228. # if defined(ENOTTY)
  229. if (ec.value() == ENOTTY)
  230. ec = asio::error::not_socket;
  231. # endif // defined(ENOTTY)
  232. #else // defined(SIOCATMARK)
  233. int value = sockatmark(impl.socket_);
  234. if (value == -1)
  235. ec = asio::error_code(errno,
  236. asio::error::get_system_category());
  237. else
  238. ec = asio::error_code();
  239. #endif // defined(SIOCATMARK)
  240. return ec ? false : value != 0;
  241. }
  242. // Determine the number of bytes available for reading.
  243. std::size_t available(const implementation_type& impl,
  244. asio::error_code& ec) const
  245. {
  246. if (!is_open(impl))
  247. {
  248. ec = asio::error::bad_descriptor;
  249. return 0;
  250. }
  251. asio::detail::ioctl_arg_type value = 0;
  252. socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
  253. #if defined(ENOTTY)
  254. if (ec.value() == ENOTTY)
  255. ec = asio::error::not_socket;
  256. #endif // defined(ENOTTY)
  257. return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
  258. }
  259. // Bind the socket to the specified local endpoint.
  260. asio::error_code bind(implementation_type& impl,
  261. const endpoint_type& endpoint, asio::error_code& ec)
  262. {
  263. if (!is_open(impl))
  264. {
  265. ec = asio::error::bad_descriptor;
  266. return ec;
  267. }
  268. socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
  269. return ec;
  270. }
  271. // Place the socket into the state where it will listen for new connections.
  272. asio::error_code listen(implementation_type& impl, int backlog,
  273. asio::error_code& ec)
  274. {
  275. if (!is_open(impl))
  276. {
  277. ec = asio::error::bad_descriptor;
  278. return ec;
  279. }
  280. socket_ops::listen(impl.socket_, backlog, ec);
  281. return ec;
  282. }
  283. // Set a socket option.
  284. template <typename Option>
  285. asio::error_code set_option(implementation_type& impl,
  286. const Option& option, asio::error_code& ec)
  287. {
  288. if (!is_open(impl))
  289. {
  290. ec = asio::error::bad_descriptor;
  291. return ec;
  292. }
  293. if (option.level(impl.protocol_) == custom_socket_option_level
  294. && option.name(impl.protocol_) == enable_connection_aborted_option)
  295. {
  296. if (option.size(impl.protocol_) != sizeof(int))
  297. {
  298. ec = asio::error::invalid_argument;
  299. }
  300. else
  301. {
  302. if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
  303. impl.flags_ |= implementation_type::enable_connection_aborted;
  304. else
  305. impl.flags_ &= ~implementation_type::enable_connection_aborted;
  306. ec = asio::error_code();
  307. }
  308. return ec;
  309. }
  310. else
  311. {
  312. if (option.level(impl.protocol_) == SOL_SOCKET
  313. && option.name(impl.protocol_) == SO_LINGER)
  314. {
  315. impl.flags_ |= implementation_type::user_set_linger;
  316. }
  317. socket_ops::setsockopt(impl.socket_,
  318. option.level(impl.protocol_), option.name(impl.protocol_),
  319. option.data(impl.protocol_), option.size(impl.protocol_), ec);
  320. #if defined(__MACH__) && defined(__APPLE__) \
  321. || defined(__NetBSD__) || defined(__FreeBSD__) || defined(__OpenBSD__)
  322. // To implement portable behaviour for SO_REUSEADDR with UDP sockets we
  323. // need to also set SO_REUSEPORT on BSD-based platforms.
  324. if (!ec && impl.protocol_.type() == SOCK_DGRAM
  325. && option.level(impl.protocol_) == SOL_SOCKET
  326. && option.name(impl.protocol_) == SO_REUSEADDR)
  327. {
  328. asio::error_code ignored_ec;
  329. socket_ops::setsockopt(impl.socket_, SOL_SOCKET, SO_REUSEPORT,
  330. option.data(impl.protocol_), option.size(impl.protocol_),
  331. ignored_ec);
  332. }
  333. #endif
  334. return ec;
  335. }
  336. }
  337. // Set a socket option.
  338. template <typename Option>
  339. asio::error_code get_option(const implementation_type& impl,
  340. Option& option, asio::error_code& ec) const
  341. {
  342. if (!is_open(impl))
  343. {
  344. ec = asio::error::bad_descriptor;
  345. return ec;
  346. }
  347. if (option.level(impl.protocol_) == custom_socket_option_level
  348. && option.name(impl.protocol_) == enable_connection_aborted_option)
  349. {
  350. if (option.size(impl.protocol_) != sizeof(int))
  351. {
  352. ec = asio::error::invalid_argument;
  353. }
  354. else
  355. {
  356. int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
  357. if (impl.flags_ & implementation_type::enable_connection_aborted)
  358. *target = 1;
  359. else
  360. *target = 0;
  361. option.resize(impl.protocol_, sizeof(int));
  362. ec = asio::error_code();
  363. }
  364. return ec;
  365. }
  366. else
  367. {
  368. size_t size = option.size(impl.protocol_);
  369. socket_ops::getsockopt(impl.socket_,
  370. option.level(impl.protocol_), option.name(impl.protocol_),
  371. option.data(impl.protocol_), &size, ec);
  372. if (!ec)
  373. option.resize(impl.protocol_, size);
  374. return ec;
  375. }
  376. }
  377. // Perform an IO control command on the socket.
  378. template <typename IO_Control_Command>
  379. asio::error_code io_control(implementation_type& impl,
  380. IO_Control_Command& command, asio::error_code& ec)
  381. {
  382. if (!is_open(impl))
  383. {
  384. ec = asio::error::bad_descriptor;
  385. return ec;
  386. }
  387. socket_ops::ioctl(impl.socket_, command.name(),
  388. static_cast<ioctl_arg_type*>(command.data()), ec);
  389. // When updating the non-blocking mode we always perform the ioctl
  390. // syscall, even if the flags would otherwise indicate that the socket is
  391. // already in the correct state. This ensures that the underlying socket
  392. // is put into the state that has been requested by the user. If the ioctl
  393. // syscall was successful then we need to update the flags to match.
  394. if (!ec && command.name() == static_cast<int>(FIONBIO))
  395. {
  396. if (*static_cast<ioctl_arg_type*>(command.data()))
  397. {
  398. impl.flags_ |= implementation_type::user_set_non_blocking;
  399. }
  400. else
  401. {
  402. // Clearing the non-blocking mode always overrides any internally-set
  403. // non-blocking flag. Any subsequent asynchronous operations will need
  404. // to re-enable non-blocking I/O.
  405. impl.flags_ &= ~(implementation_type::user_set_non_blocking
  406. | implementation_type::internal_non_blocking);
  407. }
  408. }
  409. return ec;
  410. }
  411. // Get the local endpoint.
  412. endpoint_type local_endpoint(const implementation_type& impl,
  413. asio::error_code& ec) const
  414. {
  415. if (!is_open(impl))
  416. {
  417. ec = asio::error::bad_descriptor;
  418. return endpoint_type();
  419. }
  420. endpoint_type endpoint;
  421. std::size_t addr_len = endpoint.capacity();
  422. if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
  423. return endpoint_type();
  424. endpoint.resize(addr_len);
  425. return endpoint;
  426. }
  427. // Get the remote endpoint.
  428. endpoint_type remote_endpoint(const implementation_type& impl,
  429. asio::error_code& ec) const
  430. {
  431. if (!is_open(impl))
  432. {
  433. ec = asio::error::bad_descriptor;
  434. return endpoint_type();
  435. }
  436. endpoint_type endpoint;
  437. std::size_t addr_len = endpoint.capacity();
  438. if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
  439. return endpoint_type();
  440. endpoint.resize(addr_len);
  441. return endpoint;
  442. }
  443. /// Disable sends or receives on the socket.
  444. asio::error_code shutdown(implementation_type& impl,
  445. socket_base::shutdown_type what, asio::error_code& ec)
  446. {
  447. if (!is_open(impl))
  448. {
  449. ec = asio::error::bad_descriptor;
  450. return ec;
  451. }
  452. socket_ops::shutdown(impl.socket_, what, ec);
  453. return ec;
  454. }
  455. // Send the given data to the peer.
  456. template <typename ConstBufferSequence>
  457. size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
  458. socket_base::message_flags flags, asio::error_code& ec)
  459. {
  460. if (!is_open(impl))
  461. {
  462. ec = asio::error::bad_descriptor;
  463. return 0;
  464. }
  465. buffer_sequence_adapter<asio::const_buffer,
  466. ConstBufferSequence> bufs(buffers);
  467. // A request to receive 0 bytes on a stream socket is a no-op.
  468. if (impl.protocol_.type() == SOCK_STREAM && bufs.all_empty())
  469. {
  470. ec = asio::error_code();
  471. return 0;
  472. }
  473. // Send the data.
  474. for (;;)
  475. {
  476. // Try to complete the operation without blocking.
  477. int bytes_sent = socket_ops::send(impl.socket_,
  478. bufs.buffers(), bufs.count(), flags, ec);
  479. // Check if operation succeeded.
  480. if (bytes_sent >= 0)
  481. return bytes_sent;
  482. // Operation failed.
  483. if ((impl.flags_ & implementation_type::user_set_non_blocking)
  484. || (ec != asio::error::would_block
  485. && ec != asio::error::try_again))
  486. return 0;
  487. // Wait for socket to become ready.
  488. if (socket_ops::poll_write(impl.socket_, ec) < 0)
  489. return 0;
  490. }
  491. }
  492. // Wait until data can be sent without blocking.
  493. size_t send(implementation_type& impl, const null_buffers&,
  494. socket_base::message_flags, asio::error_code& ec)
  495. {
  496. if (!is_open(impl))
  497. {
  498. ec = asio::error::bad_descriptor;
  499. return 0;
  500. }
  501. // Wait for socket to become ready.
  502. socket_ops::poll_write(impl.socket_, ec);
  503. return 0;
  504. }
  505. template <typename ConstBufferSequence>
  506. class send_op_base : public reactor_op
  507. {
  508. public:
  509. send_op_base(socket_type socket, const ConstBufferSequence& buffers,
  510. socket_base::message_flags flags, func_type complete_func)
  511. : reactor_op(&send_op_base::do_perform, complete_func),
  512. socket_(socket),
  513. buffers_(buffers),
  514. flags_(flags)
  515. {
  516. }
  517. static bool do_perform(reactor_op* base)
  518. {
  519. send_op_base* o(static_cast<send_op_base*>(base));
  520. buffer_sequence_adapter<asio::const_buffer,
  521. ConstBufferSequence> bufs(o->buffers_);
  522. for (;;)
  523. {
  524. // Send the data.
  525. asio::error_code ec;
  526. int bytes = socket_ops::send(o->socket_,
  527. bufs.buffers(), bufs.count(), o->flags_, ec);
  528. // Retry operation if interrupted by signal.
  529. if (ec == asio::error::interrupted)
  530. continue;
  531. // Check if we need to run the operation again.
  532. if (ec == asio::error::would_block
  533. || ec == asio::error::try_again)
  534. return false;
  535. o->ec_ = ec;
  536. o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
  537. return true;
  538. }
  539. }
  540. private:
  541. socket_type socket_;
  542. ConstBufferSequence buffers_;
  543. socket_base::message_flags flags_;
  544. };
  545. template <typename ConstBufferSequence, typename Handler>
  546. class send_op : public send_op_base<ConstBufferSequence>
  547. {
  548. public:
  549. send_op(socket_type socket, const ConstBufferSequence& buffers,
  550. socket_base::message_flags flags, Handler handler)
  551. : send_op_base<ConstBufferSequence>(socket,
  552. buffers, flags, &send_op::do_complete),
  553. handler_(handler)
  554. {
  555. }
  556. static void do_complete(io_service_impl* owner, operation* base,
  557. asio::error_code /*ec*/, std::size_t /*bytes_transferred*/)
  558. {
  559. // Take ownership of the handler object.
  560. send_op* o(static_cast<send_op*>(base));
  561. typedef handler_alloc_traits<Handler, send_op> alloc_traits;
  562. handler_ptr<alloc_traits> ptr(o->handler_, o);
  563. // Make the upcall if required.
  564. if (owner)
  565. {
  566. // Make a copy of the handler so that the memory can be deallocated
  567. // before the upcall is made. Even if we're not about to make an
  568. // upcall, a sub-object of the handler may be the true owner of the
  569. // memory associated with the handler. Consequently, a local copy of
  570. // the handler is required to ensure that any owning sub-object remains
  571. // valid until after we have deallocated the memory here.
  572. detail::binder2<Handler, asio::error_code, std::size_t>
  573. handler(o->handler_, o->ec_, o->bytes_transferred_);
  574. ptr.reset();
  575. asio::detail::fenced_block b;
  576. asio_handler_invoke_helpers::invoke(handler, handler);
  577. }
  578. }
  579. private:
  580. Handler handler_;
  581. };
  582. // Start an asynchronous send. The data being sent must be valid for the
  583. // lifetime of the asynchronous operation.
  584. template <typename ConstBufferSequence, typename Handler>
  585. void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
  586. socket_base::message_flags flags, Handler handler)
  587. {
  588. // Allocate and construct an operation to wrap the handler.
  589. typedef send_op<ConstBufferSequence, Handler> value_type;
  590. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  591. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  592. handler_ptr<alloc_traits> ptr(raw_ptr,
  593. impl.socket_, buffers, flags, handler);
  594. start_op(impl, reactor::write_op, ptr.get(), true,
  595. (impl.protocol_.type() == SOCK_STREAM
  596. && buffer_sequence_adapter<asio::const_buffer,
  597. ConstBufferSequence>::all_empty(buffers)));
  598. ptr.release();
  599. }
  600. // Start an asynchronous wait until data can be sent without blocking.
  601. template <typename Handler>
  602. void async_send(implementation_type& impl, const null_buffers&,
  603. socket_base::message_flags, Handler handler)
  604. {
  605. // Allocate and construct an operation to wrap the handler.
  606. typedef null_buffers_op<Handler> value_type;
  607. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  608. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  609. handler_ptr<alloc_traits> ptr(raw_ptr, handler);
  610. start_op(impl, reactor::write_op, ptr.get(), false, false);
  611. ptr.release();
  612. }
  613. // Send a datagram to the specified endpoint. Returns the number of bytes
  614. // sent.
  615. template <typename ConstBufferSequence>
  616. size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
  617. const endpoint_type& destination, socket_base::message_flags flags,
  618. asio::error_code& ec)
  619. {
  620. if (!is_open(impl))
  621. {
  622. ec = asio::error::bad_descriptor;
  623. return 0;
  624. }
  625. buffer_sequence_adapter<asio::const_buffer,
  626. ConstBufferSequence> bufs(buffers);
  627. // Send the data.
  628. for (;;)
  629. {
  630. // Try to complete the operation without blocking.
  631. int bytes_sent = socket_ops::sendto(impl.socket_, bufs.buffers(),
  632. bufs.count(), flags, destination.data(), destination.size(), ec);
  633. // Check if operation succeeded.
  634. if (bytes_sent >= 0)
  635. return bytes_sent;
  636. // Operation failed.
  637. if ((impl.flags_ & implementation_type::user_set_non_blocking)
  638. || (ec != asio::error::would_block
  639. && ec != asio::error::try_again))
  640. return 0;
  641. // Wait for socket to become ready.
  642. if (socket_ops::poll_write(impl.socket_, ec) < 0)
  643. return 0;
  644. }
  645. }
  646. // Wait until data can be sent without blocking.
  647. size_t send_to(implementation_type& impl, const null_buffers&,
  648. socket_base::message_flags, const endpoint_type&,
  649. asio::error_code& ec)
  650. {
  651. if (!is_open(impl))
  652. {
  653. ec = asio::error::bad_descriptor;
  654. return 0;
  655. }
  656. // Wait for socket to become ready.
  657. socket_ops::poll_write(impl.socket_, ec);
  658. return 0;
  659. }
  660. template <typename ConstBufferSequence>
  661. class send_to_op_base : public reactor_op
  662. {
  663. public:
  664. send_to_op_base(socket_type socket, const ConstBufferSequence& buffers,
  665. const endpoint_type& endpoint, socket_base::message_flags flags,
  666. func_type complete_func)
  667. : reactor_op(&send_to_op_base::do_perform, complete_func),
  668. socket_(socket),
  669. buffers_(buffers),
  670. destination_(endpoint),
  671. flags_(flags)
  672. {
  673. }
  674. static bool do_perform(reactor_op* base)
  675. {
  676. send_to_op_base* o(static_cast<send_to_op_base*>(base));
  677. buffer_sequence_adapter<asio::const_buffer,
  678. ConstBufferSequence> bufs(o->buffers_);
  679. for (;;)
  680. {
  681. // Send the data.
  682. asio::error_code ec;
  683. int bytes = socket_ops::sendto(o->socket_, bufs.buffers(), bufs.count(),
  684. o->flags_, o->destination_.data(), o->destination_.size(), ec);
  685. // Retry operation if interrupted by signal.
  686. if (ec == asio::error::interrupted)
  687. continue;
  688. // Check if we need to run the operation again.
  689. if (ec == asio::error::would_block
  690. || ec == asio::error::try_again)
  691. return false;
  692. o->ec_ = ec;
  693. o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
  694. return true;
  695. }
  696. }
  697. private:
  698. socket_type socket_;
  699. ConstBufferSequence buffers_;
  700. endpoint_type destination_;
  701. socket_base::message_flags flags_;
  702. };
  703. template <typename ConstBufferSequence, typename Handler>
  704. class send_to_op : public send_to_op_base<ConstBufferSequence>
  705. {
  706. public:
  707. send_to_op(socket_type socket, const ConstBufferSequence& buffers,
  708. const endpoint_type& endpoint, socket_base::message_flags flags,
  709. Handler handler)
  710. : send_to_op_base<ConstBufferSequence>(socket,
  711. buffers, endpoint, flags, &send_to_op::do_complete),
  712. handler_(handler)
  713. {
  714. }
  715. static void do_complete(io_service_impl* owner, operation* base,
  716. asio::error_code /*ec*/, std::size_t /*bytes_transferred*/)
  717. {
  718. // Take ownership of the handler object.
  719. send_to_op* o(static_cast<send_to_op*>(base));
  720. typedef handler_alloc_traits<Handler, send_to_op> alloc_traits;
  721. handler_ptr<alloc_traits> ptr(o->handler_, o);
  722. // Make the upcall if required.
  723. if (owner)
  724. {
  725. // Make a copy of the handler so that the memory can be deallocated
  726. // before the upcall is made. Even if we're not about to make an
  727. // upcall, a sub-object of the handler may be the true owner of the
  728. // memory associated with the handler. Consequently, a local copy of
  729. // the handler is required to ensure that any owning sub-object remains
  730. // valid until after we have deallocated the memory here.
  731. detail::binder2<Handler, asio::error_code, std::size_t>
  732. handler(o->handler_, o->ec_, o->bytes_transferred_);
  733. ptr.reset();
  734. asio::detail::fenced_block b;
  735. asio_handler_invoke_helpers::invoke(handler, handler);
  736. }
  737. }
  738. private:
  739. Handler handler_;
  740. };
  741. // Start an asynchronous send. The data being sent must be valid for the
  742. // lifetime of the asynchronous operation.
  743. template <typename ConstBufferSequence, typename Handler>
  744. void async_send_to(implementation_type& impl,
  745. const ConstBufferSequence& buffers,
  746. const endpoint_type& destination, socket_base::message_flags flags,
  747. Handler handler)
  748. {
  749. // Allocate and construct an operation to wrap the handler.
  750. typedef send_to_op<ConstBufferSequence, Handler> value_type;
  751. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  752. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  753. handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_,
  754. buffers, destination, flags, handler);
  755. start_op(impl, reactor::write_op, ptr.get(), true, false);
  756. ptr.release();
  757. }
  758. // Start an asynchronous wait until data can be sent without blocking.
  759. template <typename Handler>
  760. void async_send_to(implementation_type& impl, const null_buffers&,
  761. socket_base::message_flags, const endpoint_type&, Handler handler)
  762. {
  763. // Allocate and construct an operation to wrap the handler.
  764. typedef null_buffers_op<Handler> value_type;
  765. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  766. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  767. handler_ptr<alloc_traits> ptr(raw_ptr, handler);
  768. start_op(impl, reactor::write_op, ptr.get(), false, false);
  769. ptr.release();
  770. }
  771. // Receive some data from the peer. Returns the number of bytes received.
  772. template <typename MutableBufferSequence>
  773. size_t receive(implementation_type& impl,
  774. const MutableBufferSequence& buffers,
  775. socket_base::message_flags flags, asio::error_code& ec)
  776. {
  777. if (!is_open(impl))
  778. {
  779. ec = asio::error::bad_descriptor;
  780. return 0;
  781. }
  782. buffer_sequence_adapter<asio::mutable_buffer,
  783. MutableBufferSequence> bufs(buffers);
  784. // A request to receive 0 bytes on a stream socket is a no-op.
  785. if (impl.protocol_.type() == SOCK_STREAM && bufs.all_empty())
  786. {
  787. ec = asio::error_code();
  788. return 0;
  789. }
  790. // Receive some data.
  791. for (;;)
  792. {
  793. // Try to complete the operation without blocking.
  794. int bytes_recvd = socket_ops::recv(impl.socket_,
  795. bufs.buffers(), bufs.count(), flags, ec);
  796. // Check if operation succeeded.
  797. if (bytes_recvd > 0)
  798. return bytes_recvd;
  799. // Check for EOF.
  800. if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM)
  801. {
  802. ec = asio::error::eof;
  803. return 0;
  804. }
  805. // Operation failed.
  806. if ((impl.flags_ & implementation_type::user_set_non_blocking)
  807. || (ec != asio::error::would_block
  808. && ec != asio::error::try_again))
  809. return 0;
  810. // Wait for socket to become ready.
  811. if (socket_ops::poll_read(impl.socket_, ec) < 0)
  812. return 0;
  813. }
  814. }
  815. // Wait until data can be received without blocking.
  816. size_t receive(implementation_type& impl, const null_buffers&,
  817. socket_base::message_flags, asio::error_code& ec)
  818. {
  819. if (!is_open(impl))
  820. {
  821. ec = asio::error::bad_descriptor;
  822. return 0;
  823. }
  824. // Wait for socket to become ready.
  825. socket_ops::poll_read(impl.socket_, ec);
  826. return 0;
  827. }
  828. template <typename MutableBufferSequence>
  829. class receive_op_base : public reactor_op
  830. {
  831. public:
  832. receive_op_base(socket_type socket, int protocol_type,
  833. const MutableBufferSequence& buffers,
  834. socket_base::message_flags flags, func_type complete_func)
  835. : reactor_op(&receive_op_base::do_perform, complete_func),
  836. socket_(socket),
  837. protocol_type_(protocol_type),
  838. buffers_(buffers),
  839. flags_(flags)
  840. {
  841. }
  842. static bool do_perform(reactor_op* base)
  843. {
  844. receive_op_base* o(static_cast<receive_op_base*>(base));
  845. buffer_sequence_adapter<asio::mutable_buffer,
  846. MutableBufferSequence> bufs(o->buffers_);
  847. for (;;)
  848. {
  849. // Receive some data.
  850. asio::error_code ec;
  851. int bytes = socket_ops::recv(o->socket_,
  852. bufs.buffers(), bufs.count(), o->flags_, ec);
  853. if (bytes == 0 && o->protocol_type_ == SOCK_STREAM)
  854. ec = asio::error::eof;
  855. // Retry operation if interrupted by signal.
  856. if (ec == asio::error::interrupted)
  857. continue;
  858. // Check if we need to run the operation again.
  859. if (ec == asio::error::would_block
  860. || ec == asio::error::try_again)
  861. return false;
  862. o->ec_ = ec;
  863. o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
  864. return true;
  865. }
  866. }
  867. private:
  868. socket_type socket_;
  869. int protocol_type_;
  870. MutableBufferSequence buffers_;
  871. socket_base::message_flags flags_;
  872. };
  873. template <typename MutableBufferSequence, typename Handler>
  874. class receive_op : public receive_op_base<MutableBufferSequence>
  875. {
  876. public:
  877. receive_op(socket_type socket, int protocol_type,
  878. const MutableBufferSequence& buffers,
  879. socket_base::message_flags flags, Handler handler)
  880. : receive_op_base<MutableBufferSequence>(socket,
  881. protocol_type, buffers, flags, &receive_op::do_complete),
  882. handler_(handler)
  883. {
  884. }
  885. static void do_complete(io_service_impl* owner, operation* base,
  886. asio::error_code /*ec*/, std::size_t /*bytes_transferred*/)
  887. {
  888. // Take ownership of the handler object.
  889. receive_op* o(static_cast<receive_op*>(base));
  890. typedef handler_alloc_traits<Handler, receive_op> alloc_traits;
  891. handler_ptr<alloc_traits> ptr(o->handler_, o);
  892. // Make the upcall if required.
  893. if (owner)
  894. {
  895. // Make a copy of the handler so that the memory can be deallocated
  896. // before the upcall is made. Even if we're not about to make an
  897. // upcall, a sub-object of the handler may be the true owner of the
  898. // memory associated with the handler. Consequently, a local copy of
  899. // the handler is required to ensure that any owning sub-object remains
  900. // valid until after we have deallocated the memory here.
  901. detail::binder2<Handler, asio::error_code, std::size_t>
  902. handler(o->handler_, o->ec_, o->bytes_transferred_);
  903. ptr.reset();
  904. asio::detail::fenced_block b;
  905. asio_handler_invoke_helpers::invoke(handler, handler);
  906. }
  907. }
  908. private:
  909. Handler handler_;
  910. };
  911. // Start an asynchronous receive. The buffer for the data being received
  912. // must be valid for the lifetime of the asynchronous operation.
  913. template <typename MutableBufferSequence, typename Handler>
  914. void async_receive(implementation_type& impl,
  915. const MutableBufferSequence& buffers,
  916. socket_base::message_flags flags, Handler handler)
  917. {
  918. // Allocate and construct an operation to wrap the handler.
  919. typedef receive_op<MutableBufferSequence, Handler> value_type;
  920. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  921. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  922. int protocol_type = impl.protocol_.type();
  923. handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_,
  924. protocol_type, buffers, flags, handler);
  925. start_op(impl,
  926. (flags & socket_base::message_out_of_band)
  927. ? reactor::except_op : reactor::read_op,
  928. ptr.get(), (flags & socket_base::message_out_of_band) == 0,
  929. (impl.protocol_.type() == SOCK_STREAM
  930. && buffer_sequence_adapter<asio::mutable_buffer,
  931. MutableBufferSequence>::all_empty(buffers)));
  932. ptr.release();
  933. }
  934. // Wait until data can be received without blocking.
  935. template <typename Handler>
  936. void async_receive(implementation_type& impl, const null_buffers&,
  937. socket_base::message_flags flags, Handler handler)
  938. {
  939. // Allocate and construct an operation to wrap the handler.
  940. typedef null_buffers_op<Handler> value_type;
  941. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  942. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  943. handler_ptr<alloc_traits> ptr(raw_ptr, handler);
  944. start_op(impl,
  945. (flags & socket_base::message_out_of_band)
  946. ? reactor::except_op : reactor::read_op,
  947. ptr.get(), false, false);
  948. ptr.release();
  949. }
  950. // Receive a datagram with the endpoint of the sender. Returns the number of
  951. // bytes received.
  952. template <typename MutableBufferSequence>
  953. size_t receive_from(implementation_type& impl,
  954. const MutableBufferSequence& buffers,
  955. endpoint_type& sender_endpoint, socket_base::message_flags flags,
  956. asio::error_code& ec)
  957. {
  958. if (!is_open(impl))
  959. {
  960. ec = asio::error::bad_descriptor;
  961. return 0;
  962. }
  963. buffer_sequence_adapter<asio::mutable_buffer,
  964. MutableBufferSequence> bufs(buffers);
  965. // Receive some data.
  966. for (;;)
  967. {
  968. // Try to complete the operation without blocking.
  969. std::size_t addr_len = sender_endpoint.capacity();
  970. int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs.buffers(),
  971. bufs.count(), flags, sender_endpoint.data(), &addr_len, ec);
  972. // Check if operation succeeded.
  973. if (bytes_recvd > 0)
  974. {
  975. sender_endpoint.resize(addr_len);
  976. return bytes_recvd;
  977. }
  978. // Check for EOF.
  979. if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM)
  980. {
  981. ec = asio::error::eof;
  982. return 0;
  983. }
  984. // Operation failed.
  985. if ((impl.flags_ & implementation_type::user_set_non_blocking)
  986. || (ec != asio::error::would_block
  987. && ec != asio::error::try_again))
  988. return 0;
  989. // Wait for socket to become ready.
  990. if (socket_ops::poll_read(impl.socket_, ec) < 0)
  991. return 0;
  992. }
  993. }
  994. // Wait until data can be received without blocking.
  995. size_t receive_from(implementation_type& impl, const null_buffers&,
  996. endpoint_type& sender_endpoint, socket_base::message_flags,
  997. asio::error_code& ec)
  998. {
  999. if (!is_open(impl))
  1000. {
  1001. ec = asio::error::bad_descriptor;
  1002. return 0;
  1003. }
  1004. // Wait for socket to become ready.
  1005. socket_ops::poll_read(impl.socket_, ec);
  1006. // Reset endpoint since it can be given no sensible value at this time.
  1007. sender_endpoint = endpoint_type();
  1008. return 0;
  1009. }
  1010. template <typename MutableBufferSequence>
  1011. class receive_from_op_base : public reactor_op
  1012. {
  1013. public:
  1014. receive_from_op_base(socket_type socket, int protocol_type,
  1015. const MutableBufferSequence& buffers, endpoint_type& endpoint,
  1016. socket_base::message_flags flags, func_type complete_func)
  1017. : reactor_op(&receive_from_op_base::do_perform, complete_func),
  1018. socket_(socket),
  1019. protocol_type_(protocol_type),
  1020. buffers_(buffers),
  1021. sender_endpoint_(endpoint),
  1022. flags_(flags)
  1023. {
  1024. }
  1025. static bool do_perform(reactor_op* base)
  1026. {
  1027. receive_from_op_base* o(static_cast<receive_from_op_base*>(base));
  1028. buffer_sequence_adapter<asio::mutable_buffer,
  1029. MutableBufferSequence> bufs(o->buffers_);
  1030. for (;;)
  1031. {
  1032. // Receive some data.
  1033. asio::error_code ec;
  1034. std::size_t addr_len = o->sender_endpoint_.capacity();
  1035. int bytes = socket_ops::recvfrom(o->socket_, bufs.buffers(),
  1036. bufs.count(), o->flags_, o->sender_endpoint_.data(), &addr_len, ec);
  1037. if (bytes == 0 && o->protocol_type_ == SOCK_STREAM)
  1038. ec = asio::error::eof;
  1039. // Retry operation if interrupted by signal.
  1040. if (ec == asio::error::interrupted)
  1041. continue;
  1042. // Check if we need to run the operation again.
  1043. if (ec == asio::error::would_block
  1044. || ec == asio::error::try_again)
  1045. return false;
  1046. o->sender_endpoint_.resize(addr_len);
  1047. o->ec_ = ec;
  1048. o->bytes_transferred_ = (bytes < 0 ? 0 : bytes);
  1049. return true;
  1050. }
  1051. }
  1052. private:
  1053. socket_type socket_;
  1054. int protocol_type_;
  1055. MutableBufferSequence buffers_;
  1056. endpoint_type& sender_endpoint_;
  1057. socket_base::message_flags flags_;
  1058. };
  1059. template <typename MutableBufferSequence, typename Handler>
  1060. class receive_from_op : public receive_from_op_base<MutableBufferSequence>
  1061. {
  1062. public:
  1063. receive_from_op(socket_type socket, int protocol_type,
  1064. const MutableBufferSequence& buffers, endpoint_type& endpoint,
  1065. socket_base::message_flags flags, Handler handler)
  1066. : receive_from_op_base<MutableBufferSequence>(socket, protocol_type,
  1067. buffers, endpoint, flags, &receive_from_op::do_complete),
  1068. handler_(handler)
  1069. {
  1070. }
  1071. static void do_complete(io_service_impl* owner, operation* base,
  1072. asio::error_code /*ec*/, std::size_t /*bytes_transferred*/)
  1073. {
  1074. // Take ownership of the handler object.
  1075. receive_from_op* o(static_cast<receive_from_op*>(base));
  1076. typedef handler_alloc_traits<Handler, receive_from_op> alloc_traits;
  1077. handler_ptr<alloc_traits> ptr(o->handler_, o);
  1078. // Make the upcall if required.
  1079. if (owner)
  1080. {
  1081. // Make a copy of the handler so that the memory can be deallocated
  1082. // before the upcall is made. Even if we're not about to make an
  1083. // upcall, a sub-object of the handler may be the true owner of the
  1084. // memory associated with the handler. Consequently, a local copy of
  1085. // the handler is required to ensure that any owning sub-object remains
  1086. // valid until after we have deallocated the memory here.
  1087. detail::binder2<Handler, asio::error_code, std::size_t>
  1088. handler(o->handler_, o->ec_, o->bytes_transferred_);
  1089. ptr.reset();
  1090. asio::detail::fenced_block b;
  1091. asio_handler_invoke_helpers::invoke(handler, handler);
  1092. }
  1093. }
  1094. private:
  1095. Handler handler_;
  1096. };
  1097. // Start an asynchronous receive. The buffer for the data being received and
  1098. // the sender_endpoint object must both be valid for the lifetime of the
  1099. // asynchronous operation.
  1100. template <typename MutableBufferSequence, typename Handler>
  1101. void async_receive_from(implementation_type& impl,
  1102. const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
  1103. socket_base::message_flags flags, Handler handler)
  1104. {
  1105. // Allocate and construct an operation to wrap the handler.
  1106. typedef receive_from_op<MutableBufferSequence, Handler> value_type;
  1107. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  1108. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  1109. int protocol_type = impl.protocol_.type();
  1110. handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_,
  1111. protocol_type, buffers, sender_endpoint, flags, handler);
  1112. start_op(impl,
  1113. (flags & socket_base::message_out_of_band)
  1114. ? reactor::except_op : reactor::read_op,
  1115. ptr.get(), true, false);
  1116. ptr.release();
  1117. }
  1118. // Wait until data can be received without blocking.
  1119. template <typename Handler>
  1120. void async_receive_from(implementation_type& impl,
  1121. const null_buffers&, endpoint_type& sender_endpoint,
  1122. socket_base::message_flags flags, Handler handler)
  1123. {
  1124. // Allocate and construct an operation to wrap the handler.
  1125. typedef null_buffers_op<Handler> value_type;
  1126. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  1127. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  1128. handler_ptr<alloc_traits> ptr(raw_ptr, handler);
  1129. // Reset endpoint since it can be given no sensible value at this time.
  1130. sender_endpoint = endpoint_type();
  1131. start_op(impl,
  1132. (flags & socket_base::message_out_of_band)
  1133. ? reactor::except_op : reactor::read_op,
  1134. ptr.get(), false, false);
  1135. ptr.release();
  1136. }
  1137. // Accept a new connection.
  1138. template <typename Socket>
  1139. asio::error_code accept(implementation_type& impl,
  1140. Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec)
  1141. {
  1142. if (!is_open(impl))
  1143. {
  1144. ec = asio::error::bad_descriptor;
  1145. return ec;
  1146. }
  1147. // We cannot accept a socket that is already open.
  1148. if (peer.is_open())
  1149. {
  1150. ec = asio::error::already_open;
  1151. return ec;
  1152. }
  1153. // Accept a socket.
  1154. for (;;)
  1155. {
  1156. // Try to complete the operation without blocking.
  1157. socket_holder new_socket;
  1158. std::size_t addr_len = 0;
  1159. if (peer_endpoint)
  1160. {
  1161. addr_len = peer_endpoint->capacity();
  1162. new_socket.reset(socket_ops::accept(impl.socket_,
  1163. peer_endpoint->data(), &addr_len, ec));
  1164. }
  1165. else
  1166. {
  1167. new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
  1168. }
  1169. // Check if operation succeeded.
  1170. if (new_socket.get() >= 0)
  1171. {
  1172. if (peer_endpoint)
  1173. peer_endpoint->resize(addr_len);
  1174. peer.assign(impl.protocol_, new_socket.get(), ec);
  1175. if (!ec)
  1176. new_socket.release();
  1177. return ec;
  1178. }
  1179. // Operation failed.
  1180. if (ec == asio::error::would_block
  1181. || ec == asio::error::try_again)
  1182. {
  1183. if (impl.flags_ & implementation_type::user_set_non_blocking)
  1184. return ec;
  1185. // Fall through to retry operation.
  1186. }
  1187. else if (ec == asio::error::connection_aborted)
  1188. {
  1189. if (impl.flags_ & implementation_type::enable_connection_aborted)
  1190. return ec;
  1191. // Fall through to retry operation.
  1192. }
  1193. #if defined(EPROTO)
  1194. else if (ec.value() == EPROTO)
  1195. {
  1196. if (impl.flags_ & implementation_type::enable_connection_aborted)
  1197. return ec;
  1198. // Fall through to retry operation.
  1199. }
  1200. #endif // defined(EPROTO)
  1201. else
  1202. return ec;
  1203. // Wait for socket to become ready.
  1204. if (socket_ops::poll_read(impl.socket_, ec) < 0)
  1205. return ec;
  1206. }
  1207. }
  1208. template <typename Socket>
  1209. class accept_op_base : public reactor_op
  1210. {
  1211. public:
  1212. accept_op_base(socket_type socket, Socket& peer,
  1213. const protocol_type& protocol, endpoint_type* peer_endpoint,
  1214. bool enable_connection_aborted, func_type complete_func)
  1215. : reactor_op(&accept_op_base::do_perform, complete_func),
  1216. socket_(socket),
  1217. peer_(peer),
  1218. protocol_(protocol),
  1219. peer_endpoint_(peer_endpoint),
  1220. enable_connection_aborted_(enable_connection_aborted)
  1221. {
  1222. }
  1223. static bool do_perform(reactor_op* base)
  1224. {
  1225. accept_op_base* o(static_cast<accept_op_base*>(base));
  1226. for (;;)
  1227. {
  1228. // Accept the waiting connection.
  1229. asio::error_code ec;
  1230. socket_holder new_socket;
  1231. std::size_t addr_len = 0;
  1232. std::size_t* addr_len_p = 0;
  1233. socket_addr_type* addr = 0;
  1234. if (o->peer_endpoint_)
  1235. {
  1236. addr_len = o->peer_endpoint_->capacity();
  1237. addr_len_p = &addr_len;
  1238. addr = o->peer_endpoint_->data();
  1239. }
  1240. new_socket.reset(socket_ops::accept(o->socket_, addr, addr_len_p, ec));
  1241. // Retry operation if interrupted by signal.
  1242. if (ec == asio::error::interrupted)
  1243. continue;
  1244. // Check if we need to run the operation again.
  1245. if (ec == asio::error::would_block
  1246. || ec == asio::error::try_again)
  1247. return false;
  1248. if (ec == asio::error::connection_aborted
  1249. && !o->enable_connection_aborted_)
  1250. return false;
  1251. #if defined(EPROTO)
  1252. if (ec.value() == EPROTO && !o->enable_connection_aborted_)
  1253. return false;
  1254. #endif // defined(EPROTO)
  1255. // Transfer ownership of the new socket to the peer object.
  1256. if (!ec)
  1257. {
  1258. if (o->peer_endpoint_)
  1259. o->peer_endpoint_->resize(addr_len);
  1260. o->peer_.assign(o->protocol_, new_socket.get(), ec);
  1261. if (!ec)
  1262. new_socket.release();
  1263. }
  1264. o->ec_ = ec;
  1265. return true;
  1266. }
  1267. }
  1268. private:
  1269. socket_type socket_;
  1270. Socket& peer_;
  1271. protocol_type protocol_;
  1272. endpoint_type* peer_endpoint_;
  1273. bool enable_connection_aborted_;
  1274. };
  1275. template <typename Socket, typename Handler>
  1276. class accept_op : public accept_op_base<Socket>
  1277. {
  1278. public:
  1279. accept_op(socket_type socket, Socket& peer, const protocol_type& protocol,
  1280. endpoint_type* peer_endpoint, bool enable_connection_aborted,
  1281. Handler handler)
  1282. : accept_op_base<Socket>(socket, peer, protocol, peer_endpoint,
  1283. enable_connection_aborted, &accept_op::do_complete),
  1284. handler_(handler)
  1285. {
  1286. }
  1287. static void do_complete(io_service_impl* owner, operation* base,
  1288. asio::error_code /*ec*/, std::size_t /*bytes_transferred*/)
  1289. {
  1290. // Take ownership of the handler object.
  1291. accept_op* o(static_cast<accept_op*>(base));
  1292. typedef handler_alloc_traits<Handler, accept_op> alloc_traits;
  1293. handler_ptr<alloc_traits> ptr(o->handler_, o);
  1294. // Make the upcall if required.
  1295. if (owner)
  1296. {
  1297. // Make a copy of the handler so that the memory can be deallocated
  1298. // before the upcall is made. Even if we're not about to make an
  1299. // upcall, a sub-object of the handler may be the true owner of the
  1300. // memory associated with the handler. Consequently, a local copy of
  1301. // the handler is required to ensure that any owning sub-object remains
  1302. // valid until after we have deallocated the memory here.
  1303. detail::binder1<Handler, asio::error_code>
  1304. handler(o->handler_, o->ec_);
  1305. ptr.reset();
  1306. asio::detail::fenced_block b;
  1307. asio_handler_invoke_helpers::invoke(handler, handler);
  1308. }
  1309. }
  1310. private:
  1311. Handler handler_;
  1312. };
  1313. // Start an asynchronous accept. The peer and peer_endpoint objects
  1314. // must be valid until the accept's handler is invoked.
  1315. template <typename Socket, typename Handler>
  1316. void async_accept(implementation_type& impl, Socket& peer,
  1317. endpoint_type* peer_endpoint, Handler handler)
  1318. {
  1319. // Allocate and construct an operation to wrap the handler.
  1320. typedef accept_op<Socket, Handler> value_type;
  1321. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  1322. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  1323. bool enable_connection_aborted =
  1324. (impl.flags_ & implementation_type::enable_connection_aborted) != 0;
  1325. handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, peer,
  1326. impl.protocol_, peer_endpoint, enable_connection_aborted, handler);
  1327. start_accept_op(impl, ptr.get(), peer.is_open());
  1328. ptr.release();
  1329. }
  1330. // Connect the socket to the specified endpoint.
  1331. asio::error_code connect(implementation_type& impl,
  1332. const endpoint_type& peer_endpoint, asio::error_code& ec)
  1333. {
  1334. if (!is_open(impl))
  1335. {
  1336. ec = asio::error::bad_descriptor;
  1337. return ec;
  1338. }
  1339. // Perform the connect operation.
  1340. socket_ops::connect(impl.socket_,
  1341. peer_endpoint.data(), peer_endpoint.size(), ec);
  1342. if (ec != asio::error::in_progress
  1343. && ec != asio::error::would_block)
  1344. {
  1345. // The connect operation finished immediately.
  1346. return ec;
  1347. }
  1348. // Wait for socket to become ready.
  1349. if (socket_ops::poll_connect(impl.socket_, ec) < 0)
  1350. return ec;
  1351. // Get the error code from the connect operation.
  1352. int connect_error = 0;
  1353. size_t connect_error_len = sizeof(connect_error);
  1354. if (socket_ops::getsockopt(impl.socket_, SOL_SOCKET, SO_ERROR,
  1355. &connect_error, &connect_error_len, ec) == socket_error_retval)
  1356. return ec;
  1357. // Return the result of the connect operation.
  1358. ec = asio::error_code(connect_error,
  1359. asio::error::get_system_category());
  1360. return ec;
  1361. }
  1362. class connect_op_base : public reactor_op
  1363. {
  1364. public:
  1365. connect_op_base(socket_type socket, func_type complete_func)
  1366. : reactor_op(&connect_op_base::do_perform, complete_func),
  1367. socket_(socket)
  1368. {
  1369. }
  1370. static bool do_perform(reactor_op* base)
  1371. {
  1372. connect_op_base* o(static_cast<connect_op_base*>(base));
  1373. // Get the error code from the connect operation.
  1374. int connect_error = 0;
  1375. size_t connect_error_len = sizeof(connect_error);
  1376. if (socket_ops::getsockopt(o->socket_, SOL_SOCKET, SO_ERROR,
  1377. &connect_error, &connect_error_len, o->ec_) == socket_error_retval)
  1378. return true;
  1379. // The connection failed so the handler will be posted with an error code.
  1380. if (connect_error)
  1381. {
  1382. o->ec_ = asio::error_code(connect_error,
  1383. asio::error::get_system_category());
  1384. }
  1385. return true;
  1386. }
  1387. private:
  1388. socket_type socket_;
  1389. };
  1390. template <typename Handler>
  1391. class connect_op : public connect_op_base
  1392. {
  1393. public:
  1394. connect_op(socket_type socket, Handler handler)
  1395. : connect_op_base(socket, &connect_op::do_complete),
  1396. handler_(handler)
  1397. {
  1398. }
  1399. static void do_complete(io_service_impl* owner, operation* base,
  1400. asio::error_code /*ec*/, std::size_t /*bytes_transferred*/)
  1401. {
  1402. // Take ownership of the handler object.
  1403. connect_op* o(static_cast<connect_op*>(base));
  1404. typedef handler_alloc_traits<Handler, connect_op> alloc_traits;
  1405. handler_ptr<alloc_traits> ptr(o->handler_, o);
  1406. // Make the upcall if required.
  1407. if (owner)
  1408. {
  1409. // Make a copy of the handler so that the memory can be deallocated
  1410. // before the upcall is made. Even if we're not about to make an
  1411. // upcall, a sub-object of the handler may be the true owner of the
  1412. // memory associated with the handler. Consequently, a local copy of
  1413. // the handler is required to ensure that any owning sub-object remains
  1414. // valid until after we have deallocated the memory here.
  1415. detail::binder1<Handler, asio::error_code>
  1416. handler(o->handler_, o->ec_);
  1417. ptr.reset();
  1418. asio::detail::fenced_block b;
  1419. asio_handler_invoke_helpers::invoke(handler, handler);
  1420. }
  1421. }
  1422. private:
  1423. Handler handler_;
  1424. };
  1425. // Start an asynchronous connect.
  1426. template <typename Handler>
  1427. void async_connect(implementation_type& impl,
  1428. const endpoint_type& peer_endpoint, Handler handler)
  1429. {
  1430. // Allocate and construct an operation to wrap the handler.
  1431. typedef connect_op<Handler> value_type;
  1432. typedef handler_alloc_traits<Handler, value_type> alloc_traits;
  1433. raw_handler_ptr<alloc_traits> raw_ptr(handler);
  1434. handler_ptr<alloc_traits> ptr(raw_ptr, impl.socket_, handler);
  1435. start_connect_op(impl, ptr.get(), peer_endpoint);
  1436. ptr.release();
  1437. }
  1438. private:
  1439. // Start the asynchronous read or write operation.
  1440. void start_op(implementation_type& impl, int op_type,
  1441. reactor_op* op, bool non_blocking, bool noop)
  1442. {
  1443. if (!noop)
  1444. {
  1445. if (is_open(impl))
  1446. {
  1447. if (!non_blocking || is_non_blocking(impl)
  1448. || set_non_blocking(impl, op->ec_))
  1449. {
  1450. reactor_.start_op(op_type, impl.socket_,
  1451. impl.reactor_data_, op, non_blocking);
  1452. return;
  1453. }
  1454. }
  1455. else
  1456. op->ec_ = asio::error::bad_descriptor;
  1457. }
  1458. io_service_impl_.post_immediate_completion(op);
  1459. }
  1460. // Start the asynchronous accept operation.
  1461. void start_accept_op(implementation_type& impl,
  1462. reactor_op* op, bool peer_is_open)
  1463. {
  1464. if (!peer_is_open)
  1465. start_op(impl, reactor::read_op, op, true, false);
  1466. else
  1467. {
  1468. op->ec_ = asio::error::already_open;
  1469. io_service_impl_.post_immediate_completion(op);
  1470. }
  1471. }
  1472. // Start the asynchronous connect operation.
  1473. void start_connect_op(implementation_type& impl,
  1474. reactor_op* op, const endpoint_type& peer_endpoint)
  1475. {
  1476. if (is_open(impl))
  1477. {
  1478. if (is_non_blocking(impl) || set_non_blocking(impl, op->ec_))
  1479. {
  1480. if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
  1481. peer_endpoint.size(), op->ec_) != 0)
  1482. {
  1483. if (op->ec_ == asio::error::in_progress
  1484. || op->ec_ == asio::error::would_block)
  1485. {
  1486. op->ec_ = asio::error_code();
  1487. reactor_.start_op(reactor::connect_op,
  1488. impl.socket_, impl.reactor_data_, op, false);
  1489. return;
  1490. }
  1491. }
  1492. }
  1493. }
  1494. else
  1495. op->ec_ = asio::error::bad_descriptor;
  1496. io_service_impl_.post_immediate_completion(op);
  1497. }
  1498. // Determine whether the socket has been set non-blocking.
  1499. bool is_non_blocking(implementation_type& impl) const
  1500. {
  1501. return (impl.flags_ & implementation_type::non_blocking);
  1502. }
  1503. // Set the internal non-blocking flag.
  1504. bool set_non_blocking(implementation_type& impl,
  1505. asio::error_code& ec)
  1506. {
  1507. ioctl_arg_type non_blocking = 1;
  1508. if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
  1509. return false;
  1510. impl.flags_ |= implementation_type::internal_non_blocking;
  1511. return true;
  1512. }
  1513. // The io_service implementation used to post completions.
  1514. io_service_impl& io_service_impl_;
  1515. // The selector that performs event demultiplexing for the service.
  1516. reactor& reactor_;
  1517. };
  1518. } // namespace detail
  1519. } // namespace asio
  1520. #include "asio/detail/pop_options.hpp"
  1521. #endif // ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP