Browse Source

Merge branch 'trac1180'

Jelte Jansen 13 years ago
parent
commit
6e68b97b05
1 changed files with 20 additions and 13 deletions
  1. 20 13
      src/bin/msgq/msgq.py.in

+ 20 - 13
src/bin/msgq/msgq.py.in

@@ -28,7 +28,6 @@ import struct
 import errno
 import errno
 import time
 import time
 import select
 import select
-import pprint
 import random
 import random
 from optparse import OptionParser, OptionValueError
 from optparse import OptionParser, OptionValueError
 import isc.util.process
 import isc.util.process
@@ -96,10 +95,10 @@ class MsgQ:
                                "@PACKAGE_NAME@",
                                "@PACKAGE_NAME@",
                                "msgq_socket").replace("${prefix}",
                                "msgq_socket").replace("${prefix}",
                                                       "@prefix@")
                                                       "@prefix@")
-    
+
     def __init__(self, socket_file=None, verbose=False):
     def __init__(self, socket_file=None, verbose=False):
         """Initialize the MsgQ master.
         """Initialize the MsgQ master.
-        
+
         The socket_file specifies the path to the UNIX domain socket
         The socket_file specifies the path to the UNIX domain socket
         that the msgq process listens on. If it is None, the
         that the msgq process listens on. If it is None, the
         environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
         environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
@@ -135,7 +134,7 @@ class MsgQ:
             self.poller = select.poll()
             self.poller = select.poll()
         except AttributeError:
         except AttributeError:
             self.kqueue = select.kqueue()
             self.kqueue = select.kqueue()
-    
+
     def add_kqueue_socket(self, socket, write_filter=False):
     def add_kqueue_socket(self, socket, write_filter=False):
         """Add a kquque filter for a socket.  By default the read
         """Add a kquque filter for a socket.  By default the read
         filter is used; if write_filter is set to True, the write
         filter is used; if write_filter is set to True, the write
@@ -167,7 +166,7 @@ class MsgQ:
                              self.socket_file)
                              self.socket_file)
 
 
         self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        
+
         if os.path.exists(self.socket_file):
         if os.path.exists(self.socket_file):
             os.remove(self.socket_file)
             os.remove(self.socket_file)
         try:
         try:
@@ -196,7 +195,7 @@ class MsgQ:
 
 
         if self.verbose:
         if self.verbose:
             sys.stdout.write("[b10-msgq] Listening\n")
             sys.stdout.write("[b10-msgq] Listening\n")
-        
+
         self.runnable = True
         self.runnable = True
 
 
     def process_accept(self):
     def process_accept(self):
@@ -293,9 +292,6 @@ class MsgQ:
             sys.stderr.write("[b10-msgq] Routing decode error: %s\n" % err)
             sys.stderr.write("[b10-msgq] Routing decode error: %s\n" % err)
             return
             return
 
 
-#        sys.stdout.write("\t" + pprint.pformat(routingmsg) + "\n")
-#        sys.stdout.write("\t" + pprint.pformat(data) + "\n")
-
         self.process_command(fd, sock, routingmsg, data)
         self.process_command(fd, sock, routingmsg, data)
 
 
     def process_command(self, fd, sock, routing, data):
     def process_command(self, fd, sock, routing, data):
@@ -357,7 +353,18 @@ class MsgQ:
         if fileno in self.sendbuffs:
         if fileno in self.sendbuffs:
             amount_sent = 0
             amount_sent = 0
         else:
         else:
-            amount_sent = self.__send_data(sock, msg)
+            try:
+                amount_sent = self.__send_data(sock, msg)
+            except socket.error as sockerr:
+                # in the case the other side seems gone, kill the socket
+                # and drop the send action
+                if sockerr.errno == errno.EPIPE:
+                    print("[b10-msgq] SIGPIPE on send, dropping message " +
+                          "and closing connection")
+                    self.kill_socket(fileno, sock)
+                    return
+                else:
+                    raise
 
 
         # Still something to send
         # Still something to send
         if amount_sent < len(msg):
         if amount_sent < len(msg):
@@ -448,12 +455,12 @@ class MsgQ:
 
 
     def run(self):
     def run(self):
         """Process messages.  Forever.  Mostly."""
         """Process messages.  Forever.  Mostly."""
-        
+
         if self.poller:
         if self.poller:
             self.run_poller()
             self.run_poller()
         else:
         else:
             self.run_kqueue()
             self.run_kqueue()
-    
+
     def run_poller(self):
     def run_poller(self):
         while True:
         while True:
             try:
             try:
@@ -511,7 +518,7 @@ def signal_handler(signal, frame):
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     def check_port(option, opt_str, value, parser):
     def check_port(option, opt_str, value, parser):
-        """Function to insure that the port we are passed is actually 
+        """Function to insure that the port we are passed is actually
         a valid port number. Used by OptionParser() on startup."""
         a valid port number. Used by OptionParser() on startup."""
         intval = int(value)
         intval = int(value)
         if (intval < 0) or (intval > 65535):
         if (intval < 0) or (intval > 65535):