session.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import sys
  16. import socket
  17. import struct
  18. import errno
  19. import os
  20. import threading
  21. import bind10_config
  22. import isc.cc.message
  23. import isc.log
  24. from isc.cc.logger import logger
  25. from isc.log_messages.pycc_messages import *
  26. from isc.cc.proto_defs import *
  27. class ProtocolError(Exception): pass
  28. class NetworkError(Exception): pass
  29. class SessionError(Exception): pass
  30. class SessionTimeout(Exception): pass
  31. class Session:
  32. MSGQ_DEFAULT_TIMEOUT = 4000
  33. def __init__(self, socket_file=None):
  34. self._socket = None
  35. self._lname = None
  36. self._sequence = 1
  37. self._closed = False
  38. self._queue = []
  39. self._lock = threading.RLock()
  40. self.set_timeout(self.MSGQ_DEFAULT_TIMEOUT);
  41. self._recv_len_size = 0
  42. self._recv_size = 0
  43. if socket_file is None:
  44. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  45. self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  46. else:
  47. self.socket_file = bind10_config.BIND10_MSGQ_SOCKET_FILE
  48. else:
  49. self.socket_file = socket_file
  50. try:
  51. self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  52. self._socket.connect(self.socket_file)
  53. self.sendmsg({ CC_HEADER_TYPE: CC_COMMAND_GET_LNAME })
  54. env, msg = self.recvmsg(False)
  55. if not env:
  56. raise ProtocolError("Could not get local name")
  57. self._lname = msg[CC_HEADER_LNAME]
  58. if not self._lname:
  59. raise ProtocolError("Could not get local name")
  60. logger.debug(logger.DBGLVL_TRACE_BASIC, PYCC_LNAME_RECEIVED,
  61. self._lname)
  62. except socket.error as se:
  63. raise SessionError(se)
  64. @property
  65. def lname(self):
  66. return self._lname
  67. def close(self):
  68. self._socket.close()
  69. self._lname = None
  70. self._closed = True
  71. def sendmsg(self, env, msg=None):
  72. with self._lock:
  73. if self._closed:
  74. raise SessionError("Session has been closed.")
  75. if type(env) == dict:
  76. env = isc.cc.message.to_wire(env)
  77. if len(env) > 65535:
  78. raise ProtocolError("Envelope too large")
  79. if type(msg) == dict:
  80. msg = isc.cc.message.to_wire(msg)
  81. length = 2 + len(env);
  82. if msg is not None:
  83. length += len(msg)
  84. # Build entire message.
  85. data = struct.pack("!I", length)
  86. data += struct.pack("!H", len(env))
  87. data += env
  88. if msg is not None:
  89. data += msg
  90. # Send it in the blocking mode. On some systems send() may
  91. # actually send only part of the data, so we need to repeat it
  92. # until all data have been sent out.
  93. self._socket.setblocking(1)
  94. while len(data) > 0:
  95. cc = self._socket.send(data)
  96. data = data[cc:]
  97. def recvmsg(self, nonblock = True, seq = None):
  98. """Reads a message. If nonblock is true, and there is no
  99. message to read, it returns (None, None).
  100. If seq is not None, it should be a value as returned by
  101. group_sendmsg(), in which case only the response to
  102. that message is returned, and others will be queued until
  103. the next call to this method.
  104. If seq is None, only messages that are *not* responses
  105. will be returned, and responses will be queued.
  106. The queue is checked for relevant messages before data
  107. is read from the socket.
  108. Raises a SessionError if there is a JSON decode problem in
  109. the message that is read, or if the session has been closed
  110. prior to the call of recvmsg()"""
  111. with self._lock:
  112. if len(self._queue) > 0:
  113. i = 0;
  114. for env, msg in self._queue:
  115. if seq != None and CC_HEADER_REPLY in env and \
  116. seq == env[CC_HEADER_REPLY]:
  117. return self._queue.pop(i)
  118. elif seq == None and CC_HEADER_REPLY not in env:
  119. return self._queue.pop(i)
  120. else:
  121. i = i + 1
  122. if self._closed:
  123. raise SessionError("Session has been closed.")
  124. data = self._receive_full_buffer(nonblock)
  125. if data and len(data) > 2:
  126. header_length = struct.unpack('>H', data[0:2])[0]
  127. data_length = len(data) - 2 - header_length
  128. try:
  129. if data_length > 0:
  130. env = isc.cc.message.from_wire(data[2:header_length+2])
  131. msg = isc.cc.message.from_wire(data[header_length + 2:])
  132. if (seq == None and CC_HEADER_REPLY not in env) or \
  133. (seq != None and CC_HEADER_REPLY in env and
  134. seq == env[CC_HEADER_REPLY]):
  135. return env, msg
  136. else:
  137. self._queue.append((env,msg))
  138. return self.recvmsg(nonblock, seq)
  139. else:
  140. return isc.cc.message.from_wire(data[2:header_length+2]), None
  141. except ValueError as ve:
  142. # TODO: when we have logging here, add a debug
  143. # message printing the data that we were unable
  144. # to parse as JSON
  145. raise SessionError(ve)
  146. return None, None
  147. def _receive_bytes(self, size):
  148. """Try to get size bytes of data from the socket.
  149. Raises a ProtocolError if the size is 0.
  150. Raises any error from recv().
  151. Returns whatever data was available (if >0 bytes).
  152. """
  153. data = self._socket.recv(size)
  154. if len(data) == 0: # server closed connection
  155. raise ProtocolError("Read of 0 bytes: connection closed")
  156. return data
  157. def _receive_len_data(self):
  158. """Reads self._recv_len_size bytes of data from the socket into
  159. self._recv_len_data
  160. This is done through class variables so in the case of
  161. an EAGAIN we can continue on a subsequent call.
  162. Raises a ProtocolError, a socket.error (which may be
  163. timeout or eagain), or reads until we have all data we need.
  164. """
  165. while self._recv_len_size > 0:
  166. new_data = self._receive_bytes(self._recv_len_size)
  167. self._recv_len_data += new_data
  168. self._recv_len_size -= len(new_data)
  169. def _receive_data(self):
  170. """Reads self._recv_size bytes of data from the socket into
  171. self._recv_data.
  172. This is done through class variables so in the case of
  173. an EAGAIN we can continue on a subsequent call.
  174. Raises a ProtocolError, a socket.error (which may be
  175. timeout or eagain), or reads until we have all data we need.
  176. """
  177. while self._recv_size > 0:
  178. new_data = self._receive_bytes(self._recv_size)
  179. self._recv_data += new_data
  180. self._recv_size -= len(new_data)
  181. def _receive_full_buffer(self, nonblock):
  182. if nonblock:
  183. self._socket.setblocking(0)
  184. else:
  185. self._socket.setblocking(1)
  186. if self._socket_timeout == 0.0:
  187. self._socket.settimeout(None)
  188. else:
  189. self._socket.settimeout(self._socket_timeout)
  190. try:
  191. # we might be in a call following an EAGAIN, in which case
  192. # we simply continue. In the first case, either
  193. # recv_size or recv_len size are not zero
  194. # they may never both be non-zero (we are either starting
  195. # a full read, or continuing one of the reads
  196. assert self._recv_size == 0 or self._recv_len_size == 0
  197. if self._recv_size == 0:
  198. if self._recv_len_size == 0:
  199. # both zero, start a new full read
  200. self._recv_len_size = 4
  201. self._recv_len_data = bytearray()
  202. self._receive_len_data()
  203. self._recv_size = struct.unpack('>I', self._recv_len_data)[0]
  204. self._recv_data = bytearray()
  205. self._receive_data()
  206. # no EAGAIN, so copy data and reset internal counters
  207. data = self._recv_data
  208. self._recv_len_size = 0
  209. self._recv_size = 0
  210. return (data)
  211. except socket.timeout:
  212. raise SessionTimeout("recv() on cc session timed out")
  213. except socket.error as se:
  214. # Only keep data in case of EAGAIN
  215. if se.errno == errno.EAGAIN:
  216. return None
  217. # unknown state otherwise, best to drop data
  218. self._recv_len_size = 0
  219. self._recv_size = 0
  220. # ctrl-c can result in EINTR, return None to prevent
  221. # stacktrace output
  222. if se.errno == errno.EINTR:
  223. return None
  224. raise se
  225. def _next_sequence(self):
  226. self._sequence += 1
  227. return self._sequence
  228. def group_subscribe(self, group, instance=CC_INSTANCE_WILDCARD):
  229. self.sendmsg({
  230. CC_HEADER_TYPE: CC_COMMAND_SUBSCRIBE,
  231. CC_HEADER_GROUP: group,
  232. CC_HEADER_INSTANCE: instance,
  233. })
  234. def group_unsubscribe(self, group, instance=CC_INSTANCE_WILDCARD):
  235. self.sendmsg({
  236. CC_HEADER_TYPE: CC_COMMAND_UNSUBSCRIBE,
  237. CC_HEADER_GROUP: group,
  238. CC_HEADER_INSTANCE: instance,
  239. })
  240. def group_sendmsg(self, msg, group, instance=CC_INSTANCE_WILDCARD,
  241. to=CC_TO_WILDCARD, want_answer=False):
  242. '''
  243. Send a message over the CC session.
  244. Parameters:
  245. - msg The message to send, encoded as python structures (dicts,
  246. lists, etc).
  247. - group The recipient group to send to.
  248. - instance Instance in the group.
  249. - to Direct recipient (overrides the above, should contain the
  250. lname of the recipient).
  251. - want_answer If an answer is requested. If there's no recipient
  252. and this is true, the message queue would send an error message
  253. instead of the answer.
  254. Return:
  255. A sequence number that can be used to wait for an answer
  256. (see group_recvmsg).
  257. '''
  258. seq = self._next_sequence()
  259. self.sendmsg({
  260. CC_HEADER_TYPE: CC_COMMAND_SEND,
  261. CC_HEADER_FROM: self._lname,
  262. CC_HEADER_TO: to,
  263. CC_HEADER_GROUP: group,
  264. CC_HEADER_INSTANCE: instance,
  265. CC_HEADER_SEQ: seq,
  266. CC_HEADER_WANT_ANSWER: want_answer
  267. }, isc.cc.message.to_wire(msg))
  268. return seq
  269. def has_queued_msgs(self):
  270. return len(self._queue) > 0
  271. def group_recvmsg(self, nonblock = True, seq = None):
  272. env, msg = self.recvmsg(nonblock, seq)
  273. if env == None:
  274. # return none twice to match normal return value
  275. # (so caller won't get a type error on no data)
  276. return (None, None)
  277. return (msg, env)
  278. def group_reply(self, routing, msg):
  279. seq = self._next_sequence()
  280. self.sendmsg({
  281. CC_HEADER_TYPE: CC_COMMAND_SEND,
  282. CC_HEADER_FROM: self._lname,
  283. CC_HEADER_TO: routing[CC_HEADER_FROM],
  284. CC_HEADER_GROUP: routing[CC_HEADER_GROUP],
  285. CC_HEADER_INSTANCE: routing[CC_HEADER_INSTANCE],
  286. CC_HEADER_SEQ: seq,
  287. CC_HEADER_REPLY: routing[CC_HEADER_SEQ],
  288. }, isc.cc.message.to_wire(msg))
  289. return seq
  290. def set_timeout(self, milliseconds):
  291. """Sets the socket timeout for blocking reads to the given
  292. number of milliseconds"""
  293. self._socket_timeout = milliseconds / 1000.0
  294. def get_timeout(self):
  295. """Returns the current timeout for blocking reads (in milliseconds)"""
  296. return self._socket_timeout * 1000.0
  297. if __name__ == "__main__":
  298. import doctest
  299. doctest.testmod()