session.cc 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
#include "data.h"
#include "session.h"

#include <cerrno>
#include <cstdio>
#include <iostream>
#include <sstream>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

using namespace std;
using namespace ISC::CC;
using namespace ISC::Data;
  12. Session::Session()
  13. {
  14. sock = -1;
  15. }
  16. void
  17. Session::disconnect()
  18. {
  19. close(sock);
  20. sock = -1;
  21. }
  22. void
  23. Session::establish()
  24. {
  25. int ret;
  26. struct sockaddr_in sin;
  27. sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  28. if (sock < -1)
  29. throw SessionError("socket() failed");
  30. sin.sin_len = sizeof(struct sockaddr_in);
  31. sin.sin_family = AF_INET;
  32. sin.sin_port = htons(9912);
  33. sin.sin_addr.s_addr = INADDR_ANY;
  34. ret = connect(sock, (struct sockaddr *)&sin, sizeof(sin));
  35. if (ret < 0)
  36. throw SessionError("connect() failed");
  37. //
  38. // send a request for our local name, and wait for a response
  39. //
  40. std::string get_lname_str = "{ \"type\": \"getlname\" }";
  41. std::stringstream get_lname_stream;
  42. get_lname_stream.str(get_lname_str);
  43. ElementPtr get_lname_msg = Element::create_from_string(get_lname_stream);
  44. sendmsg(get_lname_msg);
  45. ElementPtr msg;
  46. recvmsg(msg, false);
  47. lname = msg->get("lname")->string_value();
  48. cout << "My local name is: " << lname << endl;
  49. }
  50. //
  51. // Convert to wire format and send this on the TCP stream with its length prefix
  52. //
  53. void
  54. Session::sendmsg(ElementPtr& msg)
  55. {
  56. std::string wire = msg->to_wire();
  57. unsigned int length = wire.length();
  58. unsigned int length_net = htonl(length);
  59. unsigned int ret;
  60. ret = write(sock, &length_net, 4);
  61. if (ret != 4)
  62. throw SessionError("Short write");
  63. ret = write(sock, wire.c_str(), length);
  64. if (ret != length)
  65. throw SessionError("Short write");
  66. }
  67. bool
  68. Session::recvmsg(ElementPtr& msg, bool nonblock)
  69. {
  70. unsigned int length_net;
  71. unsigned int ret;
  72. ret = read(sock, &length_net, 4);
  73. if (ret != 4)
  74. throw SessionError("Short read");
  75. unsigned int length = ntohl(length_net);
  76. char *buffer = new char[length];
  77. ret = read(sock, buffer, length);
  78. if (ret != length)
  79. throw SessionError("Short read");
  80. std::string wire = std::string(buffer, length);
  81. delete [] buffer;
  82. std::stringstream wire_stream;
  83. wire_stream <<wire;
  84. msg = Element::from_wire(wire_stream, length);
  85. return (true);
  86. // XXXMLG handle non-block here, and return false for short reads
  87. }
  88. void
  89. Session::subscribe(std::string group, std::string instance, std::string subtype)
  90. {
  91. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  92. env->set("type", Element::create("subscribe"));
  93. env->set("group", Element::create(group));
  94. env->set("instance", Element::create(instance));
  95. env->set("subtype", Element::create(subtype));
  96. sendmsg(env);
  97. }
  98. void
  99. Session::unsubscribe(std::string group, std::string instance)
  100. {
  101. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  102. env->set("type", Element::create("unsubscribe"));
  103. env->set("group", Element::create(group));
  104. env->set("instance", Element::create(instance));
  105. sendmsg(env);
  106. }
  107. unsigned int
  108. Session::group_sendmsg(ElementPtr& msg, std::string group, std::string instance, std::string to)
  109. {
  110. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  111. env->set("type", Element::create("send"));
  112. env->set("from", Element::create(lname));
  113. env->set("to", Element::create(to));
  114. env->set("group", Element::create(group));
  115. env->set("instance", Element::create(instance));
  116. env->set("seq", Element::create(sequence));
  117. env->set("msg", Element::create(msg->to_wire()));
  118. sendmsg(env);
  119. return (sequence++);
  120. }
  121. bool
  122. Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg, bool nonblock)
  123. {
  124. bool got_message = recvmsg(envelope, nonblock);
  125. if (!got_message) {
  126. return false;
  127. }
  128. msg = Element::from_wire(envelope->get("msg")->string_value());
  129. envelope->remove("msg");
  130. return (true);
  131. }
  132. unsigned int
  133. Session::reply(ElementPtr& envelope, ElementPtr& newmsg)
  134. {
  135. ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
  136. env->set("type", Element::create("send"));
  137. env->set("from", Element::create(lname));
  138. env->set("to", Element::create(envelope->get("from")->string_value()));
  139. env->set("group", Element::create(envelope->get("group")->string_value()));
  140. env->set("instance", Element::create(envelope->get("instance")->string_value()));
  141. env->set("seq", Element::create(sequence));
  142. env->set("msg", Element::create(newmsg->to_wire()));
  143. env->set("reply", Element::create(envelope->get("seq")->string_value()));
  144. sendmsg(env);
  145. return (sequence++);
  146. }