|
@@ -93,6 +93,19 @@ class Session:
|
|
|
self._socket.send(msg)
|
|
|
|
|
|
def recvmsg(self, nonblock = True, seq = None):
|
|
|
+ """Reads a message. If nonblock is true, and there is no
|
|
|
+ message to read, it returns (None, None).
|
|
|
+ If seq is not None, it should be a value as returned by
|
|
|
+ group_sendmsg(), in which case only the response to
|
|
|
+ that message is returned, and others will be queued until
|
|
|
+ the next call to this method.
|
|
|
+ If seq is None, only messages that are *not* responses
|
|
|
+ will be returned, and responses will be queued.
|
|
|
+ The queue is checked for relevant messages before data
|
|
|
+ is read from the socket.
|
|
|
+ Raises a SessionError if there is a JSON decode problem in
|
|
|
+ the message that is read, or if the session has been closed
|
|
|
+ prior to the call of recvmsg()"""
|
|
|
with self._lock:
|
|
|
if len(self._queue) > 0:
|
|
|
i = 0;
|
|
@@ -109,16 +122,22 @@ class Session:
|
|
|
if data and len(data) > 2:
|
|
|
header_length = struct.unpack('>H', data[0:2])[0]
|
|
|
data_length = len(data) - 2 - header_length
|
|
|
- if data_length > 0:
|
|
|
- env = isc.cc.message.from_wire(data[2:header_length+2])
|
|
|
- msg = isc.cc.message.from_wire(data[header_length + 2:])
|
|
|
- if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
|
|
|
- return env, msg
|
|
|
+ try:
|
|
|
+ if data_length > 0:
|
|
|
+ env = isc.cc.message.from_wire(data[2:header_length+2])
|
|
|
+ msg = isc.cc.message.from_wire(data[header_length + 2:])
|
|
|
+ if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
|
|
|
+ return env, msg
|
|
|
+ else:
|
|
|
+ self._queue.append((env,msg))
|
|
|
+ return self.recvmsg(nonblock, seq)
|
|
|
else:
|
|
|
- self._queue.append((env,msg))
|
|
|
- return self.recvmsg(nonblock, seq)
|
|
|
- else:
|
|
|
- return isc.cc.message.from_wire(data[2:header_length+2]), None
|
|
|
+ return isc.cc.message.from_wire(data[2:header_length+2]), None
|
|
|
+ except ValueError as ve:
|
|
|
+ # TODO: when we have logging here, add a debug
|
|
|
+ # message printing the data that we were unable
|
|
|
+ # to parse as JSON
|
|
|
+ raise SessionError(ve)
|
|
|
return None, None
|
|
|
|
|
|
def _receive_bytes(self, size):
|