|
@@ -70,6 +70,13 @@ SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
|
|
|
|
|
|
class MsgQReceiveError(Exception): pass
|
|
|
|
|
|
+class MsgQCloseOnReceive(Exception):
|
|
|
+ def __init__(self, reason, partial_read):
|
|
|
+ self.partial_read = partial_read
|
|
|
+ self.__reason = reason
|
|
|
+ def __str__(self):
|
|
|
+ return self.__reason
|
|
|
+
|
|
|
class SubscriptionManager:
|
|
|
def __init__(self, cfgmgr_ready):
|
|
|
"""
|
|
@@ -332,7 +339,7 @@ class MsgQ:
|
|
|
del self.sendbuffs[fd]
|
|
|
logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
|
|
|
|
|
|
- def getbytes(self, fd, sock, length):
|
|
|
+ def __getbytes(self, fd, sock, length, continued):
|
|
|
"""Get exactly the requested bytes, or raise an exception if
|
|
|
EOF."""
|
|
|
received = b''
|
|
@@ -340,16 +347,19 @@ class MsgQ:
|
|
|
try:
|
|
|
data = sock.recv(length - len(received))
|
|
|
except socket.error as err:
|
|
|
+ if err.errno == errno.ECONNRESET:
|
|
|
+ raise MsgQCloseOnReceive(str(err), continued)
|
|
|
raise MsgQReceiveError(str(err))
|
|
|
if len(data) == 0:
|
|
|
- raise MsgQReceiveError("EOF")
|
|
|
+ raise MsgQCloseOnReceive("EOF", continued)
|
|
|
received += data
|
|
|
+ continued = True
|
|
|
return received
|
|
|
|
|
|
def read_packet(self, fd, sock):
|
|
|
"""Read a correctly formatted packet. Will raise exceptions if
|
|
|
something fails."""
|
|
|
- lengths = self.getbytes(fd, sock, 6)
|
|
|
+ lengths = self.__getbytes(fd, sock, 6, False)
|
|
|
overall_length, routing_length = struct.unpack(">IH", lengths)
|
|
|
if overall_length < 2:
|
|
|
raise MsgQReceiveError("overall_length < 2")
|
|
@@ -360,9 +370,9 @@ class MsgQ:
|
|
|
raise MsgQReceiveError("routing_length == 0")
|
|
|
data_length = overall_length - routing_length
|
|
|
# probably need to sanity check lengths here...
|
|
|
- routing = self.getbytes(fd, sock, routing_length)
|
|
|
+ routing = self.__getbytes(fd, sock, routing_length, True)
|
|
|
if data_length > 0:
|
|
|
- data = self.getbytes(fd, sock, data_length)
|
|
|
+ data = self.__getbytes(fd, sock, data_length, True)
|
|
|
else:
|
|
|
data = None
|
|
|
return (routing, data)
|
|
@@ -371,8 +381,11 @@ class MsgQ:
|
|
|
"""Process one packet."""
|
|
|
try:
|
|
|
routing, data = self.read_packet(fd, sock)
|
|
|
- except MsgQReceiveError as err:
|
|
|
- logger.error(MSGQ_RECV_ERR, fd, err)
|
|
|
+ except (MsgQReceiveError, MsgQCloseOnReceive) as err:
|
|
|
+ if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
|
|
|
+ logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
|
|
|
+ else:
|
|
|
+ logger.error(MSGQ_RECV_ERR, fd, err)
|
|
|
self.kill_socket(fd, sock)
|
|
|
return
|
|
|
|