|
@@ -340,6 +340,17 @@ class MsgQ:
|
|
self.send_prepared_msg(sock, self.preparemsg(env, msg))
|
|
self.send_prepared_msg(sock, self.preparemsg(env, msg))
|
|
|
|
|
|
def __send_data(self, sock, data):
|
|
def __send_data(self, sock, data):
|
|
|
|
+ """
|
|
|
|
+ Send a piece of data to the given socket.
|
|
|
|
+ Parameters:
|
|
|
|
+ sock: The socket to send to
|
|
|
|
+ data: The list of bytes to send
|
|
|
|
+ Returns:
|
|
|
|
+ An integer or None. If an integer (which can be 0), it signals
|
|
|
|
+ the number of bytes sent. If None, the socket appears to have
|
|
|
|
+ been closed on the other end, and it has been killed on this
|
|
|
|
+ side too.
|
|
|
|
+ """
|
|
try:
|
|
try:
|
|
# We set the socket nonblocking, MSG_DONTWAIT doesn't exist
|
|
# We set the socket nonblocking, MSG_DONTWAIT doesn't exist
|
|
# on some OSes
|
|
# on some OSes
|
|
@@ -350,6 +361,13 @@ class MsgQ:
|
|
errno.EWOULDBLOCK,
|
|
errno.EWOULDBLOCK,
|
|
errno.EINTR ]:
|
|
errno.EINTR ]:
|
|
return 0
|
|
return 0
|
|
|
|
+ elif e.errno in [ errno.EPIPE,
|
|
|
|
+ errno.ECONNRESET,
|
|
|
|
+ errno.ENOBUFS ]:
|
|
|
|
+ print("[b10-msgq] " + errno.errorcode[e.errno] +
|
|
|
|
+ " on send, dropping message and closing connection")
|
|
|
|
+ self.kill_socket(sock.fileno(), sock)
|
|
|
|
+ return None
|
|
else:
|
|
else:
|
|
raise e
|
|
raise e
|
|
finally:
|
|
finally:
|
|
@@ -362,23 +380,12 @@ class MsgQ:
|
|
if fileno in self.sendbuffs:
|
|
if fileno in self.sendbuffs:
|
|
amount_sent = 0
|
|
amount_sent = 0
|
|
else:
|
|
else:
|
|
- try:
|
|
|
|
- amount_sent = self.__send_data(sock, msg)
|
|
|
|
- except socket.error as sockerr:
|
|
|
|
- # in the case the other side seems gone, or unable to handle
|
|
|
|
- # life, kill the socket and drop the send action
|
|
|
|
- if sockerr.errno in [ errno.EPIPE,
|
|
|
|
- errno.ECONNRESET,
|
|
|
|
- errno.ENOBUFS
|
|
|
|
- ]:
|
|
|
|
- print("[b10-msgq] " + errno.errorcode[sockerr.errno] +
|
|
|
|
- " on send, dropping message and closing connection")
|
|
|
|
- self.kill_socket(fileno, sock)
|
|
|
|
- return
|
|
|
|
- else:
|
|
|
|
- raise
|
|
|
|
|
|
+ amount_sent = self.__send_data(sock, msg)
|
|
|
|
+ if amount_sent is None:
|
|
|
|
+ # Socket has been killed, drop the send
|
|
|
|
+ return
|
|
|
|
|
|
- # Still something to send
|
|
|
|
|
|
+ # Still something to send, add it to outgoing queue
|
|
if amount_sent < len(msg):
|
|
if amount_sent < len(msg):
|
|
now = time.clock()
|
|
now = time.clock()
|
|
# Append it to buffer (but check the data go away)
|
|
# Append it to buffer (but check the data go away)
|
|
@@ -402,32 +409,19 @@ class MsgQ:
|
|
# Try to send some data from the buffer
|
|
# Try to send some data from the buffer
|
|
(_, msg) = self.sendbuffs[fileno]
|
|
(_, msg) = self.sendbuffs[fileno]
|
|
sock = self.sockets[fileno]
|
|
sock = self.sockets[fileno]
|
|
- try:
|
|
|
|
- amount_sent = self.__send_data(sock, msg)
|
|
|
|
- except socket.error as sockerr:
|
|
|
|
- # in the case the other side seems gone, or unable to handle
|
|
|
|
- # life, kill the socket and drop the send action
|
|
|
|
- if sockerr.errno in [ errno.EPIPE,
|
|
|
|
- errno.ECONNRESET,
|
|
|
|
- errno.ENOBUFS
|
|
|
|
- ]:
|
|
|
|
- print("[b10-msgq] " + errno.errorcode[sockerr.errno] +
|
|
|
|
- " on send, dropping message and closing connection")
|
|
|
|
- self.kill_socket(fileno, sock)
|
|
|
|
- return
|
|
|
|
- else:
|
|
|
|
- raise
|
|
|
|
- # Keep the rest
|
|
|
|
- msg = msg[amount_sent:]
|
|
|
|
- if len(msg) == 0:
|
|
|
|
- # If there's no more, stop requesting for write availability
|
|
|
|
- if self.poller:
|
|
|
|
- self.poller.register(fileno, select.POLLIN)
|
|
|
|
|
|
+ amount_sent = self.__send_data(sock, msg)
|
|
|
|
+ if amount_sent is not None:
|
|
|
|
+ # Keep the rest
|
|
|
|
+ msg = msg[amount_sent:]
|
|
|
|
+ if len(msg) == 0:
|
|
|
|
+ # If there's no more, stop requesting for write availability
|
|
|
|
+ if self.poller:
|
|
|
|
+ self.poller.register(fileno, select.POLLIN)
|
|
|
|
+ else:
|
|
|
|
+ self.delete_kqueue_socket(sock, True)
|
|
|
|
+ del self.sendbuffs[fileno]
|
|
else:
|
|
else:
|
|
- self.delete_kqueue_socket(sock, True)
|
|
|
|
- del self.sendbuffs[fileno]
|
|
|
|
- else:
|
|
|
|
- self.sendbuffs[fileno] = (time.clock(), msg)
|
|
|
|
|
|
+ self.sendbuffs[fileno] = (time.clock(), msg)
|
|
|
|
|
|
def newlname(self):
|
|
def newlname(self):
|
|
"""Generate a unique connection identifier for this socket.
|
|
"""Generate a unique connection identifier for this socket.
|