|
@@ -70,6 +70,23 @@ SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
|
|
|
|
|
|
class MsgQReceiveError(Exception): pass
|
|
|
|
|
|
+class MsgQCloseOnReceive(Exception):
|
|
|
+ """Exception raised when reading data from a socket results in 'shutdown'.
|
|
|
+
|
|
|
+ This happens when msgq received 0-length data. This class holds whether
|
|
|
+    it happens in the middle of reading (i.e. after reading some) via the
|
|
|
+ partial_read parameter, which is set to True if and only if so.
|
|
|
+ This will be used by an upper layer catching the exception to distinguish
|
|
|
+ the severity of the event.
|
|
|
+
|
|
|
+ """
|
|
|
+ def __init__(self, reason, partial_read):
|
|
|
+ self.partial_read = partial_read
|
|
|
+ self.__reason = reason
|
|
|
+
|
|
|
+ def __str__(self):
|
|
|
+ return self.__reason
|
|
|
+
|
|
|
class SubscriptionManager:
|
|
|
def __init__(self, cfgmgr_ready):
|
|
|
"""
|
|
@@ -311,23 +328,22 @@ class MsgQ:
|
|
|
lname = self.newlname()
|
|
|
self.lnames[lname] = newsocket
|
|
|
|
|
|
+ logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
|
|
|
+ lname)
|
|
|
+
|
|
|
if self.poller:
|
|
|
self.poller.register(newsocket, select.POLLIN)
|
|
|
else:
|
|
|
self.add_kqueue_socket(newsocket)
|
|
|
|
|
|
- def process_socket(self, fd):
|
|
|
- """Process a read on a socket."""
|
|
|
- if not fd in self.sockets:
|
|
|
- logger.error(MSGQ_READ_UNKNOWN_FD, fd)
|
|
|
- return
|
|
|
- sock = self.sockets[fd]
|
|
|
- self.process_packet(fd, sock)
|
|
|
-
|
|
|
def kill_socket(self, fd, sock):
|
|
|
"""Fully close down the socket."""
|
|
|
+ # Unregister events on the socket. Note that we don't have to do
|
|
|
+ # this for kqueue because the registered events are automatically
|
|
|
+ # deleted when the corresponding socket is closed.
|
|
|
if self.poller:
|
|
|
self.poller.unregister(sock)
|
|
|
+
|
|
|
self.subs.unsubscribe_all(sock)
|
|
|
lname = [ k for k, v in self.lnames.items() if v == sock ][0]
|
|
|
del self.lnames[lname]
|
|
@@ -337,24 +353,35 @@ class MsgQ:
|
|
|
del self.sendbuffs[fd]
|
|
|
logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
|
|
|
|
|
|
- def getbytes(self, fd, sock, length):
|
|
|
+ def __getbytes(self, fd, sock, length, continued):
|
|
|
"""Get exactly the requested bytes, or raise an exception if
|
|
|
- EOF."""
|
|
|
+ EOF.
|
|
|
+
|
|
|
+ continued is set to True if this method is called to complete
|
|
|
+ already read data.
|
|
|
+ """
|
|
|
received = b''
|
|
|
while len(received) < length:
|
|
|
try:
|
|
|
data = sock.recv(length - len(received))
|
|
|
- except socket.error:
|
|
|
- raise MsgQReceiveError(socket.error)
|
|
|
+
|
|
|
+ except socket.error as err:
|
|
|
+ # This case includes ECONNRESET, which seems to happen when
|
|
|
+ # the remote client has closed its socket at some subtle
|
|
|
+ # timing (it should normally result in receiving empty data).
|
|
|
+ # Since we didn't figure out how exactly that could happen,
|
|
|
+ # we treat it just like other really-unexpected socket errors.
|
|
|
+ raise MsgQReceiveError(str(err))
|
|
|
if len(data) == 0:
|
|
|
- raise MsgQReceiveError("EOF")
|
|
|
+ raise MsgQCloseOnReceive("EOF", continued)
|
|
|
received += data
|
|
|
+ continued = True
|
|
|
return received
|
|
|
|
|
|
def read_packet(self, fd, sock):
|
|
|
"""Read a correctly formatted packet. Will raise exceptions if
|
|
|
something fails."""
|
|
|
- lengths = self.getbytes(fd, sock, 6)
|
|
|
+ lengths = self.__getbytes(fd, sock, 6, False)
|
|
|
overall_length, routing_length = struct.unpack(">IH", lengths)
|
|
|
if overall_length < 2:
|
|
|
raise MsgQReceiveError("overall_length < 2")
|
|
@@ -365,9 +392,9 @@ class MsgQ:
|
|
|
raise MsgQReceiveError("routing_length == 0")
|
|
|
data_length = overall_length - routing_length
|
|
|
# probably need to sanity check lengths here...
|
|
|
- routing = self.getbytes(fd, sock, routing_length)
|
|
|
+ routing = self.__getbytes(fd, sock, routing_length, True)
|
|
|
if data_length > 0:
|
|
|
- data = self.getbytes(fd, sock, data_length)
|
|
|
+ data = self.__getbytes(fd, sock, data_length, True)
|
|
|
else:
|
|
|
data = None
|
|
|
return (routing, data)
|
|
@@ -376,8 +403,15 @@ class MsgQ:
|
|
|
"""Process one packet."""
|
|
|
try:
|
|
|
routing, data = self.read_packet(fd, sock)
|
|
|
- except MsgQReceiveError as err:
|
|
|
- logger.error(MSGQ_RECV_ERR, fd, err)
|
|
|
+ except (MsgQReceiveError, MsgQCloseOnReceive) as err:
|
|
|
+ # If it's MsgQCloseOnReceive and that happens without reading
|
|
|
+            # any data, it basically means the remote client has closed the
|
|
|
+ # socket, so we log it as debug information. Otherwise, it's
|
|
|
+ # a somewhat unexpected event, so we consider it an "error".
|
|
|
+ if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
|
|
|
+ logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
|
|
|
+ else:
|
|
|
+ logger.error(MSGQ_RECV_ERR, fd, err)
|
|
|
self.kill_socket(fd, sock)
|
|
|
return
|
|
|
|
|
@@ -428,9 +462,12 @@ class MsgQ:
|
|
|
def sendmsg(self, sock, env, msg = None):
|
|
|
self.send_prepared_msg(sock, self.preparemsg(env, msg))
|
|
|
|
|
|
- def __send_data(self, sock, data):
|
|
|
+ def _send_data(self, sock, data):
|
|
|
"""
|
|
|
- Send a piece of data to the given socket.
|
|
|
+ Send a piece of data to the given socket. This method is
|
|
|
+ essentially "private" to MsgQ, but defined as if it were "protected"
|
|
|
+ for easier access from tests.
|
|
|
+
|
|
|
Parameters:
|
|
|
sock: The socket to send to
|
|
|
data: The list of bytes to send
|
|
@@ -446,15 +483,17 @@ class MsgQ:
|
|
|
sock.setblocking(0)
|
|
|
return sock.send(data)
|
|
|
except socket.error as e:
|
|
|
- if e.errno in [ errno.EAGAIN,
|
|
|
- errno.EWOULDBLOCK,
|
|
|
- errno.EINTR ]:
|
|
|
+ if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
|
|
|
return 0
|
|
|
- elif e.errno in [ errno.EPIPE,
|
|
|
- errno.ECONNRESET,
|
|
|
- errno.ENOBUFS ]:
|
|
|
- logger.error(MSGQ_SEND_ERR, sock.fileno(),
|
|
|
- errno.errorcode[e.errno])
|
|
|
+ elif e.errno in [ errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS ]:
|
|
|
+ # EPIPE happens if the remote module has terminated by the time
|
|
|
+ # of this send; its severity can vary, but in many cases it
|
|
|
+ # shouldn't be critical, so we log it separately as a warning.
|
|
|
+ if e.errno == errno.EPIPE:
|
|
|
+ logger.warn(MSGQ_CLOSE_ON_SEND, sock.fileno())
|
|
|
+ else:
|
|
|
+ logger.error(MSGQ_SEND_ERR, sock.fileno(),
|
|
|
+ errno.errorcode[e.errno])
|
|
|
self.kill_socket(sock.fileno(), sock)
|
|
|
return None
|
|
|
else:
|
|
@@ -469,7 +508,7 @@ class MsgQ:
|
|
|
if fileno in self.sendbuffs:
|
|
|
amount_sent = 0
|
|
|
else:
|
|
|
- amount_sent = self.__send_data(sock, msg)
|
|
|
+ amount_sent = self._send_data(sock, msg)
|
|
|
if amount_sent is None:
|
|
|
# Socket has been killed, drop the send
|
|
|
return
|
|
@@ -489,7 +528,7 @@ class MsgQ:
|
|
|
last_sent = now
|
|
|
if self.poller:
|
|
|
self.poller.register(fileno, select.POLLIN |
|
|
|
- select.POLLOUT)
|
|
|
+ select.POLLOUT)
|
|
|
else:
|
|
|
self.add_kqueue_socket(sock, True)
|
|
|
self.sendbuffs[fileno] = (last_sent, buff)
|
|
@@ -498,7 +537,7 @@ class MsgQ:
|
|
|
# Try to send some data from the buffer
|
|
|
(_, msg) = self.sendbuffs[fileno]
|
|
|
sock = self.sockets[fileno]
|
|
|
- amount_sent = self.__send_data(sock, msg)
|
|
|
+ amount_sent = self._send_data(sock, msg)
|
|
|
if amount_sent is not None:
|
|
|
# Keep the rest
|
|
|
msg = msg[amount_sent:]
|
|
@@ -592,12 +631,17 @@ class MsgQ:
|
|
|
self.running = False
|
|
|
break
|
|
|
else:
|
|
|
- if event & select.POLLOUT:
|
|
|
- self.__process_write(fd)
|
|
|
- elif event & select.POLLIN:
|
|
|
- self.process_socket(fd)
|
|
|
- else:
|
|
|
+ writable = event & select.POLLOUT
|
|
|
+ # Note: it may be okay to read data if available
|
|
|
+                    # immediately after writing some, but due to an unexpected
|
|
|
+ # regression (see comments on the kqueue version below)
|
|
|
+ # we restrict one operation per iteration for now.
|
|
|
+ # In future we may clarify the point and enable the
|
|
|
+ # "read/write" mode.
|
|
|
+ readable = not writable and (event & select.POLLIN)
|
|
|
+ if not writable and not readable:
|
|
|
logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
|
|
|
+ self._process_fd(fd, writable, readable, False)
|
|
|
|
|
|
def run_kqueue(self):
|
|
|
while self.running:
|
|
@@ -616,14 +660,35 @@ class MsgQ:
|
|
|
self.running = False
|
|
|
break;
|
|
|
else:
|
|
|
- if event.filter == select.KQ_FILTER_WRITE:
|
|
|
- self.__process_write(event.ident)
|
|
|
- if event.filter == select.KQ_FILTER_READ and \
|
|
|
- event.data > 0:
|
|
|
- self.process_socket(event.ident)
|
|
|
- elif event.flags & select.KQ_EV_EOF:
|
|
|
- self.kill_socket(event.ident,
|
|
|
- self.sockets[event.ident])
|
|
|
+ fd = event.ident
|
|
|
+ writable = event.filter == select.KQ_FILTER_WRITE
|
|
|
+ readable = (event.filter == select.KQ_FILTER_READ and
|
|
|
+ event.data > 0)
|
|
|
+ # It seems to break some of our test cases if we
|
|
|
+ # immediately close the socket on EOF after reading
|
|
|
+                    # some data. It may be possible to avoid this by tweaking
|
|
|
+ # the test, but unless we can be sure we'll hold off.
|
|
|
+ closed = (not readable and
|
|
|
+ (event.flags & select.KQ_EV_EOF))
|
|
|
+ self._process_fd(fd, writable, readable, closed)
|
|
|
+
|
|
|
+ def _process_fd(self, fd, writable, readable, closed):
|
|
|
+ '''Process a single FD: unified subroutine of run_kqueue/poller.
|
|
|
+
|
|
|
+ closed can be True only in the case of kqueue. This is essentially
|
|
|
+ private but is defined as if it were "protected" so it's callable
|
|
|
+ from tests.
|
|
|
+
|
|
|
+ '''
|
|
|
+ # We need to check if FD is still in the sockets dict, because
|
|
|
+ # it's possible that the socket has been "killed" while processing
|
|
|
+ # other FDs; it's even possible it's killed within this method.
|
|
|
+ if writable and fd in self.sockets:
|
|
|
+ self.__process_write(fd)
|
|
|
+ if readable and fd in self.sockets:
|
|
|
+ self.process_packet(fd, self.sockets[fd])
|
|
|
+ if closed and fd in self.sockets:
|
|
|
+ self.kill_socket(fd, self.sockets[fd])
|
|
|
|
|
|
def stop(self):
|
|
|
# Signal it should terminate.
|
|
@@ -760,3 +825,5 @@ if __name__ == "__main__":
|
|
|
pass
|
|
|
|
|
|
msgq.shutdown()
|
|
|
+
|
|
|
+ logger.info(MSGQ_EXITING)
|