Browse Source

Buffer management

Also, fix a small bug that could lead to resource leak (but probably
wouldn't)
Michal 'vorner' Vaner 14 years ago
parent
commit
7a8bfbdd24
2 changed files with 29 additions and 5 deletions
  1. 28 4
      src/bin/msgq/msgq.py.in
  2. 1 1
      src/bin/msgq/tests/msgq_test.py

+ 28 - 4
src/bin/msgq/msgq.py.in

@@ -127,6 +127,7 @@ class MsgQ:
         self.hostname = socket.gethostname()
         self.subs = SubscriptionManager()
         self.lnames = {}
+        self.sendbuffs = {}
 
     def setup_poller(self):
         """Set up the poll thing.  Internal function."""
@@ -204,10 +205,10 @@ class MsgQ:
 
     def process_socket(self, fd):
         """Process a read on a socket."""
-        sock = self.sockets[fd]
-        if sock == None:
+        if not fd in self.sockets:
             sys.stderr.write("[b10-msgq] Got read on Strange Socket fd %d\n" % fd)
             return
+        sock = self.sockets[fd]
 #        sys.stderr.write("[b10-msgq] Got read on fd %d\n" %fd)
         self.process_packet(fd, sock)
 
@@ -219,7 +220,9 @@ class MsgQ:
         lname = [ k for k, v in self.lnames.items() if v == sock ][0]
         del self.lnames[lname]
         sock.close()
-        self.sockets[fd] = None
+        del self.sockets[fd]
+        if fd in self.sendbuffs:
+            del self.sendbuffs[fd]
         sys.stderr.write("[b10-msgq] Closing socket fd %d\n" % fd)
 
     def getbytes(self, fd, sock, length):
@@ -317,7 +320,28 @@ class MsgQ:
         sock.send(self.preparemsg(env, msg))
 
     def send_prepared_msg(self, sock, msg):
-        sock.send(msg)
+        # Try to send the data, but only if there's nothing waiting
+        fileno = sock.fileno()
+        if fileno in self.sendbuffs:
+            amount_sent = 0
+        else:
+            amount_sent = sock.send(msg)
+
+        # Still something to send
+        if amount_sent < len(msg):
+            now = time.clock()
+            # Append it to buffer (but check the data go away)
+            if fileno in self.sendbuffs:
+                (last_sent, buff) = self.sendbuffs[fileno]
+                if now - last_sent > 0.1:
+                    self.kill_socket(fileno, sock)
+                    return
+                buff += msg[amount_sent:]
+            else:
+                buff = msg
+                last_sent = now
+                # TODO Register
+            self.sendbuffs[fd] = (last_sent, buff)
 
     def newlname(self):
         """Generate a unique connection identifier for this socket.

+ 1 - 1
src/bin/msgq/tests/msgq_test.py

@@ -194,7 +194,7 @@ class SendNonblock(unittest.TestCase):
             else:
                 msg = msgq.preparemsg({"type" : "ping"}, {"data" : data})
                 for i in range(1, count):
-                    out.send(msg)
+                    out.sendall(msg)
                     amount = 0
                     while amount < length:
                         amount += len(out.recv(1024 + length - amount))