|
@@ -182,8 +182,6 @@ class MsgQ:
|
|
|
self.socket_file = socket_file
|
|
|
|
|
|
self.verbose = verbose
|
|
|
- self.poller = None
|
|
|
- self.kqueue = None
|
|
|
self.runnable = False
|
|
|
self.listen_socket = False
|
|
|
self.sockets = {}
|
|
@@ -264,37 +262,6 @@ class MsgQ:
|
|
|
self.__cfgmgr_ready_cond.wait()
|
|
|
return self.__cfgmgr_ready
|
|
|
|
|
|
- def setup_poller(self):
|
|
|
- """Set up the poll thing. Internal function."""
|
|
|
- try:
|
|
|
- self.kqueue = select.kqueue()
|
|
|
- except AttributeError:
|
|
|
- self.poller = select.poll()
|
|
|
-
|
|
|
- def add_kqueue_socket(self, socket, write_filter=False):
|
|
|
- """Add a kqueue filter for a socket. By default the read
|
|
|
- filter is used; if write_filter is set to True, the write
|
|
|
- filter is used. We use a boolean value instead of a specific
|
|
|
- filter constant, because kqueue filter values do not seem to
|
|
|
- be defined on some systems. The use of boolean makes the
|
|
|
- interface restrictive because there are other filters, but this
|
|
|
- method is mostly only for our internal use, so it should be
|
|
|
- acceptable at least for now."""
|
|
|
- filter_type = select.KQ_FILTER_WRITE if write_filter else \
|
|
|
- select.KQ_FILTER_READ
|
|
|
- event = select.kevent(socket.fileno(), filter_type,
|
|
|
- select.KQ_EV_ADD | select.KQ_EV_ENABLE)
|
|
|
- self.kqueue.control([event], 0)
|
|
|
-
|
|
|
- def delete_kqueue_socket(self, socket, write_filter=False):
|
|
|
- """Delete a kqueue filter for socket. See add_kqueue_socket()
|
|
|
- for the semantics and notes about write_filter."""
|
|
|
- filter_type = select.KQ_FILTER_WRITE if write_filter else \
|
|
|
- select.KQ_FILTER_READ
|
|
|
- event = select.kevent(socket.fileno(), filter_type,
|
|
|
- select.KQ_EV_DELETE)
|
|
|
- self.kqueue.control([event], 0)
|
|
|
-
|
|
|
def setup_listener(self):
|
|
|
"""Set up the listener socket. Internal function."""
|
|
|
logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)
|
|
@@ -315,11 +282,6 @@ class MsgQ:
|
|
|
logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
|
|
|
raise e
|
|
|
|
|
|
- if self.poller:
|
|
|
- self.poller.register(self.listen_socket, select.POLLIN)
|
|
|
- else:
|
|
|
- self.add_kqueue_socket(self.listen_socket)
|
|
|
-
|
|
|
def setup_signalsock(self):
|
|
|
"""Create a socket pair used to signal when we want to finish.
|
|
|
Using a socket is easy and thread/signal safe way to signal
|
|
@@ -329,18 +291,12 @@ class MsgQ:
|
|
|
# closed, we should shut down.
|
|
|
(self.__poller_sock, self.__control_sock) = socket.socketpair()
|
|
|
|
|
|
- if self.poller:
|
|
|
- self.poller.register(self.__poller_sock, select.POLLIN)
|
|
|
- else:
|
|
|
- self.add_kqueue_socket(self.__poller_sock)
|
|
|
-
|
|
|
def setup(self):
|
|
|
"""Configure listener socket, polling, etc.
|
|
|
Raises a socket.error if the socket_file cannot be
|
|
|
created.
|
|
|
"""
|
|
|
|
|
|
- self.setup_poller()
|
|
|
self.setup_signalsock()
|
|
|
self.setup_listener()
|
|
|
|
|
@@ -369,20 +325,10 @@ class MsgQ:
|
|
|
logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
|
|
|
lname)
|
|
|
|
|
|
- if self.poller:
|
|
|
- self.poller.register(newsocket, select.POLLIN)
|
|
|
- else:
|
|
|
- self.add_kqueue_socket(newsocket)
|
|
|
-
|
|
|
self.members_notify('connected', {'client': lname})
|
|
|
|
|
|
def kill_socket(self, fd, sock):
|
|
|
"""Fully close down the socket."""
|
|
|
- # Unregister events on the socket. Note that we don't have to do
|
|
|
- # this for kqueue because the registered events are automatically
|
|
|
- # deleted when the corresponding socket is closed.
|
|
|
- if self.poller:
|
|
|
- self.poller.unregister(sock)
|
|
|
|
|
|
unsubscribed_from = self.subs.unsubscribe_all(sock)
|
|
|
lname = self.fd_to_lname[fd]
|
|
@@ -584,11 +530,6 @@ class MsgQ:
|
|
|
else:
|
|
|
buff = msg[amount_sent:]
|
|
|
last_sent = now
|
|
|
- if self.poller:
|
|
|
- self.poller.register(fileno, select.POLLIN |
|
|
|
- select.POLLOUT)
|
|
|
- else:
|
|
|
- self.add_kqueue_socket(sock, True)
|
|
|
self.sendbuffs[fileno] = (last_sent, buff)
|
|
|
return True
|
|
|
|
|
@@ -602,10 +543,6 @@ class MsgQ:
|
|
|
msg = msg[amount_sent:]
|
|
|
if len(msg) == 0:
|
|
|
# If there's no more, stop requesting for write availability
|
|
|
- if self.poller:
|
|
|
- self.poller.register(fileno, select.POLLIN)
|
|
|
- else:
|
|
|
- self.delete_kqueue_socket(sock, True)
|
|
|
del self.sendbuffs[fileno]
|
|
|
else:
|
|
|
self.sendbuffs[fileno] = (time.clock(), msg)
|
|
@@ -753,89 +690,9 @@ class MsgQ:
|
|
|
self.running = False
|
|
|
break
|
|
|
else:
|
|
|
- self._process_fd(fd, False, True, False)
|
|
|
+ self.process_packet(fd, self.sockets[fd])
|
|
|
for fd in write_ready:
|
|
|
- self._process_fd(fd, True, False, False)
|
|
|
-
|
|
|
- def run_poller(self):
|
|
|
- while self.running:
|
|
|
- try:
|
|
|
- # Poll with a timeout so that every once in a while,
|
|
|
- # the loop checks for self.running.
|
|
|
- events = self.poller.poll()
|
|
|
- except select.error as err:
|
|
|
- if err.args[0] == errno.EINTR:
|
|
|
- events = []
|
|
|
- else:
|
|
|
- logger.fatal(MSGQ_POLL_ERROR, err)
|
|
|
- break
|
|
|
- with self.__lock:
|
|
|
- for (fd, event) in events:
|
|
|
- if fd == self.listen_socket.fileno():
|
|
|
- self.process_accept()
|
|
|
- elif fd == self.__poller_sock.fileno():
|
|
|
- # If it's the signal socket, we should terminate now.
|
|
|
- self.running = False
|
|
|
- break
|
|
|
- else:
|
|
|
- writable = event & select.POLLOUT
|
|
|
- # Note: it may be okay to read data if available
|
|
|
- # immediately after write some, but due to unexpected
|
|
|
- # regression (see comments on the kqueue version below)
|
|
|
- # we restrict one operation per iteration for now.
|
|
|
- # In future we may clarify the point and enable the
|
|
|
- # "read/write" mode.
|
|
|
- readable = not writable and (event & select.POLLIN)
|
|
|
- if not writable and not readable:
|
|
|
- logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
|
|
|
- self._process_fd(fd, writable, readable, False)
|
|
|
-
|
|
|
- def run_kqueue(self):
|
|
|
- while self.running:
|
|
|
- # Check with a timeout so that every once in a while,
|
|
|
- # the loop checks for self.running.
|
|
|
- events = self.kqueue.control(None, 10)
|
|
|
- if not events:
|
|
|
- raise RuntimeError('serve: kqueue returned no events')
|
|
|
-
|
|
|
- with self.__lock:
|
|
|
- for event in events:
|
|
|
- if event.ident == self.listen_socket.fileno():
|
|
|
- self.process_accept()
|
|
|
- elif event.ident == self.__poller_sock.fileno():
|
|
|
- # If it's the signal socket, we should terminate now.
|
|
|
- self.running = False
|
|
|
- break;
|
|
|
- else:
|
|
|
- fd = event.ident
|
|
|
- writable = event.filter == select.KQ_FILTER_WRITE
|
|
|
- readable = (event.filter == select.KQ_FILTER_READ and
|
|
|
- event.data > 0)
|
|
|
- # It seems to break some of our test cases if we
|
|
|
- # immediately close the socket on EOF after reading
|
|
|
- # some data. It may be possible to avoid by tweaking
|
|
|
- # the test, but unless we can be sure we'll hold off.
|
|
|
- closed = (not readable and
|
|
|
- (event.flags & select.KQ_EV_EOF))
|
|
|
- self._process_fd(fd, writable, readable, closed)
|
|
|
-
|
|
|
- def _process_fd(self, fd, writable, readable, closed):
|
|
|
- '''Process a single FD: unified subroutine of run_kqueue/poller.
|
|
|
-
|
|
|
- closed can be True only in the case of kqueue. This is essentially
|
|
|
- private but is defined as if it were "protected" so it's callable
|
|
|
- from tests.
|
|
|
-
|
|
|
- '''
|
|
|
- # We need to check if FD is still in the sockets dict, because
|
|
|
- # it's possible that the socket has been "killed" while processing
|
|
|
- # other FDs; it's even possible it's killed within this method.
|
|
|
- if writable and fd in self.sockets:
|
|
|
- self.__process_write(fd)
|
|
|
- if readable and fd in self.sockets:
|
|
|
- self.process_packet(fd, self.sockets[fd])
|
|
|
- if closed and fd in self.sockets:
|
|
|
- self.kill_socket(fd, self.sockets[fd])
|
|
|
+ self.__process_write(fd)
|
|
|
|
|
|
def stop(self):
|
|
|
# Signal it should terminate.
|