#!@PYTHON@ # Copyright (C) 2010 Internet Systems Consortium. # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """\ This file implements the Secondary Manager program. The secondary manager is one of the co-operating processes of BIND10, which keeps track of timers and other information necessary for BIND10 to act as a slave. """ import sys; sys.path.append ('@@PYTHONPATH@@') import os import time import signal import isc import random import threading import select import socket import errno from isc.datasrc import sqlite3_ds from optparse import OptionParser, OptionValueError from isc.config.ccsession import * # If B10_FROM_BUILD is set in the environment, we use data files # from a directory relative to that, otherwise we use the ones # installed on the system if "B10_FROM_BUILD" in os.environ: SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/zonemgr" AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth" else: PREFIX = "@prefix@" DATAROOTDIR = "@datarootdir@" SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX) AUTH_SPECFILE_PATH = SPECFILE_PATH SPECFILE_LOCATION = SPECFILE_PATH + "/zonemgr.spec" AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec" __version__ = "BIND10" # define module name XFRIN_MODULE_NAME = 'Xfrin' AUTH_MODULE_NAME = 'Auth' # define command name ZONE_XFRIN_FAILED_COMMAND = 'zone_xfrin_failed' ZONE_XFRIN_SUCCESS_COMMAND = 'zone_new_data_ready' ZONE_REFRESH_COMMAND = 'refresh_from_zonemgr' ZONE_NOTIFY_COMMAND = 'notify' # define zone state ZONE_OK = 0 ZONE_REFRESHING = 1 ZONE_EXPIRED = 2 # smallest refresh timeout LOWERBOUND_REFRESH = 10 # smallest retry timeout LOWERBOUND_RETRY = 5 # max zone transfer timeout MAX_TRANSFER_TIMEOUT = 14400 # offsets of fields in the SOA RDATA REFRESH_OFFSET = 3 RETRY_OFFSET = 4 EXPIRED_OFFSET = 5 # verbose mode VERBOSE_MODE = False def log_msg(msg): if VERBOSE_MODE: sys.stdout.write("[b10-zonemgr] %s\n" % str(msg)) class ZonemgrException(Exception): pass class ZonemgrRefresh: """This class will maintain and manage zone refresh info. It also provides methods to keep track of zone timers and do zone refresh. """ def __init__(self, cc, db_file, slave_socket): self._cc = cc self._socket = slave_socket self._db_file = db_file self._zonemgr_refresh_info = {} self._build_zonemgr_refresh_info() def _random_jitter(self, max, jitter): """Imposes some random jitters for refresh and retry timers to avoid many zones need to do refresh at the same time. The value should be between (max - jitter) and max. """ if 0 == jitter: return max return random.uniform(max - jitter, max) def _get_current_time(self): return time.time() def _set_zone_timer(self, zone_name_class, max, jitter): """Set zone next refresh time.""" self._set_zone_next_refresh_time(zone_name_class, self._get_current_time() + \ self._random_jitter(max, jitter)) def _set_zone_refresh_timer(self, zone_name_class): """Set zone next refresh time after zone refresh success. now + refresh*3/4 <= next_refresh_time <= now + refresh """ zone_refresh_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[REFRESH_OFFSET]) zone_refresh_time = max(LOWERBOUND_REFRESH, zone_refresh_time) self._set_zone_timer(zone_name_class, zone_refresh_time, (1 * zone_refresh_time) / 4) def _set_zone_retry_timer(self, zone_name_class): """Set zone next refresh time after zone refresh fail. now + retry*3/4 <= next_refresh_time <= now + retry """ zone_retry_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[RETRY_OFFSET]) zone_retry_time = max(LOWERBOUND_RETRY, zone_retry_time) self._set_zone_timer(zone_name_class, zone_retry_time, (1 * zone_retry_time) / 4) def _set_zone_notify_timer(self, zone_name_class): """Set zone next refresh time after receiving notify next_refresh_time = now """ self._set_zone_timer(zone_name_class, 0, 0) def _zone_not_exist(self, zone_name_class): """ Zone doesn't belong to zonemgr""" if zone_name_class in self._zonemgr_refresh_info.keys(): return False return True def zone_refresh_success(self, zone_name_class): """Update zone info after zone refresh success""" if (self._zone_not_exist(zone_name_class)): raise ZonemgrException("[b10-zonemgr] Zone (%s, %s) doesn't " "belong to zonemgr" % zone_name_class) return self.zonemgr_reload_zone(zone_name_class) self._set_zone_refresh_timer(zone_name_class) self._set_zone_state(zone_name_class, ZONE_OK) self._set_zone_last_refresh_time(zone_name_class, self._get_current_time()) def zone_refresh_fail(self, zone_name_class): """Update zone info after zone refresh fail""" if (self._zone_not_exist(zone_name_class)): raise ZonemgrException("[b10-zonemgr] Zone (%s, %s) doesn't " "belong to zonemgr" % zone_name_class) return self._set_zone_state(zone_name_class, ZONE_OK) self._set_zone_retry_timer(zone_name_class) def zone_handle_notify(self, zone_name_class, master): """Handle zone notify""" if (self._zone_not_exist(zone_name_class)): raise ZonemgrException("[b10-zonemgr] Notified zone (%s, %s) " "doesn't belong to zonemgr" % zone_name_class) return self._set_zone_notifier_master(zone_name_class, master) self._set_zone_notify_timer(zone_name_class) def zonemgr_reload_zone(self, zone_name_class): """ Reload a zone.""" zone_soa = sqlite3_ds.get_zone_soa(str(zone_name_class[0]), self._db_file) self._zonemgr_refresh_info[zone_name_class]["zone_soa_rdata"] = zone_soa[7] def zonemgr_add_zone(self, zone_name_class): """ Add a zone into zone manager.""" zone_info = {} zone_soa = sqlite3_ds.get_zone_soa(str(zone_name_class[0]), self._db_file) if not zone_soa: raise ZonemgrException("[b10-zonemgr] zone (%s, %s) doesn't have soa." % zone_name_class) zone_info["zone_soa_rdata"] = zone_soa[7] zone_info["zone_state"] = ZONE_OK zone_info["last_refresh_time"] = self._get_current_time() zone_info["next_refresh_time"] = self._get_current_time() + \ float(zone_soa[7].split(" ")[REFRESH_OFFSET]) self._zonemgr_refresh_info[zone_name_class] = zone_info def _build_zonemgr_refresh_info(self): """ Build zonemgr refresh info map.""" log_msg("Start loading zone into zonemgr.") for zone_name, zone_class in sqlite3_ds.get_zones_info(self._db_file): zone_name_class = (zone_name, zone_class) self.zonemgr_add_zone(zone_name_class) log_msg("Finish loading zone into zonemgr.") def _zone_is_expired(self, zone_name_class): """Judge whether a zone is expired or not.""" zone_expired_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[EXPIRED_OFFSET]) zone_last_refresh_time = self._get_zone_last_refresh_time(zone_name_class) if (ZONE_EXPIRED == self._get_zone_state(zone_name_class) or zone_last_refresh_time + zone_expired_time <= self._get_current_time()): return True return False def _get_zone_soa_rdata(self, zone_name_class): return self._zonemgr_refresh_info[zone_name_class]["zone_soa_rdata"] def _get_zone_last_refresh_time(self, zone_name_class): return self._zonemgr_refresh_info[zone_name_class]["last_refresh_time"] def _set_zone_last_refresh_time(self, zone_name_class, time): self._zonemgr_refresh_info[zone_name_class]["last_refresh_time"] = time def _get_zone_notifier_master(self, zone_name_class): if ("notify_master" in self._zonemgr_refresh_info[zone_name_class].keys()): return self._zonemgr_refresh_info[zone_name_class]["notify_master"] return None def _set_zone_notifier_master(self, zone_name_class, master_addr): self._zonemgr_refresh_info[zone_name_class]["notify_master"] = master_addr def _clear_zone_notifier_master(self, zone_name_class): if ("notify_master" in self._zonemgr_refresh_info[zone_name_class].keys()): del self._zonemgr_refresh_info[zone_name_class]["notify_master"] def _get_zone_state(self, zone_name_class): return self._zonemgr_refresh_info[zone_name_class]["zone_state"] def _set_zone_state(self, zone_name_class, zone_state): self._zonemgr_refresh_info[zone_name_class]["zone_state"] = zone_state def _get_zone_refresh_timeout(self, zone_name_class): return self._zonemgr_refresh_info[zone_name_class]["refresh_timeout"] def _set_zone_refresh_timeout(self, zone_name_class, time): self._zonemgr_refresh_info[zone_name_class]["refresh_timeout"] = time def _get_zone_next_refresh_time(self, zone_name_class): return self._zonemgr_refresh_info[zone_name_class]["next_refresh_time"] def _set_zone_next_refresh_time(self, zone_name_class, time): self._zonemgr_refresh_info[zone_name_class]["next_refresh_time"] = time def _send_command(self, module_name, command_name, params): """Send command between modules.""" msg = create_command(command_name, params) try: self._cc.group_sendmsg(msg, module_name) except socket.error: sys.stderr.write("[b10-zonemgr] Failed to send to module %s, the session has been closed." % module_name) def _find_need_do_refresh_zone(self): """Find the first zone need do refresh, if no zone need do refresh, return the zone with minimum next_refresh_time. """ zone_need_refresh = None for zone_name_class in self._zonemgr_refresh_info.keys(): # Does the zone expired? if (ZONE_EXPIRED != self._get_zone_state(zone_name_class) and self._zone_is_expired(zone_name_class)): log_msg("Zone (%s, %s) is expired." % zone_name_class) self._set_zone_state(zone_name_class, ZONE_EXPIRED) zone_state = self._get_zone_state(zone_name_class) # If zone is expired and doesn't receive notify, skip the zone if (ZONE_EXPIRED == zone_state and (not self._get_zone_notifier_master(zone_name_class))): continue # If hasn't received refresh response but are within refresh timeout, skip the zone if (ZONE_REFRESHING == zone_state and (self._get_zone_refresh_timeout(zone_name_class) > self._get_current_time())): continue # Get the zone with minimum next_refresh_time if ((None == zone_need_refresh) or (self._get_zone_next_refresh_time(zone_name_class) < self._get_zone_next_refresh_time(zone_need_refresh))): zone_need_refresh = zone_name_class # Find the zone need do refresh if (self._get_zone_next_refresh_time(zone_need_refresh) < self._get_current_time()): break return zone_need_refresh def _do_refresh(self, zone_name_class): """Do zone refresh.""" log_msg("Do refresh for zone (%s, %s)." % zone_name_class) self._set_zone_state(zone_name_class, ZONE_REFRESHING) self._set_zone_refresh_timeout(zone_name_class, self._get_current_time() + MAX_TRANSFER_TIMEOUT) notify_master = self._get_zone_notifier_master(zone_name_class) # If the zone has notify master, send notify command to xfrin module if notify_master: param = {"zone_name" : zone_name_class[0], "zone_class" : zone_name_class[1], "master" : notify_master } self._send_command(XFRIN_MODULE_NAME, ZONE_NOTIFY_COMMAND, param) self._clear_zone_notifier_master(zone_name_class) # Send refresh command to xfrin module else: param = {"zone_name" : zone_name_class[0], "zone_class" : zone_name_class[1] } self._send_command(XFRIN_MODULE_NAME, ZONE_REFRESH_COMMAND, param) def _zone_mgr_is_empty(self): """Does zone manager has no zone?""" if not len(self._zonemgr_refresh_info): return True return False def run_timer(self): """Keep track of zone timers.""" while True: # Zonemgr has no zone. if self._zone_mgr_is_empty(): time.sleep(LOWERBOUND_RETRY) # A better time? continue zone_need_refresh = self._find_need_do_refresh_zone() # If don't get zone with minimum next refresh time, set timer timeout = LOWERBOUND_REFRESH if not zone_need_refresh: timeout = LOWERBOUND_RETRY else: timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time() if (timeout < 0): self._do_refresh(zone_need_refresh) continue """ Wait for the socket notification for a maximum time of timeout in seconds (as float).""" try: (rlist, wlist, xlist) = select.select([self._socket], [], [], timeout) if rlist: self._socket.recv(32) except ValueError as e: raise ZonemgrException("[b10-zonemgr] Socket has been closed\n") break except select.error as e: if e.args[0] == errno.EINTR: (rlist, wlist, xlist) = ([], [], []) else: raise ZonemgrException("[b10-zonemgr] Error with select(): %s\n" % e) break class Zonemgr: """Zone manager class.""" def __init__(self): self._setup_session() self._db_file = self.get_db_file() # Create socket pair for communicating between main thread and zonemgr timer thread self._master_socket, self._slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) self._zone_refresh= ZonemgrRefresh(self._cc, self._db_file, self._slave_socket) self._start_zone_refresh_timer() self._lock = threading.Lock() self._shutdown_event = threading.Event() def _start_zone_refresh_timer(self): """Start a new thread to keep track of zone timers""" listener = threading.Thread(target = self._zone_refresh.run_timer, args = ()) listener.setDaemon(True) listener.start() def _setup_session(self): """Setup two sessions for zonemgr, one(self._module_cc) is used for receiving commands and config data sent from other modules, another one (self._cc) is used to send commands to proper modules.""" self._cc = isc.cc.Session() self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler) self._module_cc.add_remote_config(AUTH_SPECFILE_LOCATION) self._config_data = self._module_cc.get_full_config() self._module_cc.start() def get_db_file(self): db_file, is_default = self._module_cc.get_remote_config_value(AUTH_MODULE_NAME, "database_file") # this too should be unnecessary, but currently the # 'from build' override isn't stored in the config # (and we don't have indirect python access to datasources yet) if is_default and "B10_FROM_BUILD" in os.environ: db_file = os.environ["B10_FROM_BUILD"] + "/bind10_zones.sqlite3" return db_file def shutdown(self): """Shutdown the zonemgr process. the thread which is keeping track of zone timers should be terminated. """ self._slave_socket.close() self._master_socket.close() self._shutdown_event.set() main_thread = threading.currentThread() for th in threading.enumerate(): if th is main_thread: continue th.join() def config_handler(self, new_config): """Update config data.""" answer = create_answer(0) for key in new_config: if key not in self._config_data: answer = create_answer(1, "Unknown config data: " + str(key)) continue self._config_data[key] = new_config[key] return answer def _parse_cmd_params(self, args, command): zone_name = args.get("zone_name") if not zone_name: raise ZonemgrException("zone name should be provided") zone_class = args.get("zone_class") if not zone_class: raise ZonemgrException("zone class should be provided") if (command != ZONE_NOTIFY_COMMAND): return (zone_name, zone_class) master_str = args.get("master") if not master_str: raise ZonemgrException("master address should be provided") return ((zone_name, zone_class), master_str) def command_handler(self, command, args): """Handle command receivd from command channel. ZONE_NOTIFY_COMMAND is issued by Auth process; ZONE_XFRIN_SUCCESS_COMMAND and ZONE_XFRIN_FAILED_COMMAND are issued by Xfrin process; shutdown is issued by a user or Boss process. """ answer = create_answer(0) if command == ZONE_NOTIFY_COMMAND: """ Handle Auth notify command""" # master is the source sender of the notify message. zone_name_class, master = self._parse_cmd_params(args, command) log_msg("Received notify command for zone (%s, %s)." % zone_name_class) with self._lock: self._zone_refresh.zone_handle_notify(zone_name_class, master) # Send notification to zonemgr timer thread self._master_socket.send(b" ") elif command == ZONE_XFRIN_SUCCESS_COMMAND: """ Handle xfrin success command""" zone_name_class = self._parse_cmd_params(args, command) with self._lock: self._zone_refresh.zone_refresh_success(zone_name_class) self._master_socket.send(b" ") elif command == ZONE_XFRIN_FAILED_COMMAND: """ Handle xfrin fail command""" zone_name_class = self._parse_cmd_params(args, command) with self._lock: self._zone_refresh.zone_refresh_fail(zone_name_class) self._master_socket.send(b" ") elif command == "shutdown": self.shutdown() else: answer = create_answer(1, "Unknown command:" + str(command)) return answer def run(self): while not self._shutdown_event.is_set(): self._module_cc.check_command() zonemgrd = None def signal_handler(signal, frame): if zonemgrd: zonemgrd.shutdown() sys.exit(0) def set_signal_handler(): signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) def set_cmd_options(parser): parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="display more about what is going on") if '__main__' == __name__: try: parser = OptionParser() set_cmd_options(parser) (options, args) = parser.parse_args() VERBOSE_MODE = options.verbose set_signal_handler() zonemgrd = Zonemgr() zonemgrd.run() except KeyboardInterrupt: sys.stderr.write("[b10-zonemgr] exit zonemgr process\n") except isc.cc.session.SessionError as e: sys.stderr.write("[b10-zonemgr] Error creating zonemgr, " "is the command channel daemon running?\n") except isc.cc.session.SessionTimeout as e: sys.stderr.write("[b10-zonemgr] Error creating zonemgr, " "is the configuration manager running?\n") except isc.config.ModuleCCSessionError as e: sys.stderr.write("[b10-zonemgr] exit zonemgr process: %s\n" % str(e)) if zonemgrd: zonemgrd.shutdown()