session.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. // Copyright (C) 2009 Internet Systems Consortium, Inc. ("ISC")
  2. //
  3. // Permission to use, copy, modify, and/or distribute this software for any
  4. // purpose with or without fee is hereby granted, provided that the above
  5. // copyright notice and this permission notice appear in all copies.
  6. //
  7. // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
  8. // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
  9. // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
  10. // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
  11. // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
  12. // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
  13. // PERFORMANCE OF THIS SOFTWARE.
  14. // $Id$
  15. #include <config.h>
  16. #include "session_config.h"
  17. #include <stdint.h>
  18. #include <cstdio>
  19. #include <vector>
  20. #include <iostream>
  21. #include <sstream>
  22. #include <sys/un.h>
  23. #include <boost/bind.hpp>
  24. #include <boost/function.hpp>
  25. #include <asio.hpp>
  26. #include <asio/error_code.hpp>
  27. #include <asio/system_error.hpp>
  28. #include <exceptions/exceptions.h>
  29. #include "data.h"
  30. #include "session.h"
  31. using namespace std;
  32. using namespace isc::cc;
  33. using namespace isc::data;
  34. // some of the asio names conflict with socket API system calls
  35. // (e.g. write(2)) so we don't import the entire asio namespace.
  36. using asio::io_service;
  37. using asio::ip::tcp;
  38. #include <sys/types.h>
  39. #include <sys/socket.h>
  40. #include <netinet/in.h>
  41. namespace isc {
  42. namespace cc {
  43. class SessionImpl {
  44. public:
  45. SessionImpl() : sequence_(-1) { queue_ = Element::createFromString("[]"); }
  46. virtual ~SessionImpl() {}
  47. virtual void establish(const char& socket_file) = 0;
  48. virtual int getSocket() = 0;
  49. virtual void disconnect() = 0;
  50. virtual void writeData(const void* data, size_t datalen) = 0;
  51. virtual size_t readDataLength() = 0;
  52. virtual void readData(void* data, size_t datalen) = 0;
  53. virtual void startRead(boost::function<void()> user_handler) = 0;
  54. int sequence_; // the next sequence number to use
  55. std::string lname_;
  56. ElementPtr queue_;
  57. };
  58. class ASIOSession : public SessionImpl {
  59. public:
  60. ASIOSession(io_service& io_service) :
  61. io_service_(io_service), socket_(io_service_), data_length_(0)
  62. {}
  63. virtual void establish(const char& socket_file);
  64. virtual void disconnect();
  65. virtual int getSocket() { return (socket_.native()); }
  66. virtual void writeData(const void* data, size_t datalen);
  67. virtual size_t readDataLength();
  68. virtual void readData(void* data, size_t datalen);
  69. virtual void startRead(boost::function<void()> user_handler);
  70. private:
  71. void internalRead(const asio::error_code& error,
  72. size_t bytes_transferred);
  73. private:
  74. io_service& io_service_;
  75. asio::local::stream_protocol::socket socket_;
  76. uint32_t data_length_;
  77. boost::function<void()> user_handler_;
  78. asio::error_code error_;
  79. };
  80. void
  81. ASIOSession::establish(const char& socket_file) {
  82. try {
  83. socket_.connect(asio::local::stream_protocol::endpoint(&socket_file), error_);
  84. } catch (asio::system_error& se) {
  85. isc_throw(SessionError, se.what());
  86. }
  87. if (error_) {
  88. isc_throw(SessionError, "Unable to connect to message queue.");
  89. }
  90. }
  91. void
  92. ASIOSession::disconnect() {
  93. socket_.close();
  94. data_length_ = 0;
  95. }
  96. void
  97. ASIOSession::writeData(const void* data, size_t datalen) {
  98. try {
  99. asio::write(socket_, asio::buffer(data, datalen));
  100. } catch (const asio::system_error& asio_ex) {
  101. isc_throw(SessionError, "ASIO write failed: " << asio_ex.what());
  102. }
  103. }
  104. size_t
  105. ASIOSession::readDataLength() {
  106. size_t ret_len = data_length_;
  107. if (ret_len == 0) {
  108. readData(&data_length_, sizeof(data_length_));
  109. if (data_length_ == 0) {
  110. isc_throw(SessionError, "ASIO read: data length is not ready");
  111. }
  112. ret_len = ntohl(data_length_);
  113. }
  114. data_length_ = 0;
  115. return (ret_len);
  116. }
  117. void
  118. ASIOSession::readData(void* data, size_t datalen) {
  119. try {
  120. asio::read(socket_, asio::buffer(data, datalen));
  121. } catch (const asio::system_error& asio_ex) {
  122. // to hide boost specific exceptions, we catch them explicitly
  123. // and convert it to SessionError.
  124. isc_throw(SessionError, "ASIO read failed: " << asio_ex.what());
  125. }
  126. }
  127. void
  128. ASIOSession::startRead(boost::function<void()> user_handler) {
  129. data_length_ = 0;
  130. user_handler_ = user_handler;
  131. async_read(socket_, asio::buffer(&data_length_,
  132. sizeof(data_length_)),
  133. boost::bind(&ASIOSession::internalRead, this,
  134. asio::placeholders::error,
  135. asio::placeholders::bytes_transferred));
  136. }
  137. void
  138. ASIOSession::internalRead(const asio::error_code& error,
  139. size_t bytes_transferred)
  140. {
  141. if (!error) {
  142. assert(bytes_transferred == sizeof(data_length_));
  143. data_length_ = ntohl(data_length_);
  144. if (data_length_ == 0) {
  145. isc_throw(SessionError, "Invalid message length (0)");
  146. }
  147. user_handler_();
  148. } else {
  149. isc_throw(SessionError, "asynchronous read failed");
  150. }
  151. }
  152. class SocketSession : public SessionImpl {
  153. public:
  154. SocketSession() : sock_(-1) {}
  155. virtual ~SocketSession() { disconnect(); }
  156. virtual int getSocket() { return (sock_); }
  157. void establish(const char& socket_file);
  158. virtual void disconnect()
  159. {
  160. if (sock_ >= 0) {
  161. close(sock_);
  162. }
  163. sock_ = -1;
  164. }
  165. virtual void writeData(const void* data, size_t datalen);
  166. virtual void readData(void* data, size_t datalen);
  167. virtual size_t readDataLength();
  168. virtual void startRead(boost::function<void()> user_handler UNUSED_PARAM)
  169. {} // nothing to do for this class
  170. private:
  171. int sock_;
  172. };
  173. namespace { // maybe unnecessary.
  174. // This is a helper class to make the establish() method (below) exception-safe
  175. // with the RAII approach.
  176. class SessionHolder {
  177. public:
  178. SessionHolder(SessionImpl* obj) : impl_obj_(obj) {}
  179. ~SessionHolder()
  180. {
  181. if (impl_obj_ != NULL) {
  182. impl_obj_->disconnect();
  183. }
  184. }
  185. void clear() { impl_obj_ = NULL; }
  186. SessionImpl* impl_obj_;
  187. };
  188. }
  189. void
  190. SocketSession::establish(const char& socket_file) {
  191. struct sockaddr_un sun;
  192. #ifdef HAVE_SA_LEN
  193. sun.sun_len = sizeof(struct sockaddr_un);
  194. #endif
  195. if (strlen(&socket_file) >= sizeof(sun.sun_path)) {
  196. isc_throw(SessionError, "Unable to connect to message queue; "
  197. "socket file path too long: " << socket_file);
  198. }
  199. sun.sun_family = AF_UNIX;
  200. strncpy(sun.sun_path, &socket_file, sizeof(sun.sun_path) - 1);
  201. int s = socket(AF_UNIX, SOCK_STREAM, 0);
  202. if (s < 0) {
  203. isc_throw(SessionError, "socket() failed");
  204. }
  205. if (connect(s, (struct sockaddr *)&sun, sizeof(sun)) < 0) {
  206. close(s);
  207. isc_throw(SessionError, "Unable to connect to message queue");
  208. }
  209. sock_ = s;
  210. }
  211. void
  212. SocketSession::writeData(const void* data, const size_t datalen) {
  213. int cc = write(sock_, data, datalen);
  214. if (cc != datalen) {
  215. isc_throw(SessionError, "Write failed: expect " << datalen <<
  216. ", actual " << cc);
  217. }
  218. }
  219. size_t
  220. SocketSession::readDataLength() {
  221. uint32_t length;
  222. readData(&length, sizeof(length));
  223. return (ntohl(length));
  224. }
  225. void
  226. SocketSession::readData(void* data, const size_t datalen) {
  227. int cc = read(sock_, data, datalen);
  228. if (cc != datalen) {
  229. isc_throw(SessionError, "Read failed: expect " << datalen <<
  230. ", actual " << cc);
  231. }
  232. }
  233. Session::Session() : impl_(new SocketSession)
  234. {}
  235. Session::Session(io_service& io_service) : impl_(new ASIOSession(io_service))
  236. {}
  237. Session::~Session() {
  238. delete impl_;
  239. }
  240. void
  241. Session::disconnect() {
  242. impl_->disconnect();
  243. }
  244. int
  245. Session::getSocket() const {
  246. return (impl_->getSocket());
  247. }
  248. void
  249. Session::startRead(boost::function<void()> read_callback) {
  250. impl_->startRead(read_callback);
  251. }
  252. void
  253. Session::establish(const char* socket_file) {
  254. if (socket_file == NULL) {
  255. socket_file = getenv("BIND10_MSGQ_SOCKET_FILE");
  256. }
  257. if (socket_file == NULL) {
  258. socket_file = BIND10_MSGQ_SOCKET_FILE;
  259. }
  260. impl_->establish(*socket_file);
  261. // once established, encapsulate the implementation object so that we
  262. // can safely release the internal resource when exception happens
  263. // below.
  264. SessionHolder session_holder(impl_);
  265. //
  266. // send a request for our local name, and wait for a response
  267. //
  268. ElementPtr get_lname_msg =
  269. Element::createFromString("{ \"type\": \"getlname\" }");
  270. sendmsg(get_lname_msg);
  271. ElementPtr routing, msg;
  272. recvmsg(routing, msg, false);
  273. impl_->lname_ = msg->get("lname")->stringValue();
  274. cout << "My local name is: " << impl_->lname_ << endl;
  275. // At this point there's no risk of resource leak.
  276. session_holder.clear();
  277. }
  278. //
  279. // Convert to wire format and send this on the TCP stream with its length prefix
  280. //
  281. void
  282. Session::sendmsg(ElementPtr& msg) {
  283. std::string header_wire = msg->toWire();
  284. unsigned int length = 2 + header_wire.length();
  285. unsigned int length_net = htonl(length);
  286. unsigned short header_length = header_wire.length();
  287. unsigned short header_length_net = htons(header_length);
  288. impl_->writeData(&length_net, sizeof(length_net));
  289. impl_->writeData(&header_length_net, sizeof(header_length_net));
  290. impl_->writeData(header_wire.data(), header_length);
  291. }
  292. void
  293. Session::sendmsg(ElementPtr& env, ElementPtr& msg) {
  294. std::string header_wire = env->toWire();
  295. std::string body_wire = msg->toWire();
  296. unsigned int length = 2 + header_wire.length() + body_wire.length();
  297. unsigned int length_net = htonl(length);
  298. unsigned short header_length = header_wire.length();
  299. unsigned short header_length_net = htons(header_length);
  300. impl_->writeData(&length_net, sizeof(length_net));
  301. impl_->writeData(&header_length_net, sizeof(header_length_net));
  302. impl_->writeData(header_wire.data(), header_length);
  303. impl_->writeData(body_wire.data(), body_wire.length());
  304. }
  305. bool
  306. Session::recvmsg(ElementPtr& msg, bool nonblock, int seq) {
  307. ElementPtr l_env;
  308. return recvmsg(l_env, msg, nonblock, seq);
  309. }
  310. bool
  311. Session::recvmsg(ElementPtr& env, ElementPtr& msg,
  312. bool nonblock, int seq) {
  313. size_t length = impl_->readDataLength();
  314. ElementPtr l_env, l_msg;
  315. if (hasQueuedMsgs()) {
  316. ElementPtr q_el;
  317. for (int i = 0; i < impl_->queue_->size(); i++) {
  318. q_el = impl_->queue_->get(i);
  319. if (( seq == -1 &&
  320. !q_el->get(0)->contains("reply")
  321. ) || (
  322. q_el->get(0)->contains("reply") &&
  323. q_el->get(0)->get("reply")->intValue() == seq
  324. )
  325. ) {
  326. env = q_el->get(0);
  327. msg = q_el->get(1);
  328. impl_->queue_->remove(i);
  329. return true;
  330. }
  331. }
  332. }
  333. unsigned short header_length_net;
  334. impl_->readData(&header_length_net, sizeof(header_length_net));
  335. unsigned short header_length = ntohs(header_length_net);
  336. if (header_length > length || length < 2) {
  337. isc_throw(SessionError, "Length parameters invalid: total=" << length
  338. << ", header=" << header_length);
  339. }
  340. // remove the header-length bytes from the total length
  341. length -= 2;
  342. std::vector<char> buffer(length);
  343. impl_->readData(&buffer[0], length);
  344. std::string header_wire = std::string(&buffer[0], header_length);
  345. std::string body_wire = std::string(&buffer[0] + header_length,
  346. length - header_length);
  347. std::stringstream header_wire_stream;
  348. header_wire_stream << header_wire;
  349. l_env = Element::fromWire(header_wire_stream, header_length);
  350. std::stringstream body_wire_stream;
  351. body_wire_stream << body_wire;
  352. l_msg = Element::fromWire(body_wire_stream, length - header_length);
  353. if ((seq == -1 &&
  354. !l_env->contains("reply")
  355. ) || (
  356. l_env->contains("reply") &&
  357. l_env->get("reply")->intValue() == seq
  358. )
  359. ) {
  360. env = l_env;
  361. msg = l_msg;
  362. return true;
  363. } else {
  364. ElementPtr q_el = Element::createFromString("[]");
  365. q_el->add(l_env);
  366. q_el->add(l_msg);
  367. impl_->queue_->add(q_el);
  368. return recvmsg(env, msg, nonblock, seq);
  369. }
  370. // XXXMLG handle non-block here, and return false for short reads
  371. }
  372. void
  373. Session::subscribe(std::string group, std::string instance) {
  374. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  375. env->set("type", Element::create("subscribe"));
  376. env->set("group", Element::create(group));
  377. env->set("instance", Element::create(instance));
  378. sendmsg(env);
  379. }
  380. void
  381. Session::unsubscribe(std::string group, std::string instance) {
  382. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  383. env->set("type", Element::create("unsubscribe"));
  384. env->set("group", Element::create(group));
  385. env->set("instance", Element::create(instance));
  386. sendmsg(env);
  387. }
  388. int
  389. Session::group_sendmsg(ElementPtr msg, std::string group,
  390. std::string instance, std::string to)
  391. {
  392. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  393. int nseq = ++impl_->sequence_;
  394. env->set("type", Element::create("send"));
  395. env->set("from", Element::create(impl_->lname_));
  396. env->set("to", Element::create(to));
  397. env->set("group", Element::create(group));
  398. env->set("instance", Element::create(instance));
  399. env->set("seq", Element::create(nseq));
  400. //env->set("msg", Element::create(msg->toWire()));
  401. sendmsg(env, msg);
  402. return nseq;
  403. }
  404. bool
  405. Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
  406. bool nonblock, int seq)
  407. {
  408. return (recvmsg(envelope, msg, nonblock, seq));
  409. }
  410. int
  411. Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
  412. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  413. int nseq = ++impl_->sequence_;
  414. env->set("type", Element::create("send"));
  415. env->set("from", Element::create(impl_->lname_));
  416. env->set("to", Element::create(envelope->get("from")->stringValue()));
  417. env->set("group", Element::create(envelope->get("group")->stringValue()));
  418. env->set("instance", Element::create(envelope->get("instance")->stringValue()));
  419. env->set("seq", Element::create(nseq));
  420. env->set("reply", Element::create(envelope->get("seq")->intValue()));
  421. sendmsg(env, newmsg);
  422. return nseq;
  423. }
  424. bool
  425. Session::hasQueuedMsgs()
  426. {
  427. return (impl_->queue_->size() > 0);
  428. }
  429. }
  430. }