session.cc 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. // Copyright (C) 2009 Internet Systems Consortium, Inc. ("ISC")
  2. //
  3. // Permission to use, copy, modify, and/or distribute this software for any
  4. // purpose with or without fee is hereby granted, provided that the above
  5. // copyright notice and this permission notice appear in all copies.
  6. //
  7. // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
  8. // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
  9. // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
  10. // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
  11. // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
  12. // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
  13. // PERFORMANCE OF THIS SOFTWARE.
  14. // $Id$
  15. #include "data.h"
  16. #include "session.h"
  17. #include <cstdio>
  18. #include <vector>
  19. #include <iostream>
  20. #include <sstream>
  21. using namespace std;
  22. using namespace isc::cc;
  23. using namespace isc::data;
  24. #include <sys/types.h>
  25. #include <sys/socket.h>
  26. #include <netinet/in.h>
  27. Session::Session()
  28. {
  29. sock = -1;
  30. sequence = 1;
  31. }
  32. void
  33. Session::disconnect()
  34. {
  35. close(sock);
  36. sock = -1;
  37. }
  38. void
  39. Session::establish()
  40. {
  41. int ret;
  42. struct sockaddr_in sin;
  43. sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  44. if (sock < -1)
  45. throw SessionError("socket() failed");
  46. sin.sin_family = AF_INET;
  47. sin.sin_port = htons(9912);
  48. sin.sin_addr.s_addr = INADDR_ANY;
  49. #ifdef HAVE_SIN_LEN
  50. sin.sin_len = sizeof(struct sockaddr_in);
  51. #endif
  52. ret = connect(sock, (struct sockaddr *)&sin, sizeof(sin));
  53. if (ret < 0)
  54. throw SessionError("Unable to connect to message queue");
  55. //
  56. // send a request for our local name, and wait for a response
  57. //
  58. std::string get_lname_str = "{ \"type\": \"getlname\" }";
  59. std::stringstream get_lname_stream;
  60. get_lname_stream.str(get_lname_str);
  61. ElementPtr get_lname_msg = Element::createFromString(get_lname_stream);
  62. sendmsg(get_lname_msg);
  63. ElementPtr routing, msg;
  64. recvmsg(routing, msg, false);
  65. lname = msg->get("lname")->stringValue();
  66. cout << "My local name is: " << lname << endl;
  67. }
  68. //
  69. // Convert to wire format and send this on the TCP stream with its length prefix
  70. //
  71. void
  72. Session::sendmsg(ElementPtr& msg)
  73. {
  74. std::string header_wire = msg->toWire();
  75. unsigned int length = 2 + header_wire.length();
  76. unsigned int length_net = htonl(length);
  77. unsigned short header_length = header_wire.length();
  78. unsigned short header_length_net = htons(header_length);
  79. unsigned int ret;
  80. ret = write(sock, &length_net, 4);
  81. if (ret != 4)
  82. throw SessionError("Short write");
  83. ret = write(sock, &header_length_net, 2);
  84. if (ret != 2)
  85. throw SessionError("Short write");
  86. ret = write(sock, header_wire.c_str(), header_length);
  87. if (ret != header_length) {
  88. throw SessionError("Short write");
  89. }
  90. }
  91. void
  92. Session::sendmsg(ElementPtr& env, ElementPtr& msg)
  93. {
  94. std::string header_wire = env->toWire();
  95. std::string body_wire = msg->toWire();
  96. unsigned int length = 2 + header_wire.length() + body_wire.length();
  97. unsigned int length_net = htonl(length);
  98. unsigned short header_length = header_wire.length();
  99. unsigned short header_length_net = htons(header_length);
  100. unsigned int ret;
  101. ret = write(sock, &length_net, 4);
  102. if (ret != 4)
  103. throw SessionError("Short write");
  104. ret = write(sock, &header_length_net, 2);
  105. if (ret != 2)
  106. throw SessionError("Short write");
  107. ret = write(sock, header_wire.c_str(), header_length);
  108. if (ret != header_length) {
  109. throw SessionError("Short write");
  110. }
  111. ret = write(sock, body_wire.c_str(), body_wire.length());
  112. if (ret != body_wire.length()) {
  113. throw SessionError("Short write");
  114. }
  115. }
  116. bool
  117. Session::recvmsg(ElementPtr& msg, bool nonblock)
  118. {
  119. unsigned int length_net;
  120. unsigned short header_length_net;
  121. unsigned int ret;
  122. ret = read(sock, &length_net, 4);
  123. if (ret != 4)
  124. throw SessionError("Short read");
  125. ret = read(sock, &header_length_net, 2);
  126. if (ret != 2)
  127. throw SessionError("Short read");
  128. unsigned int length = ntohl(length_net) - 2;
  129. unsigned short header_length = ntohs(header_length_net);
  130. if (header_length != length) {
  131. throw SessionError("Received non-empty body where only a header expected");
  132. }
  133. std::vector<char> buffer(length);
  134. ret = read(sock, &buffer[0], length);
  135. if (ret != length) {
  136. throw SessionError("Short read");
  137. }
  138. std::string wire = std::string(&buffer[0], length);
  139. std::stringstream wire_stream;
  140. wire_stream << wire;
  141. msg = Element::fromWire(wire_stream, length);
  142. return (true);
  143. // XXXMLG handle non-block here, and return false for short reads
  144. }
  145. bool
  146. Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock)
  147. {
  148. unsigned int length_net;
  149. unsigned short header_length_net;
  150. unsigned int ret;
  151. ret = read(sock, &length_net, 4);
  152. if (ret != 4)
  153. throw SessionError("Short read");
  154. ret = read(sock, &header_length_net, 2);
  155. if (ret != 2)
  156. throw SessionError("Short read");
  157. unsigned int length = ntohl(length_net);
  158. unsigned short header_length = ntohs(header_length_net);
  159. if (header_length > length)
  160. throw SessionError("Bad header length");
  161. // remove the header-length bytes from the total length
  162. length -= 2;
  163. std::vector<char> buffer(length);
  164. ret = read(sock, &buffer[0], length);
  165. if (ret != length) {
  166. throw SessionError("Short read");
  167. }
  168. std::string header_wire = std::string(&buffer[0], header_length);
  169. std::string body_wire = std::string(&buffer[0] + header_length, length - header_length);
  170. std::stringstream header_wire_stream;
  171. header_wire_stream << header_wire;
  172. env = Element::fromWire(header_wire_stream, header_length);
  173. std::stringstream body_wire_stream;
  174. body_wire_stream << body_wire;
  175. msg = Element::fromWire(body_wire_stream, length - header_length);
  176. return (true);
  177. // XXXMLG handle non-block here, and return false for short reads
  178. }
  179. void
  180. Session::subscribe(std::string group, std::string instance)
  181. {
  182. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  183. env->set("type", Element::create("subscribe"));
  184. env->set("group", Element::create(group));
  185. env->set("instance", Element::create(instance));
  186. sendmsg(env);
  187. }
  188. void
  189. Session::unsubscribe(std::string group, std::string instance)
  190. {
  191. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  192. env->set("type", Element::create("unsubscribe"));
  193. env->set("group", Element::create(group));
  194. env->set("instance", Element::create(instance));
  195. sendmsg(env);
  196. }
  197. unsigned int
  198. Session::group_sendmsg(ElementPtr msg, std::string group,
  199. std::string instance, std::string to)
  200. {
  201. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  202. env->set("type", Element::create("send"));
  203. env->set("from", Element::create(lname));
  204. env->set("to", Element::create(to));
  205. env->set("group", Element::create(group));
  206. env->set("instance", Element::create(instance));
  207. env->set("seq", Element::create(sequence));
  208. //env->set("msg", Element::create(msg->toWire()));
  209. sendmsg(env, msg);
  210. return (sequence++);
  211. }
  212. bool
  213. Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg, bool nonblock)
  214. {
  215. bool got_message = recvmsg(envelope, msg, nonblock);
  216. if (!got_message) {
  217. return false;
  218. }
  219. return (true);
  220. }
  221. unsigned int
  222. Session::reply(ElementPtr& envelope, ElementPtr& newmsg)
  223. {
  224. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  225. env->set("type", Element::create("send"));
  226. env->set("from", Element::create(lname));
  227. env->set("to", Element::create(envelope->get("from")->stringValue()));
  228. env->set("group", Element::create(envelope->get("group")->stringValue()));
  229. env->set("instance", Element::create(envelope->get("instance")->stringValue()));
  230. env->set("seq", Element::create(sequence));
  231. env->set("reply", Element::create(envelope->get("seq")->intValue()));
  232. sendmsg(env, newmsg);
  233. return (sequence++);
  234. }