Parcourir la source

[1924] Consider EPIPE and similar as no recipient

If an EPIPE is received in attempt to send a message, consider it
there's no recipient. This is still not perfect, since there may be data
in queue and before the socket is handled, it would be just appended.
Or, if the other socket is closed before the whole queue is sent there,
the messages are silently blackholed. But fully tracking each message
would be too much work for this ticket.
Michal 'vorner' Vaner il y a 12 ans
Parent
commit
f29a543f5e
3 fichiers modifiés avec 98 ajouts et 14 suppressions
  1. 17 6
      src/bin/msgq/msgq.py.in
  2. 80 7
      src/bin/msgq/tests/msgq_test.py
  3. 1 1
      src/lib/util/common_defs.cc

+ 17 - 6
src/bin/msgq/msgq.py.in

@@ -465,6 +465,15 @@ class MsgQ:
             sock.setblocking(1)
 
     def send_prepared_msg(self, sock, msg):
+        '''
+        Add a message to the queue. If there's nothing waiting, try
+        to send it right away.
+
+        Return if the socket is still alive. It can return false if the
+        socket dies (for example due to EPIPE in the attempt to send).
+        Returning true does not guarantee the message will be delivered,
+        but returning false means it won't.
+        '''
         # Try to send the data, but only if there's nothing waiting
         fileno = sock.fileno()
         if fileno in self.sendbuffs:
@@ -473,7 +482,7 @@ class MsgQ:
             amount_sent = self.__send_data(sock, msg)
             if amount_sent is None:
                 # Socket has been killed, drop the send
-                return
+                return False
 
         # Still something to send, add it to outgoing queue
         if amount_sent < len(msg):
@@ -483,7 +492,7 @@ class MsgQ:
                 (last_sent, buff) = self.sendbuffs[fileno]
                 if now - last_sent > 0.1:
                     self.kill_socket(fileno, sock)
-                    return
+                    return False
                 buff += msg
             else:
                 buff = msg[amount_sent:]
@@ -494,6 +503,7 @@ class MsgQ:
                 else:
                     self.add_kqueue_socket(sock, True)
             self.sendbuffs[fileno] = (last_sent, buff)
+        return True
 
     def __process_write(self, fileno):
         # Try to send some data from the buffer
@@ -549,10 +559,11 @@ class MsgQ:
             # Don't bounce to self
             sockets.remove(sock)
 
-        if sockets:
-            for socket in sockets:
-                self.send_prepared_msg(socket, msg)
-        elif routing.get(CC_HEADER_WANT_ANSWER) and \
+        has_recipient = False
+        for socket in sockets:
+            if self.send_prepared_msg(socket, msg):
+                has_recipient = True
+        if not has_recipient and routing.get(CC_HEADER_WANT_ANSWER) and \
             CC_HEADER_REPLY not in routing:
             # We have no recipients. But the sender insists on a reply
             # (and the message isn't a reply itself). We need to send

+ 80 - 7
src/bin/msgq/tests/msgq_test.py

@@ -168,14 +168,16 @@ class MsgQTest(unittest.TestCase):
         undeliverable notifications under the correct circumstances.
         """
         sent_messages = []
-        def fake_end_prepared_msg(socket, msg):
+        def fake_send_prepared_msg(socket, msg):
             sent_messages.append((socket, msg))
-        self.__msgq.send_prepared_msg = fake_end_prepared_msg
+            return True
+        self.__msgq.send_prepared_msg = fake_send_prepared_msg
         # These would be real sockets in the MsgQ, but we pass them as
         # parameters only, so we don't need them to be. We use simple
         # integers to tell one from another.
         sender = 1
         recipient = 2
+        another_recipiet = 3
         # The routing headers and data to test with.
         routing = {
             'to': '*',
@@ -255,6 +257,39 @@ class MsgQTest(unittest.TestCase):
         self.assertEqual(2, sent_messages[0][0]) # The recipient
         self.assertEqual((routing, data), self.parse_msg(sent_messages[0][1]))
         sent_messages = []
+        # If an attempt to send fails, consider it no recipient.
+        def fail_send_prepared_msg(socket, msg):
+            '''
+            Pretend sending a message failed. After one call, return to the
+            usual mock, so the errors or other messages can be sent.
+            '''
+            self.__msgq.send_prepared_msg = fake_send_prepared_msg
+        self.__msgq.send_prepared_msg = fail_send_prepared_msg
+        self.__msgq.process_command_send(sender, routing, data)
+        self.assertEqual(1, len(sent_messages))
+        self.assertEqual(1, sent_messages[0][0])
+        self.assertEqual(({
+                              'group': 'group',
+                              'instance': '*',
+                              'reply': 42,
+                              'seq': 42,
+                              'from': 'msgq',
+                              'to': 'sender',
+                              'want_answer': True
+                          }, {'result': [-1, "No such recipient"]}),
+                          self.parse_msg(sent_messages[0][1]))
+        sent_messages = []
+        # But if there are more recipients and only one fails, it should
+        # be delivered to the other and not considered an error
+        self.__msgq.send_prepared_msg = fail_send_prepared_msg
+        routing["to"] = '*'
+        self.__msgq.subs.find = lambda group, instance: [recipient,
+                                                         another_recipiet]
+        self.__msgq.process_command_send(sender, routing, data)
+        self.assertEqual(1, len(sent_messages))
+        self.assertEqual(3, sent_messages[0][0]) # The recipient
+        self.assertEqual((routing, data), self.parse_msg(sent_messages[0][1]))
+        sent_messages = []
 
 class DummySocket:
     """
@@ -360,17 +395,27 @@ class SendNonblock(unittest.TestCase):
             self.assertEqual(0, status,
                 "The task did not complete successfully in time")
 
+    def get_msgq_with_sockets(self):
+        '''
+        Create a message queue and prepare it for use with a socket pair.
+        The write end is put into the message queue, so we can check it.
+        It returns (msgq, read_end, write_end). It is expected the sockets
+        are closed by the caller afterwards.
+        '''
+        msgq = MsgQ()
+        # We do only partial setup, so we don't create the listening socket
+        msgq.setup_poller()
+        (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        msgq.register_socket(write)
+        return (msgq, read, write)
+
     def infinite_sender(self, sender):
         """
         Sends data until an exception happens. socket.error is caught,
         as it means the socket got closed. Sender is called to actually
         send the data.
         """
-        msgq = MsgQ()
-        # We do only partial setup, so we don't create the listening socket
-        msgq.setup_poller()
-        (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
-        msgq.register_socket(write)
+        (msgq, read, write) = self.get_msgq_with_sockets()
         # Keep sending while it is not closed by the msgq
         try:
             while True:
@@ -406,6 +451,34 @@ class SendNonblock(unittest.TestCase):
         self.terminate_check(lambda: self.infinite_sender(
             lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
 
+    def test_sendprepared_success(self):
+        '''
+        Test the send_prepared_msg returns success when queueing messages.
+        It does so on the first attempt (when it actually tries to send
+        something to the socket) and on any attempt that follows and the
+        buffer is already full.
+        '''
+        (msgq, read, write) = self.get_msgq_with_sockets()
+        # Now keep sending until we fill in something into the internal
+        # buffer.
+        while not write.fileno() in msgq.sendbuffs:
+            self.assertTrue(msgq.send_prepared_msg(write, b'data'))
+        read.close()
+        write.close()
+
+    def test_sendprepared_epipe(self):
+        '''
+        Test the send_prepared_msg returns false when we try to queue a
+        message and the other side is not there any more. It should be done
+        with EPIPE, so not a fatal error.
+        '''
+        (msgq, read, write) = self.get_msgq_with_sockets()
+        # Close one end. It should make a EPIPE on the other.
+        read.close()
+        # Now it should soft-fail
+        self.assertFalse(msgq.send_prepared_msg(write, b'data'))
+        write.close()
+
     def send_many(self, data):
         """
         Tries that sending a command many times and getting an answer works.

+ 1 - 1
src/lib/util/common_defs.cc

@@ -38,7 +38,7 @@ const char* CC_COMMAND_SEND = "send";
 const char* CC_TO_WILDCARD = "*";
 const char* CC_INSTANCE_WILDCARD = "*";
 // Reply codes
-const int CC_REPLY_NO_RECPT = -1; // No recipient
+const int CC_REPLY_NO_RECPT = -1;
 
 }
 }