Browse Source

[2690] Use select in msgq to wait for sockets

Michal 'vorner' Vaner 11 years ago
parent
commit
7b94068163
2 changed files with 44 additions and 4 deletions
  1. 39 4
      src/bin/msgq/msgq.py.in
  2. 5 0
      src/bin/msgq/msgq_messages.mes

+ 39 - 4
src/bin/msgq/msgq.py.in

@@ -717,10 +717,45 @@ class MsgQ:
         """Process messages.  Forever.  Mostly."""
         self.running = True
 
-        if self.poller:
-            self.run_poller()
-        else:
-            self.run_kqueue()
+        self.run_select()
+
+    def run_select(self):
+        while self.running:
+            reads = list(self.fd_to_lname.keys())
+            if self.listen_socket.fileno() != -1: # Skip in tests
+                reads.append(self.listen_socket.fileno())
+            if self.__poller_sock.fileno() != -1:
+                reads.append(self.__poller_sock.fileno())
+            writes = list(self.sendbuffs.keys())
+            (read_ready, write_ready) = ([], [])
+            try:
+                (read_ready, write_ready, _) = select.select(reads, writes,
+                                                             []);
+            except select.error as err:
+                if err.args[0] == errno.EINTR:
+                    continue # Just try it again if interrupted.
+                else:
+                    logger.fatal(MSGQ_SELECT_ERROR, err)
+                    break
+            with self.__lock:
+                write_ready = set(write_ready)
+                for fd in read_ready:
+                    # Do only one operation per loop iteration on the given fd.
+                    # It could be possible to perform both, but it may have
+                    # undesired side effects in special situations (like, if the
+                    # read closes the socket).
+                    if fd in write_ready:
+                        write_ready.remove(fd)
+                    if fd == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif fd == self.__poller_sock.fileno():
+                        # The signal socket. We should terminate now.
+                        self.running = False
+                        break
+                    else:
+                        self._process_fd(fd, False, True, False)
+                for fd in write_ready:
+                    self._process_fd(fd, True, False, False)
 
     def run_poller(self):
         while self.running:

+ 5 - 0
src/bin/msgq/msgq_messages.mes

@@ -119,6 +119,11 @@ on shutdown unless there's really something unexpected.
 % MSGQ_RECV_HDR Received header: %1
 Debug message. This message includes the whole routing header of a packet.
 
+% MSGQ_SELECT_ERROR Error while waiting for events: %1
+A low-level error happened when waiting for events, the error is logged. The
+reason for this varies, but it usually means the system is short on some
+resources.
+
 % MSGQ_SEND_ERROR Error while sending to socket %1: %2
 There was a low-level error when sending data to a socket. The error is logged
 and the corresponding socket is dropped.