Browse Source

Merge branch 'master' into trac502-fix-author

chenzhengzhang 14 years ago
parent
commit
c30cd00cf1
2 changed files with 20 additions and 8 deletions
  1. 9 3
      src/bin/msgq/msgq.py.in
  2. 11 5
      src/bin/msgq/tests/msgq_test.py

+ 9 - 3
src/bin/msgq/msgq.py.in

@@ -323,12 +323,18 @@ class MsgQ:
 
 
     def __send_data(self, sock, data):
     def __send_data(self, sock, data):
         try:
         try:
-            return sock.send(data, socket.MSG_DONTWAIT)
+            # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
+            # on some OSes
+            sock.setblocking(0)
+            return sock.send(data)
         except socket.error as e:
         except socket.error as e:
             if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
             if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
                 return 0
                 return 0
             else:
             else:
                 raise e
                 raise e
+        finally:
+            # And set it back again
+            sock.setblocking(1)
 
 
     def send_prepared_msg(self, sock, msg):
     def send_prepared_msg(self, sock, msg):
         # Try to send the data, but only if there's nothing waiting
         # Try to send the data, but only if there's nothing waiting
@@ -355,7 +361,7 @@ class MsgQ:
                     self.poller.register(fileno, select.POLLIN |
                     self.poller.register(fileno, select.POLLIN |
                         select.POLLOUT)
                         select.POLLOUT)
                 else:
                 else:
-                    self.add_kqueue_socket(fileno, True)
+                    self.add_kqueue_socket(sock, True)
             self.sendbuffs[fileno] = (last_sent, buff)
             self.sendbuffs[fileno] = (last_sent, buff)
 
 
     def __process_write(self, fileno):
     def __process_write(self, fileno):
@@ -370,7 +376,7 @@ class MsgQ:
             if self.poller:
             if self.poller:
                 self.poller.register(fileno, select.POLLIN)
                 self.poller.register(fileno, select.POLLIN)
             else:
             else:
-                self.add_kqueue_socket(fileno)
+                self.add_kqueue_socket(sock)
             del self.sendbuffs[fileno]
             del self.sendbuffs[fileno]
         else:
         else:
             self.sendbuffs[fileno] = (time.clock(), msg)
             self.sendbuffs[fileno] = (time.clock(), msg)

+ 11 - 5
src/bin/msgq/tests/msgq_test.py

@@ -117,7 +117,7 @@ class SendNonblock(unittest.TestCase):
     Tests that the whole thing will not get blocked if someone does not read.
     Tests that the whole thing will not get blocked if someone does not read.
     """
     """
 
 
-    def terminate_check(self, task, timeout = 1):
+    def terminate_check(self, task, timeout = 10):
         """
         """
         Runs task in separate process (task is a function) and checks
         Runs task in separate process (task is a function) and checks
         it terminates sooner than timeout.
         it terminates sooner than timeout.
@@ -161,23 +161,28 @@ class SendNonblock(unittest.TestCase):
         Tries sending messages (and not reading them) until it either times
         Tries sending messages (and not reading them) until it either times
         out (in blocking call, wrong) or closes it (correct).
         out (in blocking call, wrong) or closes it (correct).
         """
         """
+        data = "data"
+        for i in range(1, 10):
+            data += data
         self.terminate_check(lambda: self.infinite_sender(
         self.terminate_check(lambda: self.infinite_sender(
-            lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : "x"})))
+            lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : data})))
 
 
     def test_infinite_sendprepared(self):
     def test_infinite_sendprepared(self):
         """
         """
         Tries sending data (and not reading them) until it either times
         Tries sending data (and not reading them) until it either times
         out (in blocking call, wrong) or closes it (correct).
         out (in blocking call, wrong) or closes it (correct).
         """
         """
+        data = b"data"
+        for i in range(1, 10):
+            data += data
         self.terminate_check(lambda: self.infinite_sender(
         self.terminate_check(lambda: self.infinite_sender(
-            lambda msgq, socket: msgq.send_prepared_msg(socket, b"data")))
+            lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
 
 
     def send_many(self, data):
     def send_many(self, data):
         """
         """
         Tries that sending a command many times and getting an answer works.
         Tries that sending a command many times and getting an answer works.
         """
         """
         msgq = MsgQ()
         msgq = MsgQ()
-        msgq.setup_poller()
         # msgq.run needs to compare with the listen_socket, so we provide
         # msgq.run needs to compare with the listen_socket, so we provide
         # a replacement
         # a replacement
         class DummySocket:
         class DummySocket:
@@ -189,7 +194,8 @@ class SendNonblock(unittest.TestCase):
             length = len(data)
             length = len(data)
             queue_pid = os.fork()
             queue_pid = os.fork()
             if queue_pid == 0:
             if queue_pid == 0:
-                signal.alarm(10)
+                signal.alarm(30)
+                msgq.setup_poller()
                 msgq.register_socket(queue)
                 msgq.register_socket(queue)
                 msgq.run()
                 msgq.run()
             else:
             else: