session.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import sys
  16. import socket
  17. import struct
  18. import os
  19. import threading
  20. import bind10_config
  21. import isc.cc.message
  22. class ProtocolError(Exception): pass
  23. class NetworkError(Exception): pass
  24. class SessionError(Exception): pass
  25. class Session:
  26. def __init__(self, socket_file=None):
  27. self._socket = None
  28. self._lname = None
  29. self._recvbuffer = bytearray()
  30. self._recvlength = 0
  31. self._sequence = 1
  32. self._closed = False
  33. self._queue = []
  34. self._lock = threading.RLock()
  35. if socket_file is None:
  36. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  37. self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  38. else:
  39. self.socket_file = bind10_config.BIND10_MSGQ_SOCKET_FILE
  40. else:
  41. self.socket_file = socket_file
  42. try:
  43. self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  44. self._socket.connect(self.socket_file)
  45. self.sendmsg({ "type": "getlname" })
  46. env, msg = self.recvmsg(False)
  47. if not env:
  48. raise ProtocolError("Could not get local name")
  49. self._lname = msg["lname"]
  50. if not self._lname:
  51. raise ProtocolError("Could not get local name")
  52. except socket.error as se:
  53. raise SessionError(se)
  54. @property
  55. def lname(self):
  56. return self._lname
  57. def close(self):
  58. self._socket.close()
  59. self._lname = None
  60. self._closed = True
  61. def sendmsg(self, env, msg = None):
  62. with self._lock:
  63. if self._closed:
  64. raise SessionError("Session has been closed.")
  65. if type(env) == dict:
  66. env = isc.cc.message.to_wire(env)
  67. if type(msg) == dict:
  68. msg = isc.cc.message.to_wire(msg)
  69. self._socket.setblocking(1)
  70. length = 2 + len(env);
  71. if msg:
  72. length += len(msg)
  73. self._socket.send(struct.pack("!I", length))
  74. self._socket.send(struct.pack("!H", len(env)))
  75. self._socket.send(env)
  76. if msg:
  77. self._socket.send(msg)
  78. def recvmsg(self, nonblock = True, seq = None):
  79. with self._lock:
  80. if len(self._queue) > 0:
  81. i = 0;
  82. for env, msg in self._queue:
  83. if seq != None and "reply" in env and seq == env["reply"]:
  84. return self._queue.pop(i)
  85. elif seq == None and "reply" not in env:
  86. return self._queue.pop(i)
  87. else:
  88. i = i + 1
  89. if self._closed:
  90. raise SessionError("Session has been closed.")
  91. data = self._receive_full_buffer(nonblock)
  92. if data and len(data) > 2:
  93. header_length = struct.unpack('>H', data[0:2])[0]
  94. data_length = len(data) - 2 - header_length
  95. if data_length > 0:
  96. env = isc.cc.message.from_wire(data[2:header_length+2])
  97. msg = isc.cc.message.from_wire(data[header_length + 2:])
  98. if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
  99. return env, msg
  100. else:
  101. tmp = None
  102. if "reply" in env:
  103. tmp = env["reply"]
  104. self._queue.append((env,msg))
  105. return self.recvmsg(nonblock, seq)
  106. else:
  107. return isc.cc.message.from_wire(data[2:header_length+2]), None
  108. return None, None
  109. def _receive_full_buffer(self, nonblock):
  110. if nonblock:
  111. self._socket.setblocking(0)
  112. else:
  113. self._socket.setblocking(1)
  114. if self._recvlength == 0:
  115. length = 4
  116. length -= len(self._recvbuffer)
  117. try:
  118. data = self._socket.recv(length)
  119. except:
  120. return None
  121. if data == "": # server closed connection
  122. raise ProtocolError("Read of 0 bytes: connection closed")
  123. self._recvbuffer += data
  124. if len(self._recvbuffer) < 4:
  125. return None
  126. self._recvlength = struct.unpack('>I', self._recvbuffer)[0]
  127. self._recvbuffer = bytearray()
  128. length = self._recvlength - len(self._recvbuffer)
  129. while (length > 0):
  130. try:
  131. data = self._socket.recv(length)
  132. except:
  133. return None
  134. if data == "": # server closed connection
  135. raise ProtocolError("Read of 0 bytes: connection closed")
  136. self._recvbuffer += data
  137. length -= len(data)
  138. data = self._recvbuffer
  139. self._recvbuffer = bytearray()
  140. self._recvlength = 0
  141. return (data)
  142. def _next_sequence(self):
  143. self._sequence += 1
  144. return self._sequence
  145. def group_subscribe(self, group, instance = "*"):
  146. self.sendmsg({
  147. "type": "subscribe",
  148. "group": group,
  149. "instance": instance,
  150. })
  151. def group_unsubscribe(self, group, instance = "*"):
  152. self.sendmsg({
  153. "type": "unsubscribe",
  154. "group": group,
  155. "instance": instance,
  156. })
  157. def group_sendmsg(self, msg, group, instance = "*", to = "*"):
  158. seq = self._next_sequence()
  159. self.sendmsg({
  160. "type": "send",
  161. "from": self._lname,
  162. "to": to,
  163. "group": group,
  164. "instance": instance,
  165. "seq": seq,
  166. }, isc.cc.message.to_wire(msg))
  167. return seq
  168. def has_queued_msgs(self):
  169. return len(self._queue) > 0
  170. def group_recvmsg(self, nonblock = True, seq = None):
  171. env, msg = self.recvmsg(nonblock, seq)
  172. if env == None:
  173. # return none twice to match normal return value
  174. # (so caller won't get a type error on no data)
  175. return (None, None)
  176. return (msg, env)
  177. def group_reply(self, routing, msg):
  178. seq = self._next_sequence()
  179. self.sendmsg({
  180. "type": "send",
  181. "from": self._lname,
  182. "to": routing["from"],
  183. "group": routing["group"],
  184. "instance": routing["instance"],
  185. "seq": seq,
  186. "reply": routing["seq"],
  187. }, isc.cc.message.to_wire(msg))
  188. return seq
  189. if __name__ == "__main__":
  190. import doctest
  191. doctest.testmod()