#!@PYTHON@

# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import sys; sys.path.append ('@@PYTHONPATH@@')
import isc
import isc.cc
import threading
import struct
import signal
from isc.datasrc import sqlite3_ds
from socketserver import *
import os
from isc.config.ccsession import *
from isc.log.log import *
from isc.cc import SessionError, SessionTimeout
from isc.notify import notify_out
import isc.utils.process
import socket
import select
import errno
from optparse import OptionParser, OptionValueError
try:
    from libxfr_python import *
    from pydnspp import *
except ImportError as e:
    # C++ loadable module may not be installed; even so the xfrout process
    # must keep running, so we warn about it and move forward.
    sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))

isc.utils.process.rename()

if "B10_FROM_BUILD" in os.environ:
    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
    AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
    UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
else:
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
    AUTH_SPECFILE_PATH = SPECFILE_PATH
    UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"

SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
MAX_TRANSFERS_OUT = 10
VERBOSE_MODE = False

# Upper bound (in bytes) used both as the render limit of one response
# message and as the threshold for starting a new message during a transfer.
XFROUT_MAX_MESSAGE_SIZE = 65535


def get_rrset_len(rrset):
    '''Return the wire length of the given RRset.

    The RRset is rendered into a scratch bytearray with to_wire() and the
    number of bytes produced is returned.
    '''
    wire = bytearray()
    rrset.to_wire(wire)
    return len(wire)


class XfroutSession(BaseRequestHandler):
    '''Request handler which serves one zone transfer request.

    One instance is created by UnixSockServer.finish_request() for every
    connection accepted on the unix domain socket.
    '''

    def __init__(self, request, client_address, server, log):
        # The initializer for the superclass may call functions
        # that need _log to be set, so we set it first
        self._log = log
        BaseRequestHandler.__init__(self, request, client_address, server)

    def _recv_exactly(self, length):
        '''Read exactly 'length' bytes from the request socket.

        recv() on a stream socket may return fewer bytes than asked for,
        so it is called repeatedly until the whole amount has arrived.
        Return the bytes read, or None if the peer closed the connection
        before 'length' bytes were received.
        '''
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def handle(self):
        '''Receive one XFR request from the unix socket and answer it.

        The peer passes the file descriptor of the XFR connection, followed
        by a 2-byte length in network byte order and the query message of
        that length.  The descriptor is always closed before returning.
        '''
        fd = recv_fd(self.request.fileno())
        if fd < 0:
            # This may happen when one xfrout process tries to connect to
            # the xfrout unix socket server, to check whether there is
            # another xfrout running.
            self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
            return

        try:
            data_len = self._recv_exactly(2)
            if data_len is None:
                self._log.log_message("error", "Failed to receive the length of the XFR request")
                return
            msg_len = struct.unpack('!H', data_len)[0]
            msgdata = self._recv_exactly(msg_len)
            if msgdata is None:
                self._log.log_message("error", "Failed to receive the XFR request message")
                return

            # fromfd() duplicates the descriptor, so both the socket object
            # and the original fd have to be closed.
            sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
            try:
                try:
                    self.dns_xfrout_start(sock, msgdata)
                    #TODO, avoid catching all exceptions
                except Exception as e:
                    self._log.log_message("error", str(e))

                try:
                    sock.shutdown(socket.SHUT_RDWR)
                except socket.error:
                    # Avoid socket error caused by shutting down
                    # one non-connected socket.
                    pass
            finally:
                sock.close()
        finally:
            os.close(fd)

    def _parse_query_message(self, mdata):
        '''Parse the wire-format query 'mdata'.

        Return a (rcode, message) pair: (NOERROR, msg) on success, or
        (FORMERR, None) when the data cannot be parsed; the parse error is
        logged.
        '''
        #TODO, need to add parseHeader() in case the message header is invalid
        try:
            msg = Message(Message.PARSE)
            Message.from_wire(msg, mdata)
        except Exception as err:
            self._log.log_message("error", str(err))
            return Rcode.FORMERR(), None

        return Rcode.NOERROR(), msg

    def _get_query_zone_name(self, msg):
        '''Return the name of the first question of 'msg' as text.'''
        question = msg.get_question()[0]
        return question.get_name().to_text()

    def _send_data(self, sock, data):
        '''Send all of 'data' on 'sock', retrying after partial sends.'''
        size = len(data)
        total_count = 0
        while total_count < size:
            count = sock.send(data[total_count:])
            total_count += count

    def _send_message(self, sock, msg):
        '''Render 'msg' and send it on 'sock' with a 2-byte length prefix.

        The prefix is the rendered length in network byte order, the same
        format that handle() reads with '!H'.
        '''
        render = MessageRenderer()
        render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
        msg.to_wire(render)
        header_len = struct.pack('!H', render.get_length())
        self._send_data(sock, header_len)
        self._send_data(sock, render.get_data())

    def _reply_query_with_error_rcode(self, msg, sock, rcode_):
        '''Turn 'msg' into a response carrying 'rcode_' and send it.'''
        msg.make_response()
        msg.set_rcode(rcode_)
        self._send_message(sock, msg)

    def _reply_query_with_format_error(self, msg, sock):
        '''Reply with FORMERR to a query whose format isn't legal.

        Nothing is sent back when there is no message to answer.
        '''
        if not msg:
            return # query message is invalid. send nothing back.

        self._reply_query_with_error_rcode(msg, sock, Rcode.FORMERR())

    def _zone_is_empty(self, zone):
        '''Return True if no SOA record is found for 'zone'.'''
        return not sqlite3_ds.get_zone_soa(zone, self.server.get_db_file())

    def _zone_exist(self, zonename):
        '''Return True if a SOA record is found for 'zonename'.'''
        # Find zone in datasource, should this works? maybe should ask
        # config manager.
        return bool(sqlite3_ds.get_zone_soa(zonename,
                                            self.server.get_db_file()))

    def _check_xfrout_available(self, zone_name):
        '''Check whether the xfr request can be answered.

        Return NOTAUTH if the zone does not exist, SERVFAIL if it is empty,
        REFUSED if the server cannot take another transfer, and NOERROR
        otherwise.  When NOERROR is returned the transfers counter of the
        server has been increased and must be decreased by the caller.
        TODO, Get zone's configuration from cfgmgr or some other place
        eg. check allow_transfer setting,
        '''
        if not self._zone_exist(zone_name):
            return Rcode.NOTAUTH()

        if self._zone_is_empty(zone_name):
            return Rcode.SERVFAIL()

        #TODO, check allow_transfer
        if not self.server.increase_transfers_counter():
            return Rcode.REFUSED()

        return Rcode.NOERROR()

    def dns_xfrout_start(self, sock, msg_query):
        '''Answer the query 'msg_query' (wire format) on 'sock'.

        An unparsable query gets a FORMERR reply (if possible) and an
        unavailable zone gets the matching error rcode; otherwise the zone
        is transferred.  Errors during the transfer are logged, and the
        transfers counter is always released afterwards.
        '''
        rcode_, msg = self._parse_query_message(msg_query)
        #TODO. create query message and parse header
        if rcode_ != Rcode.NOERROR():
            return self._reply_query_with_format_error(msg, sock)

        zone_name = self._get_query_zone_name(msg)
        rcode_ = self._check_xfrout_available(zone_name)
        if rcode_ != Rcode.NOERROR():
            self._log.log_message("info", "transfer of '%s/IN' failed: %s",
                                  zone_name, rcode_.to_text())
            return self._reply_query_with_error_rcode(msg, sock, rcode_)

        try:
            self._log.log_message("info", "transfer of '%s/IN': AXFR started" % zone_name)
            self._reply_xfrout_query(msg, sock, zone_name)
            self._log.log_message("info", "transfer of '%s/IN': AXFR end" % zone_name)
        except Exception as err:
            self._log.log_message("error", str(err))
        finally:
            # The counter was increased by _check_xfrout_available().
            self.server.decrease_transfers_counter()

    def _clear_message(self, msg):
        '''Reset 'msg' for rendering a follow-up response message.

        The qid, opcode and rcode are preserved across the clear, and the
        AA and QR header flags are set again.  Return the same message.
        '''
        qid = msg.get_qid()
        opcode = msg.get_opcode()
        rcode = msg.get_rcode()

        msg.clear(Message.RENDER)
        msg.set_qid(qid)
        msg.set_opcode(opcode)
        msg.set_rcode(rcode)
        msg.set_header_flag(MessageFlag.AA())
        msg.set_header_flag(MessageFlag.QR())
        return msg

    def _create_rrset_from_db_record(self, record):
        '''Create one rrset from one record of datasource, if the schema
        of record is changed, This function should be updated first.

        Fields used: record[2] is the owner name, record[4] the TTL,
        record[5] the RR type and record[7:] the rdata fields.  The class
        is always IN.
        '''
        rrtype_ = RRType(record[5])
        rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
        rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_,
                       RRTTL(int(record[4])))
        rrset_.add_rdata(rdata_)
        return rrset_

    def _send_message_with_last_soa(self, msg, sock, rrset_soa, message_upper_len):
        '''Add the SOA record to the end of message. If it can't be
        added, a new message should be created to send out the last soa.

        'message_upper_len' is the uncompressed size of the rrsets already
        in 'msg'.
        '''
        rrset_len = get_rrset_len(rrset_soa)

        if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
            msg.add_rrset(Section.ANSWER(), rrset_soa)
        else:
            self._send_message(sock, msg)
            msg = self._clear_message(msg)
            msg.add_rrset(Section.ANSWER(), rrset_soa)

        self._send_message(sock, msg)

    def _reply_xfrout_query(self, msg, sock, zone_name):
        '''Send the content of 'zone_name' on 'sock' as an AXFR response.

        The stream starts and ends with the zone's SOA; the other records
        are packed into as few messages as the size limit allows.

        Raises RuntimeError if xfrout starts shutting down while the
        transfer is in progress; the transfer is abandoned at that point.
        '''
        #TODO, there should be a better way to insert rrset.
        msg.make_response()
        msg.set_header_flag(MessageFlag.AA())
        soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
        rrset_soa = self._create_rrset_from_db_record(soa_record)
        msg.add_rrset(Section.ANSWER(), rrset_soa)

        message_upper_len = get_rrset_len(rrset_soa)

        for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
            if self.server._shutdown_event.is_set():
                # xfrout is shutting down: stop here instead of sending the
                # rest of the zone, so that the thread can terminate.
                raise RuntimeError("transfer of '%s/IN' aborted: "
                                   "xfrout is shutting down" % zone_name)

            # TODO: RRType.SOA() ?
            if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
                continue

            rrset_ = self._create_rrset_from_db_record(rr_data)

            # We calculate the maximum size of the RRset (i.e. the
            # size without compression) and use that to see if we
            # may have reached the limit
            rrset_len = get_rrset_len(rrset_)
            if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
                msg.add_rrset(Section.ANSWER(), rrset_)
                message_upper_len += rrset_len
                continue

            self._send_message(sock, msg)
            msg = self._clear_message(msg)
            msg.add_rrset(Section.ANSWER(), rrset_) # Add the rrset to the new message
            message_upper_len = rrset_len

        self._send_message_with_last_soa(msg, sock, rrset_soa, message_upper_len)


class UnixSockServer(ThreadingUnixStreamServer):
    '''The unix domain socket server which accept xfr query sent from auth server.'''

    def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc, log):
        self._remove_unused_sock_file(sock_file)
        self._sock_file = sock_file
        ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
        self._lock = threading.Lock()
        self._transfers_counter = 0
        self._shutdown_event = shutdown_event
        self._log = log
        self.update_config_data(config_data)
        self._cc = cc

    def finish_request(self, request, client_address):
        '''Finish one request by instantiating RequestHandlerClass.'''
        self.RequestHandlerClass(request, client_address, self, self._log)

    def _remove_unused_sock_file(self, sock_file):
        '''Try to remove the socket file. If the file is being used
        by one running xfrout process, exit from python.
        If it's not a socket file or nobody is listening
        , it will be removed. If it can't be removed, exit from python. '''
        if self._sock_file_in_use(sock_file):
            sys.stderr.write("[b10-xfrout] Fail to start xfrout process, unix socket"
                  " file '%s' is being used by another xfrout process\n" % sock_file)
            sys.exit(0)
        else:
            if not os.path.exists(sock_file):
                return

            try:
                os.unlink(sock_file)
            except OSError as err:
                sys.stderr.write('[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
                sys.exit(0)

    def _sock_file_in_use(self, sock_file):
        '''Check whether the socket file 'sock_file' exists and
        is being used by one running xfrout process. If it is,
        return True, or else return False.

        The check is done by trying to connect to the file; the probe
        socket is always closed again.
        '''
        try:
            sock = socket.socket(socket.AF_UNIX)
        except socket.error:
            return False

        try:
            sock.connect(sock_file)
        except socket.error:
            return False
        finally:
            sock.close()
        return True

    def shutdown(self):
        '''Stop serving requests and remove the socket file.'''
        ThreadingUnixStreamServer.shutdown(self)
        try:
            os.unlink(self._sock_file)
        except Exception as e:
            self._log.log_message("error", str(e))

    def update_config_data(self, new_config):
        '''Apply the new config setting of xfrout module. '''
        self._log.log_message('info', 'update config data start.')
        with self._lock:
            self._max_transfers_out = new_config.get('transfers_out')
            self._log.log_message('info', 'max transfer out : %d', self._max_transfers_out)
        self._log.log_message('info', 'update config data complete.')

    def get_db_file(self):
        '''Return the "database_file" value of the remote "Auth" config.

        When that value is the default and B10_FROM_BUILD is set in the
        environment, the zone file inside the build tree is returned
        instead.
        '''
        db_file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
        # this too should be unnecessary, but currently the
        # 'from build' override isn't stored in the config
        # (and we don't have indirect python access to datasources yet)
        if is_default and "B10_FROM_BUILD" in os.environ:
            db_file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
        return db_file

    def increase_transfers_counter(self):
        '''Return False, if counter + 1 > max_transfers_out, or else
        return True
        '''
        with self._lock:
            if self._transfers_counter < self._max_transfers_out:
                self._transfers_counter += 1
                return True
            return False

    def decrease_transfers_counter(self):
        '''Release one slot taken by increase_transfers_counter().'''
        with self._lock:
            self._transfers_counter -= 1


def listen_on_xfr_query(unix_socket_server):
    '''Listen xfr query in one single thread. Polls for shutdown
    every 0.1 seconds, is there a better time?

    Returns once serve_forever() returns normally, i.e. after shutdown()
    has been called on the server.
    '''
    while True:
        try:
            unix_socket_server.serve_forever(poll_interval = 0.1)
        except select.error as err:
            # serve_forever() calls select.select(), which can be
            # interrupted.
            # If it is interrupted, it raises select.error with the
            # errno set to EINTR. We ignore this case, and let the
            # normal program flow continue by trying serve_forever()
            # again.
            if err.args[0] != errno.EINTR:
                raise
        else:
            # A normal return means the server was shut down; calling
            # serve_forever() again would start serving anew and this
            # thread would never finish.
            break


class XfroutServer:
    '''The xfrout process: command channel session, unix socket listener
    thread and notify-out thread.'''

    def __init__(self):
        self._unix_socket_server = None
        self._log = None
        self._listen_sock_file = UNIX_SOCKET_FILE
        self._shutdown_event = threading.Event()
        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
        self._config_data = self._cc.get_full_config()
        self._cc.start()
        self._cc.add_remote_config(AUTH_SPECFILE_LOCATION)
        self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
                                     self._config_data.get('log_severity'), self._config_data.get('log_versions'),
                                     self._config_data.get('log_max_bytes'), True)
        self._start_xfr_query_listener()
        self._start_notifier()

    def _start_xfr_query_listener(self):
        '''Start a new thread to accept xfr query. '''
        self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
                                                  self._shutdown_event, self._config_data,
                                                  self._cc, self._log)
        listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
        listener.start()

    def _start_notifier(self):
        '''Create the notifier and run its dispatcher in a daemon thread.'''
        datasrc = self._unix_socket_server.get_db_file()
        self._notifier = notify_out.NotifyOut(datasrc, self._log)
        td = threading.Thread(target = notify_out.dispatcher, args = (self._notifier,))
        td.daemon = True
        td.start()

    def send_notify(self, zone_name, zone_class):
        '''Ask the notifier to send notifies for the given zone.'''
        self._notifier.send_notify(zone_name, zone_class)

    def config_handler(self, new_config):
        '''Update config data. TODO. Do error check

        Known keys are stored; an unknown key turns the answer into an
        error but the remaining keys are still processed.
        '''
        answer = create_answer(0)
        for key in new_config:
            if key not in self._config_data:
                answer = create_answer(1, "Unknown config data: " + str(key))
                continue
            self._config_data[key] = new_config[key]

        if self._log:
            self._log.update_config(new_config)

        if self._unix_socket_server:
            self._unix_socket_server.update_config_data(self._config_data)

        return answer

    def shutdown(self):
        ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
        terminated.
        '''
        global xfrout_server
        xfrout_server = None #Avoid shutdown is called twice
        self._shutdown_event.set()
        if self._unix_socket_server:
            self._unix_socket_server.shutdown()

        # NOTE(review): this also joins the daemon notifier thread started
        # in _start_notifier(); confirm that notify_out.dispatcher returns,
        # otherwise this loop blocks.
        main_thread = threading.current_thread()
        for th in threading.enumerate():
            if th is main_thread:
                continue
            th.join()

    def command_handler(self, cmd, args):
        '''Handle the "shutdown" and "notify others" commands and return
        the answer for the command channel.'''
        if cmd == "shutdown":
            self._log.log_message("info", "Received shutdown command.")
            self.shutdown()
            answer = create_answer(0)

        elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
            zone_name = args.get('zone_name')
            zone_class = args.get('zone_class')
            if zone_name and zone_class:
                self._log.log_message("info", "zone '%s/%s': receive notify others command" \
                                       % (zone_name, zone_class))
                self.send_notify(zone_name, zone_class)
                answer = create_answer(0)
            else:
                answer = create_answer(1, "Bad command parameter:" + str(args))

        else:
            answer = create_answer(1, "Unknown command:" + str(cmd))

        return answer

    def run(self):
        '''Get and process all commands sent from cfgmgr or other modules. '''
        while not self._shutdown_event.is_set():
            self._cc.check_command()


xfrout_server = None

def signal_handler(signal, frame):
    '''Shut the running server down (if any) and exit the process.'''
    if xfrout_server:
        xfrout_server.shutdown()
    sys.exit(0)

def set_signal_handler():
    '''Install signal_handler for SIGTERM and SIGINT.'''
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

def set_cmd_options(parser):
    '''Register the command line options of b10-xfrout on 'parser'.'''
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
            help="display more about what is going on")

if '__main__' == __name__:
    try:
        parser = OptionParser()
        set_cmd_options(parser)
        (options, args) = parser.parse_args()
        VERBOSE_MODE = options.verbose

        set_signal_handler()
        xfrout_server = XfroutServer()
        xfrout_server.run()
    except KeyboardInterrupt:
        sys.stderr.write("[b10-xfrout] exit xfrout process\n")
    except SessionError:
        sys.stderr.write("[b10-xfrout] Error creating xfrout, "
                           "is the command channel daemon running?\n")
    except SessionTimeout:
        sys.stderr.write("[b10-xfrout] Error creating xfrout, "
                           "is the configuration manager running?\n")
    except ModuleCCSessionError as e:
        sys.stderr.write("[b10-xfrout] exit xfrout process:%s\n" % str(e))

    if xfrout_server:
        xfrout_server.shutdown()