|
@@ -94,12 +94,18 @@ class ZonemgrException(Exception):
|
|
|
class ZonemgrRefresh:
|
|
|
"""This class will maintain and manage zone refresh info.
|
|
|
It also provides methods to keep track of zone timers and
|
|
|
- do zone refresh.
|
|
|
+ do zone refresh.
|
|
|
+ Zone timers can be started by calling run_timer(), and it
|
|
|
+ can be stopped by calling shutdown() in another thread.
|
|
|
+
|
|
|
"""
|
|
|
|
|
|
def __init__(self, cc, db_file, slave_socket):
|
|
|
self._cc = cc
|
|
|
- self._socket = slave_socket
|
|
|
+ self._check_sock = slave_socket
|
|
|
+ self._runnable = False
|
|
|
+ self._is_shut_down = threading.Event()
|
|
|
+ self._read_sock, self._write_sock = socket.socketpair()
|
|
|
self._db_file = db_file
|
|
|
self._zonemgr_refresh_info = {}
|
|
|
self._build_zonemgr_refresh_info()
|
|
@@ -328,8 +334,12 @@ class ZonemgrRefresh:
|
|
|
return False
|
|
|
|
|
|
def run_timer(self):
|
|
|
- """Keep track of zone timers."""
|
|
|
- while True:
|
|
|
+ """Keep track of zone timers. The loop can be stopped by calling shutdown() in
|
|
|
+ another thread.
|
|
|
+ """
|
|
|
+ self._runnable = True
|
|
|
+ self._is_shut_down.clear()
|
|
|
+ while self._runnable:
|
|
|
# Zonemgr has no zone.
|
|
|
if self._zone_mgr_is_empty():
|
|
|
time.sleep(LOWERBOUND_RETRY) # A better time?
|
|
@@ -348,19 +358,29 @@ class ZonemgrRefresh:
|
|
|
""" Wait for the socket notification for a maximum time of timeout
|
|
|
in seconds (as float)."""
|
|
|
try:
|
|
|
- (rlist, wlist, xlist) = select.select([self._socket], [], [], timeout)
|
|
|
- if rlist:
|
|
|
- self._socket.recv(32)
|
|
|
- except ValueError as e:
|
|
|
- raise ZonemgrException("[b10-zonemgr] Socket has been closed\n")
|
|
|
- break
|
|
|
+ rlist, wlist, xlist = select.select([self._check_sock, self._read_sock], [], [], timeout)
|
|
|
except select.error as e:
|
|
|
if e.args[0] == errno.EINTR:
|
|
|
(rlist, wlist, xlist) = ([], [], [])
|
|
|
else:
|
|
|
- raise ZonemgrException("[b10-zonemgr] Error with select(): %s\n" % e)
|
|
|
+ sys.stderr.write("[b10-zonemgr] Error with select(); %s\n" % e)
|
|
|
break
|
|
|
|
|
|
+ if not rlist: # timer timeout
|
|
|
+ continue
|
|
|
+ if self._read_sock in rlist: # awaken by shutdown socket
|
|
|
+ break
|
|
|
+ if self._check_sock in rlist: # awaken by check socket
|
|
|
+ self._check_sock.recv(5)
|
|
|
+
|
|
|
+ self._is_shut_down.set()
|
|
|
+
|
|
|
+ def shutdown(self):
|
|
|
+ """Stop the run_timer() loop. Block until the loop has finished. This must be
|
|
|
+ called when run_timer() is running in another thread, or it will deadlock."""
|
|
|
+ self._runnable = False
|
|
|
+ self._write_sock.send(b'shutdown') # make self._read_sock readble
|
|
|
+ self._is_shut_down.wait()
|
|
|
|
|
|
class Zonemgr:
|
|
|
"""Zone manager class."""
|
|
@@ -378,7 +398,6 @@ class Zonemgr:
|
|
|
def _start_zone_refresh_timer(self):
|
|
|
"""Start a new thread to keep track of zone timers"""
|
|
|
listener = threading.Thread(target = self._zone_refresh.run_timer, args = ())
|
|
|
- listener.setDaemon(True)
|
|
|
listener.start()
|
|
|
|
|
|
def _setup_session(self):
|
|
@@ -406,12 +425,14 @@ class Zonemgr:
|
|
|
"""Shutdown the zonemgr process. the thread which is keeping track of zone
|
|
|
timers should be terminated.
|
|
|
"""
|
|
|
+ self._zone_refresh.shutdown()
|
|
|
+
|
|
|
self._slave_socket.close()
|
|
|
self._master_socket.close()
|
|
|
-
|
|
|
self._shutdown_event.set()
|
|
|
main_thread = threading.currentThread()
|
|
|
for th in threading.enumerate():
|
|
|
+ # Stop the thread which is running zone refresh timer
|
|
|
if th is main_thread:
|
|
|
continue
|
|
|
th.join()
|
|
@@ -459,21 +480,21 @@ class Zonemgr:
|
|
|
with self._lock:
|
|
|
self._zone_refresh.zone_handle_notify(zone_name_class, master)
|
|
|
# Send notification to zonemgr timer thread
|
|
|
- self._master_socket.send(b" ")
|
|
|
+ self._master_socket.send(b" ")# make self._slave_socket readble
|
|
|
|
|
|
elif command == ZONE_XFRIN_SUCCESS_COMMAND:
|
|
|
""" Handle xfrin success command"""
|
|
|
zone_name_class = self._parse_cmd_params(args, command)
|
|
|
with self._lock:
|
|
|
self._zone_refresh.zone_refresh_success(zone_name_class)
|
|
|
- self._master_socket.send(b" ")
|
|
|
+ self._master_socket.send(b" ")# make self._slave_socket readble
|
|
|
|
|
|
elif command == ZONE_XFRIN_FAILED_COMMAND:
|
|
|
""" Handle xfrin fail command"""
|
|
|
zone_name_class = self._parse_cmd_params(args, command)
|
|
|
with self._lock:
|
|
|
self._zone_refresh.zone_refresh_fail(zone_name_class)
|
|
|
- self._master_socket.send(b" ")
|
|
|
+ self._master_socket.send(b" ")# make self._slave_socket readble
|
|
|
|
|
|
elif command == "shutdown":
|
|
|
self.shutdown()
|