|
@@ -71,6 +71,15 @@ SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
|
|
class MsgQReceiveError(Exception): pass
|
|
class MsgQReceiveError(Exception): pass
|
|
|
|
|
|
class MsgQCloseOnReceive(Exception):
|
|
class MsgQCloseOnReceive(Exception):
|
|
|
|
+ '''Exception raised when reading data from a socket results in "shutdown.
|
|
|
|
+
|
|
|
|
+ This can be either getting 0-length data or via ECONNRESET socket.error
|
|
|
|
+ exception. This class holds whether it happens in the middle of reading
|
|
|
|
+ (i.e. after reading some) via partial_read parameter, which is set to True
|
|
|
|
+ if and only if so. This will be used by an upper layer cathing the
|
|
|
|
+ exception to distinguish severity of the event.
|
|
|
|
+
|
|
|
|
+ "'''
|
|
def __init__(self, reason, partial_read):
|
|
def __init__(self, reason, partial_read):
|
|
self.partial_read = partial_read
|
|
self.partial_read = partial_read
|
|
self.__reason = reason
|
|
self.__reason = reason
|
|
@@ -328,8 +337,12 @@ class MsgQ:
|
|
|
|
|
|
def kill_socket(self, fd, sock):
|
|
def kill_socket(self, fd, sock):
|
|
"""Fully close down the socket."""
|
|
"""Fully close down the socket."""
|
|
|
|
+ # Unregister events on the socket. Note that we don't have to do
|
|
|
|
+ # this for kqueue because the registered events are automatically
|
|
|
|
+ # deleted when the corresponding socket is closed.
|
|
if self.poller:
|
|
if self.poller:
|
|
self.poller.unregister(sock)
|
|
self.poller.unregister(sock)
|
|
|
|
+
|
|
self.subs.unsubscribe_all(sock)
|
|
self.subs.unsubscribe_all(sock)
|
|
lname = [ k for k, v in self.lnames.items() if v == sock ][0]
|
|
lname = [ k for k, v in self.lnames.items() if v == sock ][0]
|
|
del self.lnames[lname]
|
|
del self.lnames[lname]
|
|
@@ -341,11 +354,20 @@ class MsgQ:
|
|
|
|
|
|
def __getbytes(self, fd, sock, length, continued):
|
|
def __getbytes(self, fd, sock, length, continued):
|
|
"""Get exactly the requested bytes, or raise an exception if
|
|
"""Get exactly the requested bytes, or raise an exception if
|
|
- EOF."""
|
|
+ EOF.
|
|
|
|
+
|
|
|
|
+ continued is set to True if this method is called to complete
|
|
|
|
+ already read data.
|
|
|
|
+ """
|
|
received = b''
|
|
received = b''
|
|
while len(received) < length:
|
|
while len(received) < length:
|
|
try:
|
|
try:
|
|
data = sock.recv(length - len(received))
|
|
data = sock.recv(length - len(received))
|
|
|
|
+
|
|
|
|
+ # If the remote client has closed the socket there seems to be
|
|
|
|
+ # two possible cases: getting ECONNRESET or receiving empty data.
|
|
|
|
+ # These cases are possible in normal operation, so we report them
|
|
|
|
+ # using MsgQCloseOnReceive.
|
|
except socket.error as err:
|
|
except socket.error as err:
|
|
if err.errno == errno.ECONNRESET:
|
|
if err.errno == errno.ECONNRESET:
|
|
raise MsgQCloseOnReceive(str(err), continued)
|
|
raise MsgQCloseOnReceive(str(err), continued)
|
|
@@ -382,6 +404,10 @@ class MsgQ:
|
|
try:
|
|
try:
|
|
routing, data = self.read_packet(fd, sock)
|
|
routing, data = self.read_packet(fd, sock)
|
|
except (MsgQReceiveError, MsgQCloseOnReceive) as err:
|
|
except (MsgQReceiveError, MsgQCloseOnReceive) as err:
|
|
|
|
+ # If it's MsgQCloseOnReceive and that happens without reading
|
|
|
|
+ # any data, it basically means the remote clinet has closed the
|
|
|
|
+ # socket, so we log it as debug information. Otherwise, it's
|
|
|
|
+ # a somewhat unexpected event, so we consider it an "error".
|
|
if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
|
|
if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
|
|
logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
|
|
logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
|
|
else:
|
|
else:
|