Browse Source

merge trac352 to trunk.

git-svn-id: svn://bind10.isc.org/svn/bind10/trunk@3366 e5f2f494-b856-4b98-b285-d166d9295462
Likun Zhang 14 years ago
parent
commit
ddca697026

+ 6 - 0
ChangeLog

@@ -1,3 +1,9 @@
+  112.	[func]		zhang likun
+	Add one mixin class to override the naive serve_forever() provided
+	in python library socketserver. Instead of polling for shutdwon
+	every poll_interval seconds, one socketpair is used to wake up
+	the waiting server.(Trac #352, svn TBD)
+
   111.	[bug]*   zhanglikun, Michal Vaner
 	Make sure process xfrin/xfrout/zonemgr/cmdctl can be stoped
 	properly when user enter "ctrl+c" or 'Boss shutdown' command

+ 6 - 1
src/bin/cmdctl/cmdctl.py.in

@@ -47,6 +47,8 @@ import isc.net.parse
 import isc.utils.process
 from optparse import OptionParser, OptionValueError
 from hashlib import sha1
+from isc.utils import socketserver_mixin
+
 try:
     import threading
 except ImportError:
@@ -441,7 +443,9 @@ class CommandControl():
 
         return (keyfile, certfile, accountsfile)
 
-class SecureHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
+class SecureHTTPServer(socketserver_mixin.NoPollMixIn,
+                       socketserver.ThreadingMixIn,
+                       http.server.HTTPServer):
     '''Make the server address can be reused.'''
     allow_reuse_address = True
 
@@ -449,6 +453,7 @@ class SecureHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
                  CommandControlClass,
                  idle_timeout = 1200, verbose = False):
         '''idle_timeout: the max idle time for login'''
+        socketserver_mixin.NoPollMixIn.__init__(self)
         try:
             http.server.HTTPServer.__init__(self, server_address, RequestHandlerClass)
         except socket.error as err:

+ 6 - 23
src/bin/xfrout/xfrout.py.in

@@ -35,6 +35,8 @@ import socket
 import select
 import errno
 from optparse import OptionParser, OptionValueError
+from isc.utils import socketserver_mixin
+
 try:
     from libxfr_python import *
     from pydnspp import *
@@ -291,12 +293,13 @@ class XfroutSession(BaseRequestHandler):
 
         self._send_message_with_last_soa(msg, sock, rrset_soa, message_upper_len)
 
-class UnixSockServer(ThreadingUnixStreamServer):
+class UnixSockServer(socketserver_mixin.NoPollMixIn, ThreadingUnixStreamServer):
     '''The unix domain socket server which accept xfr query sent from auth server.'''
 
     def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc, log):
         self._remove_unused_sock_file(sock_file)
         self._sock_file = sock_file
+        socketserver_mixin.NoPollMixIn.__init__(self)
         ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
         self._lock = threading.Lock()
         self._transfers_counter = 0
@@ -341,7 +344,7 @@ class UnixSockServer(ThreadingUnixStreamServer):
             return True 
 
     def shutdown(self):
-        ThreadingUnixStreamServer.shutdown(self)
+        super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
         try:
             os.unlink(self._sock_file)
         except Exception as e:
@@ -383,26 +386,6 @@ class UnixSockServer(ThreadingUnixStreamServer):
         self._transfers_counter -= 1
         self._lock.release()
 
-def listen_on_xfr_query(unix_socket_server):
-    '''Listen xfr query in one single thread. Polls for shutdown 
-    every 0.1 seconds, is there a better time?
-    '''
-
-    while True:
-        try:
-            unix_socket_server.serve_forever(poll_interval = 0.1)
-        except select.error as err:
-            # serve_forever() calls select.select(), which can be 
-            # interrupted.
-            # If it is interrupted, it raises select.error with the 
-            # errno set to EINTR. We ignore this case, and let the
-            # normal program flow continue by trying serve_forever()
-            # again.
-            if err.args[0] != errno.EINTR: raise
-        else:
-            # serve_forever() loop has been stoped normally.
-            break
-
 class XfroutServer:
     def __init__(self):
         self._unix_socket_server = None
@@ -424,7 +407,7 @@ class XfroutServer:
         self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession, 
                                                   self._shutdown_event, self._config_data,
                                                   self._cc, self._log);
-        listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
+        listener = threading.Thread(target=self._unix_socket_server.serve_forever)
         listener.start()
         
     def _start_notifier(self):

+ 1 - 1
src/lib/python/isc/utils/Makefile.am

@@ -1,5 +1,5 @@
 SUBDIRS = tests
 
-python_PYTHON = __init__.py process.py
+python_PYTHON = __init__.py process.py socketserver_mixin.py
 
 pythondir = $(pyexecdir)/isc/utils

+ 1 - 0
src/lib/python/isc/utils/__init__.py

@@ -0,0 +1 @@
+from isc.utils.socketserver_mixin import *

+ 92 - 0
src/lib/python/isc/utils/socketserver_mixin.py

@@ -0,0 +1,92 @@
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import threading
+import socket
+import select
+
+SOCK_DATA = b'somedata'
+class NoPollMixIn:
+    '''This is a mix-in class to override the function serve_forever()
+    and shutdown() in class socketserver.BaseServer.
+
+    As commented in the module source code,  serve_forever() in
+    socketserver.BaseServer uses polling for a shutdown request, which
+    "reduces the responsiveness to a shutdown request and wastes cpu at
+    all other times."
+
+    This class fixes this problem by introducing internal message
+    passing via a separate socket. Note, however, that according to
+    the module documentation serve_forever() and shutdown() are not
+    categorized as functions that can be overridden via mix-ins.  So
+    this mix-in class may not be compatible with future versions of
+    socketserver.  It should be considered a short term workaround
+    until the original implementation is fixed.
+
+    The NoPollMixIn class should be used together with
+    socketserver.BaseServer or some derived classes of it, and it must
+    be placed before the corresponding socketserver class.  In
+    addition, the constructor of this mix-in must be called
+    explicitely in the derived class.  For example, a basic TCP server
+    without the problem of polling is created as follows:
+
+       class MyServer(NoPollMixIn, socketserver.TCPServer):
+           def __init__(...):
+               ...
+               NoPollMixIn.__init__(self)
+               ...
+
+    To shutdown the server correctly, the serve_forever() method must
+    be run in a separate thread, and shutdown() must be called from
+    some other thread.
+    '''
+    def __init__(self):
+        self.__read_sock, self.__write_sock = socket.socketpair()
+        self._is_shut_down = threading.Event()
+
+    def serve_forever(self, poll_interval=None):
+        ''' Overrides the serve_forever([poll_interval]) in class
+        socketserver.BaseServer.
+
+        It uses a socketpair to wake up the select when shutdown() is
+        called in anther thread.  Note, parameter 'poll_interval' is
+        just used for interface compatibility; it's never used in this
+        function.
+        '''        
+        while True:
+            # block until the self.socket or self.__read_sock is readable
+            try:
+                r, w, e = select.select([self, self.__read_sock], [], [])
+            except select.error as err:
+                if err.args[0] == EINTR:
+                    continue
+                else:
+                    break
+            
+            if self.__read_sock in r:
+                break
+            else:
+                self._handle_request_noblock()
+
+        self._is_shut_down.set()
+
+    def shutdown(self):
+        '''Stops the serve_forever loop.
+
+        Blocks until the loop has finished, the function should be called
+        in another thread when serve_forever is running, or it will block.
+        '''
+        self.__write_sock.send(SOCK_DATA) # make self.__read_sock readable.
+        self._is_shut_down.wait()  # wait until the serve thread terminate

+ 1 - 1
src/lib/python/isc/utils/tests/Makefile.am

@@ -1,4 +1,4 @@
-PYTESTS = process_test.py
+PYTESTS = process_test.py socketserver_mixin_test.py
 EXTRA_DIST = $(PYTESTS)
 
 # later will have configure option to choose this, like: coverage run --branch

+ 63 - 0
src/lib/python/isc/utils/tests/socketserver_mixin_test.py

@@ -0,0 +1,63 @@
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import unittest
+from isc.utils.socketserver_mixin import NoPollMixIn
+import socketserver
+import threading
+import socket
+import time
+
+class MyHandler(socketserver.BaseRequestHandler):
+    def handle(self):
+        data = self.request.recv(20)
+        self.request.send(data)
+
+class MyServer(NoPollMixIn, 
+               socketserver.ThreadingMixIn,
+               socketserver.TCPServer):
+
+    def __init__(self, server_addr, handler_class):
+        NoPollMixIn.__init__(self)
+        socketserver.TCPServer.__init__(self, server_addr, handler_class)
+
+def send_and_get_reply(ip, port, msg):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.connect((ip, port))
+    sock.send(msg)
+    response = sock.recv(20)
+    sock.close()
+    return response
+
+class TestNoPollMixIn(unittest.TestCase):
+    def test_serve_forever(self):
+        # use port 0 to select an arbitrary unused port.
+        server = MyServer(('127.0.0.1', 0), MyHandler)
+        ip, port = server.server_address
+        server_thread = threading.Thread(target=server.serve_forever)
+        server_thread.start()
+
+        msg = b'senddata'
+        self.assertEqual(msg, send_and_get_reply(ip, port, msg))
+        self.assertTrue(server_thread.is_alive())
+
+        self.assertFalse(server._is_shut_down.is_set())
+        server.shutdown() # Now shutdown the server
+        self.assertTrue(server._is_shut_down.is_set())
+
+if __name__== "__main__":
+    unittest.main()
+
+