|
@@ -81,6 +81,7 @@ class MsgQ:
|
|
|
self.verbose = True
|
|
|
self.c_channel_port = c_channel_port
|
|
|
self.poller = None
|
|
|
+ self.kqueue = None
|
|
|
self.runnable = False
|
|
|
self.listen_socket = False
|
|
|
self.sockets = {}
|
|
@@ -91,7 +92,17 @@ class MsgQ:
|
|
|
|
|
|
def setup_poller(self):
|
|
|
"""Set up the poll thing. Internal function."""
|
|
|
- self.poller = select.poll()
|
|
|
+ try:
|
|
|
+ self.poller = select.poll()
|
|
|
+ except AttributeError:
|
|
|
+ self.kqueue = select.kqueue()
|
|
|
+
|
|
|
+ def add_kqueue_socket(self, socket):
|
|
|
+ event = select.kevent(socket.fileno(),
|
|
|
+ select.KQ_FILTER_READ,
|
|
|
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
|
|
|
+ self.kqueue.control([event], 0)
|
|
|
+
|
|
|
|
|
|
def setup_listener(self):
|
|
|
"""Set up the listener socket. Internal function."""
|
|
@@ -100,7 +111,10 @@ class MsgQ:
|
|
|
self.listen_socket.bind(("127.0.0.1", self.c_channel_port))
|
|
|
self.listen_socket.listen(1024)
|
|
|
|
|
|
- self.poller.register(self.listen_socket, select.POLLIN)
|
|
|
+ if self.poller:
|
|
|
+ self.poller.register(self.listen_socket, select.POLLIN)
|
|
|
+ else:
|
|
|
+ self.add_kqueue_socket(self.listen_socket)
|
|
|
|
|
|
def setup(self):
|
|
|
"""Configure listener socket, polling, etc."""
|
|
@@ -120,7 +134,11 @@ class MsgQ:
|
|
|
self.sockets[newsocket.fileno()] = newsocket
|
|
|
lname = self.newlname()
|
|
|
self.lnames[lname] = newsocket
|
|
|
- self.poller.register(newsocket, select.POLLIN)
|
|
|
+
|
|
|
+ if self.poller:
|
|
|
+ self.poller.register(newsocket, select.POLLIN)
|
|
|
+ else:
|
|
|
+ self.add_kqueue_socket(newsocket)
|
|
|
|
|
|
def process_socket(self, fd):
|
|
|
"""Process a read on a socket."""
|
|
@@ -133,7 +151,8 @@ class MsgQ:
|
|
|
|
|
|
def kill_socket(self, fd, sock):
|
|
|
"""Fully close down the socket."""
|
|
|
- self.poller.unregister(sock)
|
|
|
+ if self.poller:
|
|
|
+ self.poller.unregister(sock)
|
|
|
self.subs.unsubscribe_all(sock)
|
|
|
lname = [ k for k, v in self.lnames.items() if v == sock ][0]
|
|
|
del self.lnames[lname]
|
|
@@ -280,6 +299,13 @@ class MsgQ:
|
|
|
|
|
|
def run(self):
|
|
|
"""Process messages. Forever. Mostly."""
|
|
|
+
|
|
|
+ if self.poller:
|
|
|
+ self.run_poller()
|
|
|
+ else:
|
|
|
+ self.run_kqueue()
|
|
|
+
|
|
|
+ def run_poller(self):
|
|
|
while True:
|
|
|
try:
|
|
|
events = self.poller.poll()
|
|
@@ -295,6 +321,21 @@ class MsgQ:
|
|
|
else:
|
|
|
self.process_socket(fd)
|
|
|
|
|
|
+ def run_kqueue(self):
|
|
|
+ while True:
|
|
|
+ events = self.kqueue.control(None, 10)
|
|
|
+ if not events:
|
|
|
+ raise RuntimeError('serve: kqueue returned no events')
|
|
|
+
|
|
|
+ for event in events:
|
|
|
+ if event.ident == self.listen_socket.fileno():
|
|
|
+ self.process_accept()
|
|
|
+ else:
|
|
|
+ if event.flags & select.KQ_FILTER_READ and event.data > 0:
|
|
|
+ self.process_socket(event.ident)
|
|
|
+ elif event.flags & select.KQ_EV_EOF:
|
|
|
+ self.kill_socket(event.ident, self.sockets[event.ident])
|
|
|
+
|
|
|
def shutdown(self):
|
|
|
"""Stop the MsgQ master."""
|
|
|
if self.verbose:
|