session.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import sys
  16. import socket
  17. import struct
  18. import errno
  19. import os
  20. import threading
  21. import bind10_config
  22. import isc.cc.message
  23. from isc.util.common_defs import *
  24. class ProtocolError(Exception): pass
  25. class NetworkError(Exception): pass
  26. class SessionError(Exception): pass
  27. class SessionTimeout(Exception): pass
  28. class Session:
  29. MSGQ_DEFAULT_TIMEOUT = 4000
  30. def __init__(self, socket_file=None):
  31. self._socket = None
  32. self._lname = None
  33. self._sequence = 1
  34. self._closed = False
  35. self._queue = []
  36. self._lock = threading.RLock()
  37. self.set_timeout(self.MSGQ_DEFAULT_TIMEOUT);
  38. self._recv_len_size = 0
  39. self._recv_size = 0
  40. if socket_file is None:
  41. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  42. self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  43. else:
  44. self.socket_file = bind10_config.BIND10_MSGQ_SOCKET_FILE
  45. else:
  46. self.socket_file = socket_file
  47. try:
  48. self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  49. self._socket.connect(self.socket_file)
  50. self.sendmsg({ "type": "getlname" })
  51. env, msg = self.recvmsg(False)
  52. if not env:
  53. raise ProtocolError("Could not get local name")
  54. self._lname = msg["lname"]
  55. if not self._lname:
  56. raise ProtocolError("Could not get local name")
  57. except socket.error as se:
  58. raise SessionError(se)
  59. @property
  60. def lname(self):
  61. return self._lname
  62. def close(self):
  63. self._socket.close()
  64. self._lname = None
  65. self._closed = True
  66. def sendmsg(self, env, msg=None):
  67. with self._lock:
  68. if self._closed:
  69. raise SessionError("Session has been closed.")
  70. if type(env) == dict:
  71. env = isc.cc.message.to_wire(env)
  72. if len(env) > 65535:
  73. raise ProtocolError("Envelope too large")
  74. if type(msg) == dict:
  75. msg = isc.cc.message.to_wire(msg)
  76. length = 2 + len(env);
  77. if msg is not None:
  78. length += len(msg)
  79. # Build entire message.
  80. data = struct.pack("!I", length)
  81. data += struct.pack("!H", len(env))
  82. data += env
  83. if msg is not None:
  84. data += msg
  85. # Send it in the blocking mode. On some systems send() may
  86. # actually send only part of the data, so we need to repeat it
  87. # until all data have been sent out.
  88. self._socket.setblocking(1)
  89. while len(data) > 0:
  90. cc = self._socket.send(data)
  91. data = data[cc:]
  92. def recvmsg(self, nonblock = True, seq = None):
  93. """Reads a message. If nonblock is true, and there is no
  94. message to read, it returns (None, None).
  95. If seq is not None, it should be a value as returned by
  96. group_sendmsg(), in which case only the response to
  97. that message is returned, and others will be queued until
  98. the next call to this method.
  99. If seq is None, only messages that are *not* responses
  100. will be returned, and responses will be queued.
  101. The queue is checked for relevant messages before data
  102. is read from the socket.
  103. Raises a SessionError if there is a JSON decode problem in
  104. the message that is read, or if the session has been closed
  105. prior to the call of recvmsg()"""
  106. with self._lock:
  107. if len(self._queue) > 0:
  108. i = 0;
  109. for env, msg in self._queue:
  110. if seq != None and "reply" in env and seq == env["reply"]:
  111. return self._queue.pop(i)
  112. elif seq == None and "reply" not in env:
  113. return self._queue.pop(i)
  114. else:
  115. i = i + 1
  116. if self._closed:
  117. raise SessionError("Session has been closed.")
  118. data = self._receive_full_buffer(nonblock)
  119. if data and len(data) > 2:
  120. header_length = struct.unpack('>H', data[0:2])[0]
  121. data_length = len(data) - 2 - header_length
  122. try:
  123. if data_length > 0:
  124. env = isc.cc.message.from_wire(data[2:header_length+2])
  125. msg = isc.cc.message.from_wire(data[header_length + 2:])
  126. if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
  127. return env, msg
  128. else:
  129. self._queue.append((env,msg))
  130. return self.recvmsg(nonblock, seq)
  131. else:
  132. return isc.cc.message.from_wire(data[2:header_length+2]), None
  133. except ValueError as ve:
  134. # TODO: when we have logging here, add a debug
  135. # message printing the data that we were unable
  136. # to parse as JSON
  137. raise SessionError(ve)
  138. return None, None
  139. def _receive_bytes(self, size):
  140. """Try to get size bytes of data from the socket.
  141. Raises a ProtocolError if the size is 0.
  142. Raises any error from recv().
  143. Returns whatever data was available (if >0 bytes).
  144. """
  145. data = self._socket.recv(size)
  146. if len(data) == 0: # server closed connection
  147. raise ProtocolError("Read of 0 bytes: connection closed")
  148. return data
  149. def _receive_len_data(self):
  150. """Reads self._recv_len_size bytes of data from the socket into
  151. self._recv_len_data
  152. This is done through class variables so in the case of
  153. an EAGAIN we can continue on a subsequent call.
  154. Raises a ProtocolError, a socket.error (which may be
  155. timeout or eagain), or reads until we have all data we need.
  156. """
  157. while self._recv_len_size > 0:
  158. new_data = self._receive_bytes(self._recv_len_size)
  159. self._recv_len_data += new_data
  160. self._recv_len_size -= len(new_data)
  161. def _receive_data(self):
  162. """Reads self._recv_size bytes of data from the socket into
  163. self._recv_data.
  164. This is done through class variables so in the case of
  165. an EAGAIN we can continue on a subsequent call.
  166. Raises a ProtocolError, a socket.error (which may be
  167. timeout or eagain), or reads until we have all data we need.
  168. """
  169. while self._recv_size > 0:
  170. new_data = self._receive_bytes(self._recv_size)
  171. self._recv_data += new_data
  172. self._recv_size -= len(new_data)
  173. def _receive_full_buffer(self, nonblock):
  174. if nonblock:
  175. self._socket.setblocking(0)
  176. else:
  177. self._socket.setblocking(1)
  178. if self._socket_timeout == 0.0:
  179. self._socket.settimeout(None)
  180. else:
  181. self._socket.settimeout(self._socket_timeout)
  182. try:
  183. # we might be in a call following an EAGAIN, in which case
  184. # we simply continue. In the first case, either
  185. # recv_size or recv_len size are not zero
  186. # they may never both be non-zero (we are either starting
  187. # a full read, or continuing one of the reads
  188. assert self._recv_size == 0 or self._recv_len_size == 0
  189. if self._recv_size == 0:
  190. if self._recv_len_size == 0:
  191. # both zero, start a new full read
  192. self._recv_len_size = 4
  193. self._recv_len_data = bytearray()
  194. self._receive_len_data()
  195. self._recv_size = struct.unpack('>I', self._recv_len_data)[0]
  196. self._recv_data = bytearray()
  197. self._receive_data()
  198. # no EAGAIN, so copy data and reset internal counters
  199. data = self._recv_data
  200. self._recv_len_size = 0
  201. self._recv_size = 0
  202. return (data)
  203. except socket.timeout:
  204. raise SessionTimeout("recv() on cc session timed out")
  205. except socket.error as se:
  206. # Only keep data in case of EAGAIN
  207. if se.errno == errno.EAGAIN:
  208. return None
  209. # unknown state otherwise, best to drop data
  210. self._recv_len_size = 0
  211. self._recv_size = 0
  212. # ctrl-c can result in EINTR, return None to prevent
  213. # stacktrace output
  214. if se.errno == errno.EINTR:
  215. return None
  216. raise se
  217. def _next_sequence(self):
  218. self._sequence += 1
  219. return self._sequence
  220. def group_subscribe(self, group, instance = "*"):
  221. self.sendmsg({
  222. "type": "subscribe",
  223. "group": group,
  224. "instance": instance,
  225. })
  226. def group_unsubscribe(self, group, instance = "*"):
  227. self.sendmsg({
  228. "type": "unsubscribe",
  229. "group": group,
  230. "instance": instance,
  231. })
  232. def group_sendmsg(self, msg, group, instance=CC_INSTANCE_WILDCARD,
  233. to=CC_TO_WILDCARD, want_answer=False):
  234. '''
  235. Send a message over the CC session.
  236. Parameters:
  237. - msg The message to send, encoded as python structures (dicts,
  238. lists, etc).
  239. - group The recipient group to send to.
  240. - instance Instance in the group.
  241. - to Direct recipient (overrides the above, should contain the
  242. lname of the recipient).
  243. - want_answer If an answer is requested. If there's no recipient
  244. and this is true, the message queue would send an error message
  245. instead of the answer.
  246. Return:
  247. A sequence number that can be used to wait for an answer
  248. (see group_recvmsg).
  249. '''
  250. seq = self._next_sequence()
  251. self.sendmsg({
  252. CC_HEADER_TYPE: CC_COMMAND_SEND,
  253. CC_HEADER_FROM: self._lname,
  254. CC_HEADER_TO: to,
  255. CC_HEADER_GROUP: group,
  256. CC_HEADER_INSTANCE: instance,
  257. CC_HEADER_SEQ: seq,
  258. CC_HEADER_WANT_ANSWER: want_answer
  259. }, isc.cc.message.to_wire(msg))
  260. return seq
  261. def has_queued_msgs(self):
  262. return len(self._queue) > 0
  263. def group_recvmsg(self, nonblock = True, seq = None):
  264. env, msg = self.recvmsg(nonblock, seq)
  265. if env == None:
  266. # return none twice to match normal return value
  267. # (so caller won't get a type error on no data)
  268. return (None, None)
  269. return (msg, env)
  270. def group_reply(self, routing, msg):
  271. seq = self._next_sequence()
  272. self.sendmsg({
  273. "type": "send",
  274. "from": self._lname,
  275. "to": routing["from"],
  276. "group": routing["group"],
  277. "instance": routing["instance"],
  278. "seq": seq,
  279. "reply": routing["seq"],
  280. }, isc.cc.message.to_wire(msg))
  281. return seq
  282. def set_timeout(self, milliseconds):
  283. """Sets the socket timeout for blocking reads to the given
  284. number of milliseconds"""
  285. self._socket_timeout = milliseconds / 1000.0
  286. def get_timeout(self):
  287. """Returns the current timeout for blocking reads (in milliseconds)"""
  288. return self._socket_timeout * 1000.0
if __name__ == "__main__":
    # When executed as a script, run the doctests found in this module.
    import doctest
    doctest.testmod()