|
@@ -497,7 +497,7 @@ class MsgQ:
|
|
|
last_sent = now
|
|
|
if self.poller:
|
|
|
self.poller.register(fileno, select.POLLIN |
|
|
|
- select.POLLOUT)
|
|
|
+ select.POLLOUT)
|
|
|
else:
|
|
|
self.add_kqueue_socket(sock, True)
|
|
|
self.sendbuffs[fileno] = (last_sent, buff)
|
|
@@ -600,12 +600,11 @@ class MsgQ:
|
|
|
self.running = False
|
|
|
break
|
|
|
else:
|
|
|
- if event & select.POLLOUT:
|
|
|
- self.__process_write(fd)
|
|
|
- elif event & select.POLLIN:
|
|
|
- self.process_socket(fd)
|
|
|
- else:
|
|
|
+ writable = event & select.POLLOUT
|
|
|
+ readable = not writable and (event & select.POLLIN)
|
|
|
+ if not writable and not readable:
|
|
|
logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
|
|
|
+ self._process_fd(fd, writable, readable, False)
|
|
|
|
|
|
def run_kqueue(self):
|
|
|
while self.running:
|
|
@@ -624,14 +623,28 @@ class MsgQ:
|
|
|
self.running = False
|
|
|
break;
|
|
|
else:
|
|
|
- if event.filter == select.KQ_FILTER_WRITE:
|
|
|
- self.__process_write(event.ident)
|
|
|
- if event.filter == select.KQ_FILTER_READ and \
|
|
|
- event.data > 0:
|
|
|
- self.process_socket(event.ident)
|
|
|
- elif event.flags & select.KQ_EV_EOF:
|
|
|
- self.kill_socket(event.ident,
|
|
|
- self.sockets[event.ident])
|
|
|
+ fd = event.ident
|
|
|
+ writable = event.filter == select.KQ_FILTER_WRITE
|
|
|
+ readable = (event.filter == select.KQ_FILTER_READ and
|
|
|
+ event.data > 0)
|
|
|
+ closed = (not readable and
|
|
|
+ (event.flags & select.KQ_EV_EOF))
|
|
|
+ self._process_fd(fd, writable, readable, closed)
|
|
|
+
|
|
|
+ def _process_fd(self, fd, writable, readable, closed):
|
|
|
+ '''Process a single FD: unified subroutine of run_kqueue/poller.
|
|
|
+
|
|
|
+ closed can be True only in the case of kqueue. This is essentially
|
|
|
+ private but is defined as if it were "protected" so it's callable
|
|
|
+ from tests.
|
|
|
+
|
|
|
+ '''
|
|
|
+ if writable:
|
|
|
+ self.__process_write(fd)
|
|
|
+ if readable:
|
|
|
+ self.process_socket(fd)
|
|
|
+ if closed:
|
|
|
+ self.kill_socket(fd, self.sockets[fd])
|
|
|
|
|
|
def stop(self):
|
|
|
# Signal it should terminate.
|