#!@PYTHON@

# Copyright (C) 2010 Internet Systems Consortium.
# Copyright (C) 2010 CZ NIC
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""
This file implements the Secondary Manager program.

The secondary manager is one of the co-operating processes
of BIND10, which keeps track of timers and other information
necessary for BIND10 to act as a slave.
"""

import sys; sys.path.append ('@@PYTHONPATH@@')
import os
import time
import signal
import isc
import random
import threading
import select
import socket
import errno
from isc.datasrc import sqlite3_ds
from optparse import OptionParser, OptionValueError
from isc.config.ccsession import *
import isc.util.process

isc.util.process.rename()

# If B10_FROM_BUILD is set in the environment, we use data files
# from a directory relative to that, otherwise we use the ones
# installed on the system
if "B10_FROM_BUILD" in os.environ:
    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/zonemgr"
    AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
else:
    # The @...@ placeholders are substituted at build/configure time.
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
    AUTH_SPECFILE_PATH = SPECFILE_PATH
SPECFILE_LOCATION = SPECFILE_PATH + "/zonemgr.spec"
AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"

__version__ = "BIND10"

# define module name
XFRIN_MODULE_NAME = 'Xfrin'
AUTH_MODULE_NAME = 'Auth'

# define command name
ZONE_XFRIN_FAILED_COMMAND = 'zone_xfrin_failed'
ZONE_XFRIN_SUCCESS_COMMAND = 'zone_new_data_ready'
ZONE_REFRESH_COMMAND = 'refresh_from_zonemgr'
ZONE_NOTIFY_COMMAND = 'notify'

# define zone state
ZONE_OK = 0
ZONE_REFRESHING = 1
ZONE_EXPIRED = 2

# Offsets of fields in the SOA RDATA text, after splitting it on single
# spaces (see the .split(" ") calls below).
REFRESH_OFFSET = 3
RETRY_OFFSET = 4
EXPIRED_OFFSET = 5

# verbose mode
VERBOSE_MODE = False

def log_msg(msg):
    """Write msg to stdout with a "[b10-zonemgr]" prefix, only in verbose mode."""
    if VERBOSE_MODE:
        sys.stdout.write("[b10-zonemgr] %s\n" % str(msg))

class ZonemgrException(Exception):
    """Error raised by the zone manager for invalid zones or commands."""
    pass

class ZonemgrRefresh:
    """This class will maintain and manage zone refresh info.
    It also provides methods to keep track of zone timers and
    do zone refresh.
    Zone timers can be started by calling run_timer(), and it
    can be stopped by calling shutdown() in another thread.

    Per-zone state lives in self._zonemgr_refresh_info, a dict keyed by
    (zone_name, zone_class) tuples.  Each value is a dict with the keys
    "zone_soa_rdata", "zone_state", "last_refresh_time",
    "next_refresh_time", "refresh_timeout" (set once a refresh starts)
    and, optionally, "notify_master".  Times are time.time() values.
    """

    def __init__(self, cc, db_file, slave_socket, config_data):
        """Load every zone from db_file and prepare (but do not start) the timer.

        cc -- session used by _send_command() to talk to other modules.
        db_file -- datasource file passed to sqlite3_ds.
        slave_socket -- socket the timer thread waits on; any data read
            from it wakes the thread so timers are re-evaluated.
        config_data -- dict providing 'lowerbound_refresh',
            'lowerbound_retry', 'max_transfer_timeout' and 'jitter_scope'.
        """
        self._cc = cc
        self._check_sock = slave_socket
        self._db_file = db_file
        self.update_config_data(config_data)
        self._zonemgr_refresh_info = {}
        self._build_zonemgr_refresh_info()
        self._running = False

    def _random_jitter(self, max, jitter):
        """Imposes some random jitters for refresh and
        retry timers to avoid many zones need to do refresh
        at the same time.
        The value should be between (max - jitter) and max.
        """
        if 0 == jitter:
            return max
        return random.uniform(max - jitter, max)

    def _get_current_time(self):
        """Return the current time in seconds since the epoch."""
        return time.time()

    def _set_zone_timer(self, zone_name_class, max, jitter):
        """Set zone next refresh time.
        jitter should not be bigger than half the original value."""
        self._set_zone_next_refresh_time(zone_name_class, self._get_current_time() + \
                                            self._random_jitter(max, jitter))

    def _set_zone_refresh_timer(self, zone_name_class):
        """Set zone next refresh time after zone refresh success.
           now + refresh - jitter <= next_refresh_time <= now + refresh
           """
        zone_refresh_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[REFRESH_OFFSET])
        # Never refresh more often than the configured lower bound.
        zone_refresh_time = max(self._lowerbound_refresh, zone_refresh_time)
        self._set_zone_timer(zone_name_class, zone_refresh_time, self._jitter_scope * zone_refresh_time)

    def _set_zone_retry_timer(self, zone_name_class):
        """Set zone next refresh time after zone refresh fail.
           now + retry - jitter <= next_refresh_time <= now + retry
           """
        zone_retry_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[RETRY_OFFSET])
        # Never retry more often than the configured lower bound.
        zone_retry_time = max(self._lowerbound_retry, zone_retry_time)
        self._set_zone_timer(zone_name_class, zone_retry_time, self._jitter_scope * zone_retry_time)

    def _set_zone_notify_timer(self, zone_name_class):
        """Set zone next refresh time after receiving notify
           next_refresh_time = now
        """
        self._set_zone_timer(zone_name_class, 0, 0)

    def _zone_not_exist(self, zone_name_class):
        """ Zone doesn't belong to zonemgr"""
        return zone_name_class not in self._zonemgr_refresh_info

    def zone_refresh_success(self, zone_name_class):
        """Update zone info after zone refresh success.

        Reloads the SOA from the datasource, schedules the next refresh,
        marks the zone ZONE_OK and records the refresh time.
        Raises ZonemgrException if the zone is not managed here.
        """
        if (self._zone_not_exist(zone_name_class)):
            raise ZonemgrException("[b10-zonemgr] Zone (%s, %s) doesn't "
                                   "belong to zonemgr" % zone_name_class)
        self.zonemgr_reload_zone(zone_name_class)
        self._set_zone_refresh_timer(zone_name_class)
        self._set_zone_state(zone_name_class, ZONE_OK)
        self._set_zone_last_refresh_time(zone_name_class, self._get_current_time())

    def zone_refresh_fail(self, zone_name_class):
        """Update zone info after zone refresh fail.

        Marks the zone ZONE_EXPIRED if its expire interval has passed
        (otherwise ZONE_OK) and schedules a retry.
        Raises ZonemgrException if the zone is not managed here.
        """
        if (self._zone_not_exist(zone_name_class)):
            raise ZonemgrException("[b10-zonemgr] Zone (%s, %s) doesn't "
                                   "belong to zonemgr" % zone_name_class)
        # Is zone expired?
        if (self._zone_is_expired(zone_name_class)):
            self._set_zone_state(zone_name_class, ZONE_EXPIRED)
        else:
            self._set_zone_state(zone_name_class, ZONE_OK)
        self._set_zone_retry_timer(zone_name_class)

    def zone_handle_notify(self, zone_name_class, master):
        """Handle zone notify.

        Remembers the notifying master and schedules an immediate refresh.
        Raises ZonemgrException if the zone is not managed here.
        """
        if (self._zone_not_exist(zone_name_class)):
            raise ZonemgrException("[b10-zonemgr] Notified zone (%s, %s) "
                                   "doesn't belong to zonemgr" % zone_name_class)
        self._set_zone_notifier_master(zone_name_class, master)
        self._set_zone_notify_timer(zone_name_class)

    def zonemgr_reload_zone(self, zone_name_class):
        """ Reload a zone's SOA RDATA from the datasource.

        Raises ZonemgrException if the datasource returns no SOA for the
        zone (previously this failed with an unhelpful TypeError).
        """
        zone_soa = sqlite3_ds.get_zone_soa(str(zone_name_class[0]), self._db_file)
        if not zone_soa:
            raise ZonemgrException("[b10-zonemgr] zone (%s, %s) doesn't have soa."
                                   % zone_name_class)
        self._zonemgr_refresh_info[zone_name_class]["zone_soa_rdata"] = zone_soa[7]

    def zonemgr_add_zone(self, zone_name_class):
        """ Add a zone into zone manager.

        The first refresh is scheduled one SOA REFRESH interval from now.
        Raises ZonemgrException if the datasource has no SOA for the zone.
        """
        zone_info = {}
        zone_soa = sqlite3_ds.get_zone_soa(str(zone_name_class[0]), self._db_file)
        if not zone_soa:
            raise ZonemgrException("[b10-zonemgr] zone (%s, %s) doesn't have soa."
                                   % zone_name_class)
        # Element 7 of the returned row is stored as the SOA RDATA text.
        zone_info["zone_soa_rdata"] = zone_soa[7]
        zone_info["zone_state"] = ZONE_OK
        zone_info["last_refresh_time"] = self._get_current_time()
        zone_info["next_refresh_time"] = self._get_current_time() + \
                                         float(zone_soa[7].split(" ")[REFRESH_OFFSET])
        self._zonemgr_refresh_info[zone_name_class] = zone_info

    def _build_zonemgr_refresh_info(self):
        """ Build zonemgr refresh info map from every zone in the datasource."""
        log_msg("Start loading zone into zonemgr.")
        for zone_name, zone_class in sqlite3_ds.get_zones_info(self._db_file):
            zone_name_class = (zone_name, zone_class)
            self.zonemgr_add_zone(zone_name_class)
        log_msg("Finish loading zone into zonemgr.")

    def _zone_is_expired(self, zone_name_class):
        """Judge whether a zone is expired or not.

        A zone is expired if it is already in ZONE_EXPIRED state, or if
        its SOA EXPIRE interval has elapsed since the last refresh.
        """
        zone_expired_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[EXPIRED_OFFSET])
        zone_last_refresh_time = self._get_zone_last_refresh_time(zone_name_class)
        if (ZONE_EXPIRED == self._get_zone_state(zone_name_class) or
            zone_last_refresh_time + zone_expired_time <= self._get_current_time()):
            return True

        return False

    def _get_zone_soa_rdata(self, zone_name_class):
        return self._zonemgr_refresh_info[zone_name_class]["zone_soa_rdata"]

    def _get_zone_last_refresh_time(self, zone_name_class):
        return self._zonemgr_refresh_info[zone_name_class]["last_refresh_time"]

    def _set_zone_last_refresh_time(self, zone_name_class, time):
        self._zonemgr_refresh_info[zone_name_class]["last_refresh_time"] = time

    def _get_zone_notifier_master(self, zone_name_class):
        """Return the remembered notifying master, or None if there is none."""
        return self._zonemgr_refresh_info[zone_name_class].get("notify_master")

    def _set_zone_notifier_master(self, zone_name_class, master_addr):
        self._zonemgr_refresh_info[zone_name_class]["notify_master"] = master_addr

    def _clear_zone_notifier_master(self, zone_name_class):
        """Forget the notifying master, if one is recorded."""
        self._zonemgr_refresh_info[zone_name_class].pop("notify_master", None)

    def _get_zone_state(self, zone_name_class):
        return self._zonemgr_refresh_info[zone_name_class]["zone_state"]

    def _set_zone_state(self, zone_name_class, zone_state):
        self._zonemgr_refresh_info[zone_name_class]["zone_state"] = zone_state

    def _get_zone_refresh_timeout(self, zone_name_class):
        return self._zonemgr_refresh_info[zone_name_class]["refresh_timeout"]

    def _set_zone_refresh_timeout(self, zone_name_class, time):
        self._zonemgr_refresh_info[zone_name_class]["refresh_timeout"] = time

    def _get_zone_next_refresh_time(self, zone_name_class):
        return self._zonemgr_refresh_info[zone_name_class]["next_refresh_time"]

    def _set_zone_next_refresh_time(self, zone_name_class, time):
        self._zonemgr_refresh_info[zone_name_class]["next_refresh_time"] = time

    def _send_command(self, module_name, command_name, params):
        """Send command between modules.

        A reply timeout is ignored; a closed session is reported on stderr.
        """
        msg = create_command(command_name, params)
        try:
            seq = self._cc.group_sendmsg(msg, module_name)
            try:
                # Read the reply to this message; its content is not used.
                self._cc.group_recvmsg(False, seq)
            except isc.cc.session.SessionTimeout:
                pass        # for now we just ignore the failure
        except socket.error:
            sys.stderr.write("[b10-zonemgr] Failed to send to module %s, the session has been closed.\n" % module_name)

    def _find_need_do_refresh_zone(self):
        """Find the first zone need do refresh, if no zone need
        do refresh, return the zone with minimum next_refresh_time.

        Zones that are still being refreshed (and whose refresh timeout
        has not passed yet) are skipped; None is returned if every zone
        is skipped.
        """
        zone_need_refresh = None
        for zone_name_class in self._zonemgr_refresh_info.keys():
            zone_state = self._get_zone_state(zone_name_class)
            # If hasn't received refresh response but are within refresh timeout, skip the zone
            if (ZONE_REFRESHING == zone_state and
                (self._get_zone_refresh_timeout(zone_name_class) > self._get_current_time())):
                continue

            # Get the zone with minimum next_refresh_time
            if ((zone_need_refresh is None) or
                (self._get_zone_next_refresh_time(zone_name_class) <
                 self._get_zone_next_refresh_time(zone_need_refresh))):
                zone_need_refresh = zone_name_class

            # Find the zone need do refresh
            if (self._get_zone_next_refresh_time(zone_need_refresh) < self._get_current_time()):
                break

        return zone_need_refresh


    def _do_refresh(self, zone_name_class):
        """Do zone refresh.

        Marks the zone ZONE_REFRESHING with a deadline of
        max_transfer_timeout seconds, then asks Xfrin either to handle the
        pending notify (if a notifying master is recorded) or to refresh.
        """
        log_msg("Do refresh for zone (%s, %s)." % zone_name_class)
        self._set_zone_state(zone_name_class, ZONE_REFRESHING)
        self._set_zone_refresh_timeout(zone_name_class, self._get_current_time() + self._max_transfer_timeout)
        notify_master = self._get_zone_notifier_master(zone_name_class)
        # If the zone has notify master, send notify command to xfrin module
        if notify_master:
            param = {"zone_name" : zone_name_class[0],
                     "zone_class" : zone_name_class[1],
                     "master" : notify_master
                     }
            self._send_command(XFRIN_MODULE_NAME, ZONE_NOTIFY_COMMAND, param)
            self._clear_zone_notifier_master(zone_name_class)
        # Send refresh command to xfrin module
        else:
            param = {"zone_name" : zone_name_class[0],
                     "zone_class" : zone_name_class[1]
                    }
            self._send_command(XFRIN_MODULE_NAME, ZONE_REFRESH_COMMAND, param)

    def _zone_mgr_is_empty(self):
        """Does zone manager has no zone?"""
        return not self._zonemgr_refresh_info

    def _run_timer(self, start_event):
        """Body of the timer thread started by run_timer().

        While self._running is True: start a refresh for the most urgent
        zone when it is due, otherwise wait in select() until that zone's
        next refresh time, or until the check socket or the shutdown
        socket becomes readable.

        NOTE(review): this thread reads and writes _zonemgr_refresh_info
        without taking Zonemgr._lock, which the command thread holds while
        updating zones -- confirm this is safe.
        """
        while self._running:
            # Notify run_timer that we already started and are inside the loop.
            # It is set only once, but when it was outside the loop, there was
            # a race condition and _running could be set to false before we
            # could enter it
            if start_event:
                start_event.set()
                start_event = None
            # If zonemgr has no zone, set timer timeout to self._lowerbound_retry.
            if self._zone_mgr_is_empty():
                timeout = self._lowerbound_retry
            else:
                zone_need_refresh = self._find_need_do_refresh_zone()
                # If don't get zone with minimum next refresh time, set timer timeout to self._lowerbound_retry.
                if not zone_need_refresh:
                    timeout = self._lowerbound_retry
                else:
                    timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
                    if (timeout < 0):
                        self._do_refresh(zone_need_refresh)
                        continue

            # Wait for the socket notification for a maximum time of timeout
            # in seconds (as float).
            try:
                rlist, wlist, xlist = select.select([self._check_sock, self._read_sock], [], [], timeout)
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    (rlist, wlist, xlist) = ([], [], [])
                else:
                    sys.stderr.write("[b10-zonemgr] Error with select(); %s\n" % e)
                    break

            for fd in rlist:
                if fd == self._read_sock: # awaken by shutdown socket
                    # self._running will be False by now, if it is not a false
                    # alarm (linux kernel is said to trigger spurious wakeup
                    # on a filehandle that is not really readable).
                    continue
                if fd == self._check_sock: # awaken by check socket
                    # Drain the wakeup data so the socket is not readable again.
                    self._check_sock.recv(32)

    def run_timer(self, daemon=False):
        """
        Keep track of zone timers. Spawns and starts a thread. The thread object is returned.

        You can stop it by calling shutdown().

        daemon -- if True, the thread is marked as a daemon thread.
        Raises RuntimeError if the timer thread is already running.
        """
        # Small sanity check
        if self._running:
            raise RuntimeError("Trying to run the timers twice at the same time")

        # Prepare the launch
        self._running = True
        # Writing to _write_sock makes _read_sock readable, which wakes the
        # timer thread out of select() on shutdown.
        (self._read_sock, self._write_sock) = socket.socketpair()
        start_event = threading.Event()

        # Start the thread
        self._thread = threading.Thread(target = self._run_timer,
            args = (start_event,))
        if daemon:
            # Thread.setDaemon() is deprecated; the attribute is equivalent.
            self._thread.daemon = True
        self._thread.start()
        start_event.wait()

        # Return the thread to anyone interested
        return self._thread

    def shutdown(self):
        """
        Stop the run_timer() thread. Block until it finished. This must be
        called from a different thread.

        Raises RuntimeError if the timer thread is not running.
        """
        if not self._running:
            raise RuntimeError("Trying to shutdown, but not running")

        # Ask the thread to stop
        self._running = False
        self._write_sock.send(b'shutdown') # make self._read_sock readable
        # Wait for it to actually finish
        self._thread.join()
        # The thread has exited, so nothing uses the wakeup pair any more;
        # close it explicitly instead of leaving it to the garbage collector.
        self._read_sock.close()
        self._write_sock.close()
        # Wipe out what we do not need
        self._thread = None
        self._read_sock = None
        self._write_sock = None

    def update_config_data(self, new_config):
        """ update ZonemgrRefresh config """
        self._lowerbound_refresh = new_config.get('lowerbound_refresh')
        self._lowerbound_retry = new_config.get('lowerbound_retry')
        self._max_transfer_timeout = new_config.get('max_transfer_timeout')
        self._jitter_scope = new_config.get('jitter_scope')

class Zonemgr:
    """Zone manager class."""
    def __init__(self):
        """Set up the sessions, load the zones and start the timer thread."""
        # config_handler() tests this attribute, so it must exist before the
        # sessions (which are given config_handler) are created.
        self._zone_refresh = None
        self._setup_session()
        self._db_file = self.get_db_file()
        # Create socket pair for communicating between main thread and zonemgr timer thread
        self._master_socket, self._slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
        self._zone_refresh = ZonemgrRefresh(self._cc, self._db_file, self._slave_socket, self._config_data)
        self._zone_refresh.run_timer()

        # Serializes zone-state updates made from command_handler().
        self._lock = threading.Lock()
        self._shutdown_event = threading.Event()
        self.running = False

    def _setup_session(self):
        """Setup two sessions for zonemgr, one(self._module_cc) is used for receiving
        commands and config data sent from other modules, another one (self._cc)
        is used to send commands to proper modules."""
        self._cc = isc.cc.Session()
        self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                                  self.config_handler,
                                                  self.command_handler)
        self._module_cc.add_remote_config(AUTH_SPECFILE_LOCATION)
        self._config_data = self._module_cc.get_full_config()
        self._config_data_check(self._config_data)
        self._module_cc.start()

    def get_db_file(self):
        """Return the datasource file configured for the Auth module."""
        db_file, is_default = self._module_cc.get_remote_config_value(AUTH_MODULE_NAME, "database_file")
        # this too should be unnecessary, but currently the
        # 'from build' override isn't stored in the config
        # (and we don't have indirect python access to datasources yet)
        if is_default and "B10_FROM_BUILD" in os.environ:
            db_file = os.environ["B10_FROM_BUILD"] + "/bind10_zones.sqlite3"
        return db_file

    def shutdown(self):
        """Shutdown the zonemgr process. the thread which is keeping track of zone
        timers should be terminated.
        """
        self._zone_refresh.shutdown()

        self._slave_socket.close()
        self._master_socket.close()

        self._shutdown_event.set()
        self.running = False

    def config_handler(self, new_config):
        """ Update config data.

        Unknown keys are skipped and reported in the (error) answer; known
        keys are applied, validated and pushed to the refresh timer.
        """
        answer = create_answer(0)
        for key in new_config:
            if key not in self._config_data:
                answer = create_answer(1, "Unknown config data: " + str(key))
                continue
            self._config_data[key] = new_config[key]

        self._config_data_check(self._config_data)
        if (self._zone_refresh):
            self._zone_refresh.update_config_data(self._config_data)

        return answer

    def _config_data_check(self, config_data):
        """Check whether the new config data is valid or
        not. Clamps 'jitter_scope' to at most 0.5 in place.
        """
        # jitter should not be bigger than half of the original value
        if config_data.get('jitter_scope') > 0.5:
            config_data['jitter_scope'] = 0.5
            # log_msg() already adds the "[b10-zonemgr]" prefix.
            log_msg("jitter_scope is too big, its value will "
                      "be set to 0.5")

    def _parse_cmd_params(self, args, command):
        """Extract the zone (and, for NOTIFY, the master) from command args.

        Returns (zone_name, zone_class) for non-NOTIFY commands and
        ((zone_name, zone_class), master) for ZONE_NOTIFY_COMMAND.
        Raises ZonemgrException if a required value is missing or empty.
        """
        zone_name = args.get("zone_name")
        if not zone_name:
            raise ZonemgrException("zone name should be provided")

        zone_class = args.get("zone_class")
        if not zone_class:
            raise ZonemgrException("zone class should be provided")

        if (command != ZONE_NOTIFY_COMMAND):
            return (zone_name, zone_class)

        master_str = args.get("master")
        if not master_str:
            raise ZonemgrException("master address should be provided")

        return ((zone_name, zone_class), master_str)


    def command_handler(self, command, args):
        """Handle command received from command channel.
        ZONE_NOTIFY_COMMAND is issued by Auth process; ZONE_XFRIN_SUCCESS_COMMAND
        and ZONE_XFRIN_FAILED_COMMAND are issued by Xfrin process; shutdown is issued
        by a user or Boss process.

        A ZonemgrException (missing parameters, or a zone this manager does
        not handle) is returned as an error answer instead of propagating
        out of the command loop and stopping the daemon.
        """
        answer = create_answer(0)
        try:
            if command == ZONE_NOTIFY_COMMAND:
                # Handle Auth notify command.
                # master is the source sender of the notify message.
                zone_name_class, master = self._parse_cmd_params(args, command)
                log_msg("Received notify command for zone (%s, %s)." % zone_name_class)
                with self._lock:
                    self._zone_refresh.zone_handle_notify(zone_name_class, master)
                # Send notification to zonemgr timer thread
                self._master_socket.send(b" ") # make self._slave_socket readable

            elif command == ZONE_XFRIN_SUCCESS_COMMAND:
                # Handle xfrin success command.
                zone_name_class = self._parse_cmd_params(args, command)
                with self._lock:
                    self._zone_refresh.zone_refresh_success(zone_name_class)
                self._master_socket.send(b" ") # make self._slave_socket readable

            elif command == ZONE_XFRIN_FAILED_COMMAND:
                # Handle xfrin fail command.
                zone_name_class = self._parse_cmd_params(args, command)
                with self._lock:
                    self._zone_refresh.zone_refresh_fail(zone_name_class)
                self._master_socket.send(b" ") # make self._slave_socket readable

            elif command == "shutdown":
                self.shutdown()

            else:
                answer = create_answer(1, "Unknown command:" + str(command))
        except ZonemgrException as e:
            answer = create_answer(1, str(e))

        return answer

    def run(self):
        """Process incoming commands until shutdown() is called."""
        self.running = True
        while not self._shutdown_event.is_set():
            # NOTE(review): False is presumably the "nonblock" flag, i.e.
            # this blocks until a message arrives -- confirm.
            self._module_cc.check_command(False)

zonemgrd = None

def signal_handler(signum, frame):
    """Shut the daemon down and exit; installed for SIGTERM and SIGINT."""
    if zonemgrd:
        zonemgrd.shutdown()
    sys.exit(0)

def set_signal_handler():
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

def set_cmd_options(parser):
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
            help="display more about what is going on")

if '__main__' == __name__:
    try:
        parser = OptionParser()
        set_cmd_options(parser)
        (options, args) = parser.parse_args()
        VERBOSE_MODE = options.verbose

        set_signal_handler()
        zonemgrd = Zonemgr()
        zonemgrd.run()
    except KeyboardInterrupt:
        sys.stderr.write("[b10-zonemgr] exit zonemgr process\n")
    except isc.cc.session.SessionError as e:
        sys.stderr.write("[b10-zonemgr] Error creating zonemgr, "
                         "is the command channel daemon running?\n")
    except isc.cc.session.SessionTimeout as e:
        sys.stderr.write("[b10-zonemgr] Error creating zonemgr, "
                         "is the configuration manager running?\n")
    except isc.config.ModuleCCSessionError as e:
        sys.stderr.write("[b10-zonemgr] exit zonemgr process: %s\n" % str(e))

    if zonemgrd and zonemgrd.running:
        zonemgrd.shutdown()