|
@@ -136,14 +136,29 @@ class MsgQ:
|
|
|
except AttributeError:
|
|
|
self.kqueue = select.kqueue()
|
|
|
|
|
|
- def add_kqueue_socket(self, socket, enable_write = False):
|
|
|
- filters = select.KQ_FILTER_READ
|
|
|
- if enable_write:
|
|
|
- filters |= select.KQ_FILTER_WRITE
|
|
|
- event = select.kevent(socket.fileno(), filters,
|
|
|
+ def add_kqueue_socket(self, socket, write_filter=False):
|
|
|
+ """Add a kquque filter for a socket. By default the read
|
|
|
+ filter is used; if write_filter is set to True, the write
|
|
|
+ filter is used. We use a boolean value instead of a specific
|
|
|
+ filter constant, because kqueue filter values do not seem to
|
|
|
+ be defined on some systems. The use of boolean makes the
|
|
|
+ interface restrictive because there are other filters, but this
|
|
|
+ method is mostly only for our internal use, so it should be
|
|
|
+ acceptable at least for now."""
|
|
|
+ filter_type = select.KQ_FILTER_WRITE if write_filter else \
|
|
|
+ select.KQ_FILTER_READ
|
|
|
+ event = select.kevent(socket.fileno(), filter_type,
|
|
|
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
|
|
|
self.kqueue.control([event], 0)
|
|
|
|
|
|
+ def delete_kqueue_socket(self, socket, write_filter=False):
|
|
|
+ """Delete a kqueue filter for socket. See add_kqueue_socket()
|
|
|
+ for the semantics and notes about write_filter."""
|
|
|
+ filter_type = select.KQ_FILTER_WRITE if write_filter else \
|
|
|
+ select.KQ_FILTER_READ
|
|
|
+ event = select.kevent(socket.fileno(), filter_type,
|
|
|
+ select.KQ_EV_DELETE)
|
|
|
+ self.kqueue.control([event], 0)
|
|
|
|
|
|
def setup_listener(self):
|
|
|
"""Set up the listener socket. Internal function."""
|
|
@@ -376,7 +391,7 @@ class MsgQ:
|
|
|
if self.poller:
|
|
|
self.poller.register(fileno, select.POLLIN)
|
|
|
else:
|
|
|
- self.add_kqueue_socket(sock)
|
|
|
+ self.delete_kqueue_socket(sock, True)
|
|
|
del self.sendbuffs[fileno]
|
|
|
else:
|
|
|
self.sendbuffs[fileno] = (time.clock(), msg)
|
|
@@ -468,12 +483,14 @@ class MsgQ:
|
|
|
if event.ident == self.listen_socket.fileno():
|
|
|
self.process_accept()
|
|
|
else:
|
|
|
- if event.flags & select.KQ_FILTER_WRITE:
|
|
|
- self.process_socket(event.ident)
|
|
|
- if event.flags & select.KQ_FILTER_READ and event.data > 0:
|
|
|
+ if event.filter == select.KQ_FILTER_WRITE:
|
|
|
+ self.__process_write(event.ident)
|
|
|
+ if event.filter == select.KQ_FILTER_READ and \
|
|
|
+ event.data > 0:
|
|
|
self.process_socket(event.ident)
|
|
|
elif event.flags & select.KQ_EV_EOF:
|
|
|
- self.kill_socket(event.ident, self.sockets[event.ident])
|
|
|
+ self.kill_socket(event.ident,
|
|
|
+ self.sockets[event.ident])
|
|
|
|
|
|
def shutdown(self):
|
|
|
"""Stop the MsgQ master."""
|