Browse Source

merge trunk into branch

git-svn-id: svn://bind10.isc.org/svn/bind10/branches/trac380@3299 e5f2f494-b856-4b98-b285-d166d9295462
Jerry 14 years ago
parent
commit
cb44697526

+ 13 - 4
ChangeLog

@@ -1,7 +1,16 @@
+  111.	[bug]*   zhanglikun, Michal Vaner
+	Make sure process xfrin/xfrout/zonemgr/cmdctl can be stoped
+	properly when user enter "ctrl+c" or 'Boss shutdown' command
+	through	bindctl.
+
+	The ZonemgrRefresh.run_timer and NotifyOut.dispatcher spawn
+	a thread themselves.
+	(Trac #335, svn r3273)
+
   110.  [func]      Michal Vaner
-	Added isc.net.check module to check ip addresses and ports for correctness
-	and isc.net.addr to hold IP address. The bind10, xfrin and cmdctl programs
-	are modified to use it.
+	Added isc.net.check module to check ip addresses and ports for
+	correctness and isc.net.addr to hold IP address. The bind10, xfrin
+	and cmdctl programs are modified to use it.
 	(Trac #353, svn r3240)
 
   109.  [func]		naokikambe
@@ -29,7 +38,7 @@
   105.  [bug]       Michal Vaner
 	Python processes: they no longer take 100% CPU while idle
 	due to a busy loop in reading command session in a nonblocking way.
-	(Trac #349, svn r3153)
+	(Trac #349, svn r3153), (Trac #382, svn r3294)
 
   104.	[bug]		jerry
 	bin/zonemgr: zonemgr should be attempting to refresh expired zones.

+ 1 - 1
src/bin/stats/stats.py.in

@@ -233,7 +233,7 @@ class CCSessionListener(Listener):
         """
         check the cc chanel
         """
-        return self.cc_session.check_command()
+        return self.cc_session.check_command(False)
 
     def config_handler(self, new_config):
         """

+ 2 - 2
src/bin/stats/tests/isc/config/ccsession.py

@@ -90,8 +90,8 @@ class ModuleCCSession(ConfigData):
     def close(self):
         self._session.close()
 
-    def check_command(self):
-        msg, env = self._session.group_recvmsg(False)
+    def check_command(self, nonblock=True):
+        msg, env = self._session.group_recvmsg(nonblock)
         if not msg or 'result' in msg:
             return
         cmd, arg = parse_command(msg)

+ 40 - 23
src/bin/zonemgr/tests/zonemgr_test.py

@@ -45,13 +45,22 @@ class MySession():
 
 class MyZonemgrRefresh(ZonemgrRefresh):
     def __init__(self):
-        self._cc = MySession()
-        self._db_file = "initdb.file"
+        class FakeConfig:
+            def get(self, name):
+                if name == 'lowerbound_refresh':
+                    return LOWERBOUND_REFRESH
+                elif name == 'lowerbound_retry':
+                    return LOWERBOUND_RETRY
+                elif name == 'max_transfer_timeout':
+                    return MAX_TRANSFER_TIMEOUT
+                elif name == 'jitter_scope':
+                    return JITTER_SCOPE
+                else:
+                    raise ValueError('Uknown config option')
+        self._master_socket, self._slave_socket = socket.socketpair()
+        ZonemgrRefresh.__init__(self, MySession(), "initdb.file",
+            self._slave_socket, FakeConfig())
         current_time = time.time()
-        self._max_transfer_timeout = MAX_TRANSFER_TIMEOUT
-        self._lowerbound_refresh = LOWERBOUND_REFRESH
-        self._lowerbound_retry = LOWERBOUND_RETRY
-        self._jitter_scope = JITTER_SCOPE
         self._zonemgr_refresh_info = { 
          ('sd.cn.', 'IN'): {
          'last_refresh_time': current_time,
@@ -67,8 +76,8 @@ class MyZonemgrRefresh(ZonemgrRefresh):
 
 class TestZonemgrRefresh(unittest.TestCase):
     def setUp(self):
-        self.stdout_backup = sys.stdout
-        sys.stdout = open(os.devnull, 'w')
+        self.stderr_backup = sys.stderr
+        sys.stderr = open(os.devnull, 'w')
         self.zone_refresh = MyZonemgrRefresh()
 
     def test_random_jitter(self):
@@ -101,7 +110,7 @@ class TestZonemgrRefresh(unittest.TestCase):
         time2 = time.time()
         self.assertTrue((time1 + 7200 * 3 / 4) <= zone_timeout)
         self.assertTrue(zone_timeout <= time2 + 7200)
-        
+
     def test_set_zone_retry_timer(self):
         time1 = time.time()
         self.zone_refresh._set_zone_retry_timer(ZONE_NAME_CLASS1_IN)
@@ -147,6 +156,8 @@ class TestZonemgrRefresh(unittest.TestCase):
          
     def test_zonemgr_reload_zone(self):
         soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
+        # We need to restore this not to harm other tests
+        old_get_zone_soa = sqlite3_ds.get_zone_soa
         def get_zone_soa(zone_name, db_file):
             return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None, 
                     'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600')
@@ -154,6 +165,7 @@ class TestZonemgrRefresh(unittest.TestCase):
 
         self.zone_refresh.zonemgr_reload_zone(ZONE_NAME_CLASS1_IN)
         self.assertEqual(soa_rdata, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_soa_rdata"])
+        sqlite3_ds.get_zone_soa = old_get_zone_soa
 
     def test_get_zone_notifier_master(self):
         notify_master = "192.168.1.1"
@@ -231,6 +243,9 @@ class TestZonemgrRefresh(unittest.TestCase):
 
     def test_zonemgr_add_zone(self):
         soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
+        # This needs to be restored. The following test actually failed if we left
+        # this unclean
+        old_get_zone_soa = sqlite3_ds.get_zone_soa
 
         def get_zone_soa(zone_name, db_file):
             return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None, 
@@ -251,7 +266,8 @@ class TestZonemgrRefresh(unittest.TestCase):
             return None
         sqlite3_ds.get_zone_soa = get_zone_soa2
         self.assertRaises(ZonemgrException, self.zone_refresh.zonemgr_add_zone, \
-                                          ZONE_NAME_CLASS1_IN)
+                                         ZONE_NAME_CLASS1_IN)
+        sqlite3_ds.get_zone_soa = old_get_zone_soa
 
     def test_build_zonemgr_refresh_info(self):
         soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
@@ -382,7 +398,7 @@ class TestZonemgrRefresh(unittest.TestCase):
         """This case will run timer in daemon thread. 
         The zone's next_refresh_time is less than now, so zonemgr will do zone refresh 
         immediately. The zone's state will become "refreshing". 
-        Then closing the socket ,the timer will stop, and throw a ZonemgrException."""
+        """
         time1 = time.time()
         self.zone_refresh._zonemgr_refresh_info = {
                 ("sd.cn.", "IN"):{
@@ -391,17 +407,11 @@ class TestZonemgrRefresh(unittest.TestCase):
                     'zone_soa_rdata': 'a.dns.cn. root.cnnic.cn. 2009073105 7200 3600 2419200 21600', 
                     'zone_state': ZONE_OK}
                 }
-        master_socket, slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
-        self.zone_refresh._socket = master_socket 
-        master_socket.close()
-        self.assertRaises(ZonemgrException, self.zone_refresh.run_timer)
-
-        self.zone_refresh._socket = slave_socket
-        listener = threading.Thread(target = self.zone_refresh.run_timer, args = ())
-        listener.setDaemon(True)
-        listener.start()
-        time.sleep(1)
-
+        self.zone_refresh._check_sock = self.zone_refresh._master_socket 
+        listener = self.zone_refresh.run_timer(daemon=True)
+        # Shut down the timer thread
+        self.zone_refresh.shutdown()
+        # After running timer, the zone's state should become "refreshing".
         zone_state = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"]
         self.assertTrue("refresh_timeout" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
         self.assertTrue(zone_state == ZONE_REFRESHING)
@@ -419,9 +429,16 @@ class TestZonemgrRefresh(unittest.TestCase):
         self.assertEqual(19800, self.zone_refresh._max_transfer_timeout)
         self.assertEqual(0.25, self.zone_refresh._jitter_scope)
 
+    def test_shutdown(self):
+        self.zone_refresh._check_sock = self.zone_refresh._master_socket 
+        listener = self.zone_refresh.run_timer()
+        self.assertTrue(listener.is_alive())
+        # Shut down the timer thread
+        self.zone_refresh.shutdown()
+        self.assertFalse(listener.is_alive())
 
     def tearDown(self):
-        sys.stdout = self.stdout_backup
+        sys.stderr= self.stderr_backup
 
 
 class MyCCSession():

+ 83 - 41
src/bin/zonemgr/zonemgr.py.in

@@ -16,7 +16,7 @@
 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
 # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 
-"""\
+"""
 This file implements the Secondary Manager program.
 
 The secondary manager is one of the co-operating processes
@@ -92,15 +92,19 @@ class ZonemgrRefresh:
     """This class will maintain and manage zone refresh info.
     It also provides methods to keep track of zone timers and 
     do zone refresh.
+    Zone timers can be started by calling run_timer(), and it 
+    can be stopped by calling shutdown() in another thread.
+
     """
 
     def __init__(self, cc, db_file, slave_socket, config_data):
         self._cc = cc
-        self._socket = slave_socket 
+        self._check_sock = slave_socket 
         self._db_file = db_file
         self.update_config_data(config_data)
         self._zonemgr_refresh_info = {} 
         self._build_zonemgr_refresh_info()
+        self._running = False
     
     def _random_jitter(self, max, jitter):
         """Imposes some random jitters for refresh and
@@ -319,40 +323,86 @@ class ZonemgrRefresh:
 
         return False
 
-    def run_timer(self):
-        """Keep track of zone timers."""
-        while True:
-            # Zonemgr has no zone.
+    def _run_timer(self, start_event):
+        start_event.set()
+        while self._running:
+            # If zonemgr has no zone, set timer timeout to self._lowerbound_retry.
             if self._zone_mgr_is_empty():
-                time.sleep(self._lowerbound_retry) # A better time?
-                continue
-
-            zone_need_refresh = self._find_need_do_refresh_zone()
-            # If don't get zone with minimum next refresh time, set timer timeout = lowerbound_retry 
-            if not zone_need_refresh:
                 timeout = self._lowerbound_retry 
             else:
-                timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
-                if (timeout < 0):
-                    self._do_refresh(zone_need_refresh)
-                    continue
+                zone_need_refresh = self._find_need_do_refresh_zone()
+                # If don't get zone with minimum next refresh time, set timer timeout to self._lowerbound_retry 
+                if not zone_need_refresh:
+                    timeout = self._lowerbound_retry 
+                else:
+                    timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
+                    if (timeout < 0):
+                        self._do_refresh(zone_need_refresh)
+                        continue
 
             """ Wait for the socket notification for a maximum time of timeout 
             in seconds (as float)."""
             try:
-                (rlist, wlist, xlist) = select.select([self._socket], [], [], timeout)
-                if rlist:
-                    self._socket.recv(32)
-            except ValueError as e:
-                raise ZonemgrException("[b10-zonemgr] Socket has been closed\n")
-                break
+                rlist, wlist, xlist = select.select([self._check_sock, self._read_sock], [], [], timeout)
             except select.error as e:
                 if e.args[0] == errno.EINTR:
                     (rlist, wlist, xlist) = ([], [], [])
                 else:
-                    raise ZonemgrException("[b10-zonemgr] Error with select(): %s\n" % e)
+                    sys.stderr.write("[b10-zonemgr] Error with select(); %s\n" % e)
                     break
 
+            for fd in rlist:
+                if fd == self._read_sock: # awaken by shutdown socket 
+                    # self._running will be False by now, if it is not a false
+                    # alarm
+                    continue
+                if fd == self._check_sock: # awaken by check socket
+                    self._check_sock.recv(32)
+
+    def run_timer(self, daemon=False):
+        """
+        Keep track of zone timers. Spawns and starts a thread. The thread object is returned.
+
+        You can stop it by calling shutdown().
+        """
+        # Small sanity check
+        if self._running:
+            raise RuntimeError("Trying to run the timers twice at the same time")
+
+        # Prepare the launch
+        self._running = True
+        (self._read_sock, self._write_sock) = socket.socketpair()
+        start_event = threading.Event()
+
+        # Start the thread
+        self._thread = threading.Thread(target = self._run_timer,
+            args = (start_event,))
+        if daemon:
+            self._thread.setDaemon(True)
+        self._thread.start()
+        start_event.wait()
+
+        # Return the thread to anyone interested
+        return self._thread
+
+    def shutdown(self):
+        """
+        Stop the run_timer() thread. Block until it finished. This must be
+        called from a different thread.
+        """
+        if not self._running:
+            raise RuntimeError("Trying to shutdown, but not running")
+
+        # Ask the thread to stop
+        self._running = False
+        self._write_sock.send(b'shutdown') # make self._read_sock readble
+        # Wait for it to actually finnish
+        self._thread.join()
+        # Wipe out what we do not need
+        self._thread = None
+        self._read_sock = None
+        self._write_sock = None
+
     def update_config_data(self, new_config):
         """ update ZonemgrRefresh config """
         self._lowerbound_refresh = new_config.get('lowerbound_refresh')
@@ -360,7 +410,6 @@ class ZonemgrRefresh:
         self._max_transfer_timeout = new_config.get('max_transfer_timeout')
         self._jitter_scope = new_config.get('jitter_scope')
 
-
 class Zonemgr:
     """Zone manager class."""
     def __init__(self):
@@ -370,16 +419,11 @@ class Zonemgr:
         # Create socket pair for communicating between main thread and zonemgr timer thread 
         self._master_socket, self._slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
         self._zone_refresh = ZonemgrRefresh(self._cc, self._db_file, self._slave_socket, self._config_data)
-        self._start_zone_refresh_timer()
+        self._zone_refresh.run_timer()
 
         self._lock = threading.Lock()
         self._shutdown_event = threading.Event()
-
-    def _start_zone_refresh_timer(self):
-        """Start a new thread to keep track of zone timers"""
-        listener = threading.Thread(target = self._zone_refresh.run_timer, args = ())
-        listener.setDaemon(True)
-        listener.start()
+        self.running = False
 
     def _setup_session(self):
         """Setup two sessions for zonemgr, one(self._module_cc) is used for receiving 
@@ -407,15 +451,12 @@ class Zonemgr:
         """Shutdown the zonemgr process. the thread which is keeping track of zone
         timers should be terminated.
         """ 
+        self._zone_refresh.shutdown()
+
         self._slave_socket.close()
         self._master_socket.close()
-
         self._shutdown_event.set()
-        main_thread = threading.currentThread()
-        for th in threading.enumerate():
-            if th is main_thread:
-                continue
-            th.join()
+        self.running = False
 
     def config_handler(self, new_config):
         """ Update config data. """
@@ -474,21 +515,21 @@ class Zonemgr:
             with self._lock:
                 self._zone_refresh.zone_handle_notify(zone_name_class, master)
             # Send notification to zonemgr timer thread
-            self._master_socket.send(b" ")
+            self._master_socket.send(b" ")# make self._slave_socket readble
 
         elif command == ZONE_XFRIN_SUCCESS_COMMAND:
             """ Handle xfrin success command"""
             zone_name_class = self._parse_cmd_params(args, command)
             with self._lock:
                 self._zone_refresh.zone_refresh_success(zone_name_class)
-            self._master_socket.send(b" ")
+            self._master_socket.send(b" ")# make self._slave_socket readble
 
         elif command == ZONE_XFRIN_FAILED_COMMAND:
             """ Handle xfrin fail command"""
             zone_name_class = self._parse_cmd_params(args, command)
             with self._lock:
                 self._zone_refresh.zone_refresh_fail(zone_name_class)
-            self._master_socket.send(b" ")
+            self._master_socket.send(b" ")# make self._slave_socket readble
 
         elif command == "shutdown":
             self.shutdown()
@@ -499,6 +540,7 @@ class Zonemgr:
         return answer
 
     def run(self):
+        self.running = True
         while not self._shutdown_event.is_set():
             self._module_cc.check_command(False)
 
@@ -538,6 +580,6 @@ if '__main__' == __name__:
     except isc.config.ModuleCCSessionError as e:
         sys.stderr.write("[b10-zonemgr] exit zonemgr process: %s\n" % str(e))
 
-    if zonemgrd:
+    if zonemgrd and zonemgrd.running:
         zonemgrd.shutdown()