Browse Source

Send buffered data

Michal 'vorner' Vaner 14 years ago
parent
commit
43adf3a50a
1 changed files with 47 additions and 6 deletions
  1. 47 6
      src/bin/msgq/msgq.py.in

+ 47 - 6
src/bin/msgq/msgq.py.in

@@ -136,9 +136,9 @@ class MsgQ:
         except AttributeError:
             self.kqueue = select.kqueue()
     
-    def add_kqueue_socket(self, socket):
+    def add_kqueue_socket(self, socket, add_filter = 0):
         event = select.kevent(socket.fileno(),
-                              select.KQ_FILTER_READ,
+                              select.KQ_FILTER_READ | add_filter,
                               select.KQ_EV_ADD | select.KQ_EV_ENABLE)
         self.kqueue.control([event], 0)
 
@@ -325,7 +325,13 @@ class MsgQ:
         if fileno in self.sendbuffs:
             amount_sent = 0
         else:
-            amount_sent = sock.send(msg)
+            try:
+                amount_sent = sock.send(msg, socket.MSG_DONTWAIT)
+            except socket.error as e:
+                if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
+                    amount_sent = 0
+                else:
+                    raise e
 
         # Still something to send
         if amount_sent < len(msg):
@@ -340,8 +346,38 @@ class MsgQ:
             else:
                 buff = msg
                 last_sent = now
-                # TODO Register
-            self.sendbuffs[fd] = (last_sent, buff)
+                if self.poller:
+                    try:
+                        self.poller.register(fileno, select.POLLIN |
+                            select.POLLOUT)
+                    except Exception as e:
+                        raise e
+                else:
+                    self.add_kqueue_socket(fileno, select.KQ_FILTER_WRITE)
+            self.sendbuffs[fileno] = (last_sent, buff)
+
+    def process_write(self, fileno):
+        # Try to send some data from the buffer
+        (_, msg) = self.sendbuffs[fileno]
+        sock = self.sockets[fileno]
+        try:
+            amount_sent = sock.send(msg, socket.MSG_DONTWAIT)
+        except socket.error as e:
+            if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
+                amount_sent = 0
+            else:
+                raise e
+        # Keep the rest
+        msg = msg[amount_sent:]
+        if len(msg) == 0:
+            # If there's no more, stop requesting for write availability
+            if self.poller:
+                self.poller.register(fileno, select.POLLIN)
+            else:
+                self.add_kqueue_socket(fileno)
+            del self.sendbuffs[fileno]
+        else:
+            self.sendbuffs[fileno] = (time.clock(), msg)
 
     def newlname(self):
         """Generate a unique connection identifier for this socket.
@@ -415,7 +451,10 @@ class MsgQ:
                 if fd == self.listen_socket.fileno():
                     self.process_accept()
                 else:
-                    self.process_socket(fd)
+                    if event & select.POLLOUT:
+                        self.process_write(fd)
+                    if event & select.POLLIN:
+                        self.process_socket(fd)
 
     def run_kqueue(self):
         while True:
@@ -427,6 +466,8 @@ class MsgQ:
                 if event.ident == self.listen_socket.fileno():
                     self.process_accept()
                 else:
+                    if event.flags & select.KQ_FILTER_WRITE:
+                        self.process_socket(event.ident)
                     if event.flags & select.KQ_FILTER_READ and event.data > 0:
                         self.process_socket(event.ident)
                     elif event.flags & select.KQ_EV_EOF: