session.py 11 KB


  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import sys
  16. import socket
  17. import struct
  18. import errno
  19. import os
  20. import threading
  21. import bind10_config
  22. import isc.cc.message
  23. class ProtocolError(Exception): pass
  24. class NetworkError(Exception): pass
  25. class SessionError(Exception): pass
  26. class SessionTimeout(Exception): pass
  27. class Session:
  28. MSGQ_DEFAULT_TIMEOUT = 4000
  29. def __init__(self, socket_file=None):
  30. self._socket = None
  31. self._lname = None
  32. self._sequence = 1
  33. self._closed = False
  34. self._queue = []
  35. self._lock = threading.RLock()
  36. self.set_timeout(self.MSGQ_DEFAULT_TIMEOUT);
  37. self._recv_len_size = 0
  38. self._recv_size = 0
  39. if socket_file is None:
  40. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  41. self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  42. else:
  43. self.socket_file = bind10_config.BIND10_MSGQ_SOCKET_FILE
  44. else:
  45. self.socket_file = socket_file
  46. try:
  47. self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  48. self._socket.connect(self.socket_file)
  49. self.sendmsg({ "type": "getlname" })
  50. env, msg = self.recvmsg(False)
  51. if not env:
  52. raise ProtocolError("Could not get local name")
  53. self._lname = msg["lname"]
  54. if not self._lname:
  55. raise ProtocolError("Could not get local name")
  56. except socket.error as se:
  57. raise SessionError(se)
  58. @property
  59. def lname(self):
  60. return self._lname
  61. def close(self):
  62. self._socket.close()
  63. self._lname = None
  64. self._closed = True
  65. def sendmsg(self, env, msg = None):
  66. with self._lock:
  67. if self._closed:
  68. raise SessionError("Session has been closed.")
  69. if type(env) == dict:
  70. env = isc.cc.message.to_wire(env)
  71. if len(env) > 65535:
  72. raise ProtocolError("Envelope too large")
  73. if type(msg) == dict:
  74. msg = isc.cc.message.to_wire(msg)
  75. self._socket.setblocking(1)
  76. length = 2 + len(env);
  77. if msg:
  78. length += len(msg)
  79. self._socket.send(struct.pack("!I", length))
  80. self._socket.send(struct.pack("!H", len(env)))
  81. self._socket.send(env)
  82. if msg:
  83. self._socket.send(msg)
  84. def recvmsg(self, nonblock = True, seq = None):
  85. """Reads a message. If nonblock is true, and there is no
  86. message to read, it returns (None, None).
  87. If seq is not None, it should be a value as returned by
  88. group_sendmsg(), in which case only the response to
  89. that message is returned, and others will be queued until
  90. the next call to this method.
  91. If seq is None, only messages that are *not* responses
  92. will be returned, and responses will be queued.
  93. The queue is checked for relevant messages before data
  94. is read from the socket.
  95. Raises a SessionError if there is a JSON decode problem in
  96. the message that is read, or if the session has been closed
  97. prior to the call of recvmsg()"""
  98. with self._lock:
  99. if len(self._queue) > 0:
  100. i = 0;
  101. for env, msg in self._queue:
  102. if seq != None and "reply" in env and seq == env["reply"]:
  103. return self._queue.pop(i)
  104. elif seq == None and "reply" not in env:
  105. return self._queue.pop(i)
  106. else:
  107. i = i + 1
  108. if self._closed:
  109. raise SessionError("Session has been closed.")
  110. data = self._receive_full_buffer(nonblock)
  111. if data and len(data) > 2:
  112. header_length = struct.unpack('>H', data[0:2])[0]
  113. data_length = len(data) - 2 - header_length
  114. try:
  115. if data_length > 0:
  116. env = isc.cc.message.from_wire(data[2:header_length+2])
  117. msg = isc.cc.message.from_wire(data[header_length + 2:])
  118. if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
  119. return env, msg
  120. else:
  121. self._queue.append((env,msg))
  122. return self.recvmsg(nonblock, seq)
  123. else:
  124. return isc.cc.message.from_wire(data[2:header_length+2]), None
  125. except ValueError as ve:
  126. # TODO: when we have logging here, add a debug
  127. # message printing the data that we were unable
  128. # to parse as JSON
  129. raise SessionError(ve)
  130. return None, None
  131. def _receive_bytes(self, size):
  132. """Try to get size bytes of data from the socket.
  133. Raises a ProtocolError if the size is 0.
  134. Raises any error from recv().
  135. Returns whatever data was available (if >0 bytes).
  136. """
  137. data = self._socket.recv(size)
  138. if len(data) == 0: # server closed connection
  139. raise ProtocolError("Read of 0 bytes: connection closed")
  140. return data
  141. def _receive_len_data(self):
  142. """Reads self._recv_len_size bytes of data from the socket into
  143. self._recv_len_data
  144. This is done through class variables so in the case of
  145. an EAGAIN we can continue on a subsequent call.
  146. Raises a ProtocolError, a socket.error (which may be
  147. timeout or eagain), or reads until we have all data we need.
  148. """
  149. while self._recv_len_size > 0:
  150. new_data = self._receive_bytes(self._recv_len_size)
  151. self._recv_len_data += new_data
  152. self._recv_len_size -= len(new_data)
  153. def _receive_data(self):
  154. """Reads self._recv_size bytes of data from the socket into
  155. self._recv_data.
  156. This is done through class variables so in the case of
  157. an EAGAIN we can continue on a subsequent call.
  158. Raises a ProtocolError, a socket.error (which may be
  159. timeout or eagain), or reads until we have all data we need.
  160. """
  161. while self._recv_size > 0:
  162. new_data = self._receive_bytes(self._recv_size)
  163. self._recv_data += new_data
  164. self._recv_size -= len(new_data)
  165. def _receive_full_buffer(self, nonblock):
  166. if nonblock:
  167. self._socket.setblocking(0)
  168. else:
  169. self._socket.setblocking(1)
  170. if self._socket_timeout == 0.0:
  171. self._socket.settimeout(None)
  172. else:
  173. self._socket.settimeout(self._socket_timeout)
  174. try:
  175. # we might be in a call following an EAGAIN, in which case
  176. # we simply continue. In the first case, either
  177. # recv_size or recv_len size are not zero
  178. # they may never both be non-zero (we are either starting
  179. # a full read, or continuing one of the reads
  180. assert self._recv_size == 0 or self._recv_len_size == 0
  181. if self._recv_size == 0:
  182. if self._recv_len_size == 0:
  183. # both zero, start a new full read
  184. self._recv_len_size = 4
  185. self._recv_len_data = bytearray()
  186. self._receive_len_data()
  187. self._recv_size = struct.unpack('>I', self._recv_len_data)[0]
  188. self._recv_data = bytearray()
  189. self._receive_data()
  190. # no EAGAIN, so copy data and reset internal counters
  191. data = self._recv_data
  192. self._recv_len_size = 0
  193. self._recv_size = 0
  194. return (data)
  195. except socket.timeout:
  196. raise SessionTimeout("recv() on cc session timed out")
  197. except socket.error as se:
  198. # Only keep data in case of EAGAIN
  199. if se.errno == errno.EAGAIN:
  200. return None
  201. # unknown state otherwise, best to drop data
  202. self._recv_len_size = 0
  203. self._recv_size = 0
  204. # ctrl-c can result in EINTR, return None to prevent
  205. # stacktrace output
  206. if se.errno == errno.EINTR:
  207. return None
  208. raise se
  209. def _next_sequence(self):
  210. self._sequence += 1
  211. return self._sequence
  212. def group_subscribe(self, group, instance = "*"):
  213. self.sendmsg({
  214. "type": "subscribe",
  215. "group": group,
  216. "instance": instance,
  217. })
  218. def group_unsubscribe(self, group, instance = "*"):
  219. self.sendmsg({
  220. "type": "unsubscribe",
  221. "group": group,
  222. "instance": instance,
  223. })
  224. def group_sendmsg(self, msg, group, instance = "*", to = "*"):
  225. seq = self._next_sequence()
  226. self.sendmsg({
  227. "type": "send",
  228. "from": self._lname,
  229. "to": to,
  230. "group": group,
  231. "instance": instance,
  232. "seq": seq,
  233. }, isc.cc.message.to_wire(msg))
  234. return seq
  235. def has_queued_msgs(self):
  236. return len(self._queue) > 0
  237. def group_recvmsg(self, nonblock = True, seq = None):
  238. env, msg = self.recvmsg(nonblock, seq)
  239. if env == None:
  240. # return none twice to match normal return value
  241. # (so caller won't get a type error on no data)
  242. return (None, None)
  243. return (msg, env)
  244. def group_reply(self, routing, msg):
  245. seq = self._next_sequence()
  246. self.sendmsg({
  247. "type": "send",
  248. "from": self._lname,
  249. "to": routing["from"],
  250. "group": routing["group"],
  251. "instance": routing["instance"],
  252. "seq": seq,
  253. "reply": routing["seq"],
  254. }, isc.cc.message.to_wire(msg))
  255. return seq
  256. def set_timeout(self, milliseconds):
  257. """Sets the socket timeout for blocking reads to the given
  258. number of milliseconds"""
  259. self._socket_timeout = milliseconds / 1000.0
  260. def get_timeout(self):
  261. """Returns the current timeout for blocking reads (in milliseconds)"""
  262. return self._socket_timeout * 1000.0
  263. if __name__ == "__main__":
  264. import doctest
  265. doctest.testmod()