#!@PYTHON@

# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import sys; sys.path.append ('@@PYTHONPATH@@')
import isc
import isc.cc
import threading
import struct
import signal
from isc.datasrc import sqlite3_ds
from socketserver import *
import os
from isc.config.ccsession import *
from isc.cc import SessionError
import socket
from optparse import OptionParser, OptionValueError
try:
    from bind10_xfr import *
    from bind10_dns import *
except ImportError as e:
    # C++ loadable module may not be installed; even so the xfrout process
    # must keep running, so we warn about it and move forward.
    sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))

# Location of the module specification file: taken from the source tree when
# B10_FROM_SOURCE is set, otherwise from the configured installation paths.
if "B10_FROM_SOURCE" in os.environ:
    SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrout"
else:
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"

UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
MAX_TRANSFERS_OUT = 10
verbose_mode = False


def _log_msg(msg):
    '''Print one log line carrying the b10-xfrout prefix.'''
    print('[b10-xfrout] ', msg)


class XfroutException(Exception):
    '''Error raised by the xfrout module itself.'''
    pass


class XfroutSession(BaseRequestHandler):
    '''Request handler serving one zone transfer request.

    Each request arrives on the unix domain socket as a passed file
    descriptor followed by a 2-byte network-order length and the raw query
    message of that length.
    '''

    def _recv_exactly(self, length):
        '''Read exactly `length` bytes from the request socket.

        A single recv() call may return less than requested, so keep reading
        until everything has arrived. Raises XfroutException if the peer
        closes the connection first.
        '''
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                raise XfroutException("connection closed while receiving the XFR request")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def handle(self):
        '''Receive the transfer socket and the query, then serve the transfer.

        Raises XfroutException if no file descriptor could be received.
        Any other failure is reported only in verbose mode; the transfer
        socket is closed in every case.
        '''
        fd = recv_fd(self.request.fileno())
        if fd < 0:
            raise XfroutException("failed to receive the FD for XFR connection")

        try:
            sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        finally:
            # socket.fromfd() duplicates the descriptor, so the one received
            # from the peer has to be closed here or it leaks per request.
            os.close(fd)

        try:
            msg_len = struct.unpack('!H', self._recv_exactly(2))[0]
            msgdata = self._recv_exactly(msg_len)
            self.dns_xfrout_start(sock, msgdata)
        except Exception as e:
            if verbose_mode:
                self.log_msg(str(e))
        finally:
            sock.close()

    def _parse_query_message(self, mdata):
        '''Parse the wire-format query `mdata`.

        Returns the pair (rcode.NOERROR(), message) on success, or
        (rcode.FORMERR(), None) if parsing raised an exception.
        '''
        #TODO, need to add parseHeader() in case the message header is invalid
        try:
            msg = message(message_mode.PARSE)
            msg.from_wire(input_buffer(mdata))
        except Exception as err:
            if verbose_mode:
                self.log_msg(str(err))
            return rcode.FORMERR(), None

        return rcode.NOERROR(), msg

    def _get_query_zone_name(self, msg):
        '''Return the name of the question in `msg` as text.'''
        q_iter = question_iter(msg)
        question = q_iter.get_question()
        return question.get_name().to_text()

    def _send_data(self, sock, data):
        '''Send all of `data` on `sock`, looping over partial sends.'''
        size = len(data)
        total_count = 0
        while total_count < size:
            count = sock.send(data[total_count:])
            total_count += count

    def _send_message(self, sock, msg):
        '''Render `msg` and send it on `sock` behind a 2-byte length prefix.'''
        obuf = output_buffer(0)
        render = message_render(obuf)
        msg.to_wire(render)
        # '!H' writes the length in network byte order, mirroring the
        # unpack('!H') used when the request is read in handle().
        header_len = struct.pack('!H', obuf.get_length())
        self._send_data(sock, header_len)
        self._send_data(sock, obuf.get_data())

    def _reply_query_with_error_rcode(self, msg, sock, rcode_):
        '''Turn `msg` into a response carrying `rcode_` and send it.'''
        msg.make_response()
        msg.set_rcode(rcode_)
        self._send_message(sock, msg)

    def _reply_query_with_format_error(self, msg, sock):
        '''query message format isn't legal.'''
        if not msg:
            return # query message is invalid. send nothing back.

        self._reply_query_with_error_rcode(msg, sock, rcode.FORMERR())

    def _zone_is_empty(self, zone):
        '''Return True if no SOA record is found for `zone`.'''
        if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
            return False

        return True

    def _zone_exist(self, zonename):
        '''Return True if an SOA record is found for `zonename`.'''
        # Find zone in datasource, should this works? maybe should ask
        # config manager.
        soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
        if soa:
            return True
        return False

    def _check_xfrout_available(self, zone_name):
        '''Check if xfr request can be responsed.
           TODO, Get zone's configuration from cfgmgr or some other place
           eg. check allow_transfer setting,

        Returns rcode.NOERROR() when the transfer may proceed; in that case
        the server's transfer counter has been incremented and the caller is
        responsible for decrementing it again.
        '''
        if not self._zone_exist(zone_name):
            return rcode.NOTAUTH()

        if self._zone_is_empty(zone_name):
            return rcode.SERVFAIL()

        #TODO, check allow_transfer
        if not self.server.increase_transfers_counter():
            return rcode.REFUSED()

        return rcode.NOERROR()

    def dns_xfrout_start(self, sock, msg_query):
        '''Parse the query, validate it and stream the zone over `sock`.

        An error response is sent when the query cannot be parsed or the
        transfer is not available. Errors during the transfer itself are
        written to stderr in verbose mode only.
        '''
        rcode_, msg = self._parse_query_message(msg_query)
        #TODO. create query message and parse header
        if rcode_ != rcode.NOERROR():
            return self._reply_query_with_format_error(msg, sock)

        zone_name = self._get_query_zone_name(msg)
        rcode_ = self._check_xfrout_available(zone_name)
        if rcode_ != rcode.NOERROR():
            return self._reply_query_with_error_rcode(msg, sock, rcode_)

        # From here on the transfer counter has been incremented by
        # _check_xfrout_available(); 'finally' guarantees it is released.
        try:
            if verbose_mode:
                self.log_msg("transfer of '%s/IN': AXFR started" % zone_name)

            self._reply_xfrout_query(msg, sock, zone_name)

            if verbose_mode:
                self.log_msg("transfer of '%s/IN': AXFR end" % zone_name)
        except Exception as err:
            if verbose_mode:
                sys.stderr.write(str(err))
        finally:
            self.server.decrease_transfers_counter()
        return

    def _clear_message(self, msg):
        '''Reset `msg` for rendering, keeping qid, opcode and rcode.

        The AA and QR header flags are set on the cleared message, which is
        also returned.
        '''
        qid = msg.get_qid()
        opcode = msg.get_opcode()
        # Named rcode_ so the module-level 'rcode' class is not shadowed.
        rcode_ = msg.get_rcode()

        msg.clear(message_mode.RENDER)
        msg.set_qid(qid)
        msg.set_opcode(opcode)
        msg.set_rcode(rcode_)
        msg.set_header_flag(message_flag.AA())
        msg.set_header_flag(message_flag.QR())
        return msg

    def _create_rrset_from_db_record(self, record):
        '''Create one rrset from one record of datasource, if the schema of record is changed,
        This function should be updated first.
        '''
        # Field positions used below: [2] owner name, [4] TTL, [5] RR type,
        # [7:] rdata fields (joined with single spaces).
        rrtype_ = rr_type(record[5])
        rdata_ = create_rdata(rrtype_, rr_class.IN(), " ".join(record[7:]))
        rrset_ = rrset(name(record[2]), rr_class.IN(), rrtype_, rr_ttl( int(record[4])))
        rrset_.add_rdata(rdata_)
        return rrset_

    def _send_message_with_last_soa(self, msg, sock, rrset_soa):
        '''Add the SOA record to the end of message. If it can't be
        added, a new message should be created to send out the last soa .
        '''
        # Each length is measured with its own fresh renderer, the same way
        # _reply_xfrout_query() does it, so the two values are comparable.
        old_message_len = self._get_message_len(msg)
        msg.add_rrset(section.ANSWER(), rrset_soa)
        message_len = self._get_message_len(msg)

        if message_len != old_message_len:
            self._send_message(sock, msg)
        else:
            # The SOA did not fit: flush the pending records first so they
            # are not lost, then send the SOA in a message of its own.
            self._send_message(sock, msg)
            msg = self._clear_message(msg)
            msg.add_rrset(section.ANSWER(), rrset_soa)
            self._send_message(sock, msg)

    def _get_message_len(self, msg):
        '''Get message length, every time need do like this? Actually there should be
        a better way, I need check with jinmei later.
        '''
        obuf = output_buffer(0)
        render = message_render(obuf)
        msg.to_wire(render)
        return obuf.get_length()

    def _reply_xfrout_query(self, msg, sock, zone_name):
        '''Send the zone `zone_name` on `sock` as a sequence of messages.

        The SOA record goes first and last; the other records are packed
        into messages in between. Raises XfroutException if the server's
        shutdown event is set while the transfer is running.
        '''
        #TODO, there should be a better way to insert rrset.
        msg.make_response()
        msg.set_header_flag(message_flag.AA())
        soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
        rrset_soa = self._create_rrset_from_db_record(soa_record)
        msg.add_rrset(section.ANSWER(), rrset_soa)

        old_message_len = 0
        # TODO, Since add_rrset() return nothing when rrset can't be added, so I have to compare
        # the message length to know if the rrset has been added sucessfully.
        for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
            if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
                raise XfroutException("shutdown!")

            if rr_type(rr_data[5]) == rr_type.SOA(): #ignore soa record
                continue

            rrset_ = self._create_rrset_from_db_record(rr_data)
            msg.add_rrset(section.ANSWER(), rrset_)
            message_len = self._get_message_len(msg)
            if message_len != old_message_len:
                old_message_len = message_len
                continue

            # The length did not change, so the rrset was not added: send
            # what has accumulated and start a new message with this rrset.
            self._send_message(sock, msg)
            msg = self._clear_message(msg)
            msg.add_rrset(section.ANSWER(), rrset_) # Add the rrset to the new message
            old_message_len = 0

        self._send_message_with_last_soa(msg, sock, rrset_soa)

    def log_msg(self, msg):
        '''Print `msg` with the b10-xfrout prefix.'''
        _log_msg(msg)


class UnixSockServer(ThreadingUnixStreamServer):
    '''The unix domain socket server which accept xfr query sent from auth server.'''

    def __init__(self, sock_file, handle_class, shutdown_event, config_data):
        '''Bind to `sock_file`, removing any file already at that path.'''
        try:
            os.unlink(sock_file)
        except OSError:
            # Nothing to remove (or not removable); binding below will
            # report any real problem.
            pass

        self._sock_file = sock_file
        ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
        self._lock = threading.Lock()
        self._transfers_counter = 0
        self._shutdown_event = shutdown_event
        self.update_config_data(config_data)

    def shutdown(self):
        '''Stop serving and remove the socket file.'''
        ThreadingUnixStreamServer.shutdown(self)
        try:
            os.unlink(self._sock_file)
        except OSError:
            pass

    def update_config_data(self, new_config):
        '''Apply the new config setting of xfrout module. '''
        with self._lock:
            self._max_transfers_out = new_config.get('transfers_out')
            self._db_file = new_config.get('db_file')

    def get_db_file(self):
        '''Return the currently configured database file.'''
        with self._lock:
            return self._db_file

    def increase_transfers_counter(self):
        '''Return False, if counter + 1 > max_transfers_out, or else
        return True
        '''
        with self._lock:
            if self._transfers_counter < self._max_transfers_out:
                self._transfers_counter += 1
                return True
            return False

    def decrease_transfers_counter(self):
        '''Decrement the count of running transfers.'''
        with self._lock:
            self._transfers_counter -= 1


def listen_on_xfr_query(unix_socket_server):
    '''Listen xfr query in one single thread. Polls for shutdown
    every 0.1 seconds, is there a better time?
    '''
    unix_socket_server.serve_forever(poll_interval = 0.1)


class XfroutServer:
    '''Top-level xfrout process object.

    It owns the command channel session and the unix socket server that
    accepts transfer requests.
    '''

    def __init__(self):
        self._unix_socket_server = None
        self._listen_sock_file = UNIX_SOCKET_FILE
        self._shutdown_event = threading.Event()
        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
        self._config_data = self._cc.get_full_config()
        self._cc.start()
        self._start_xfr_query_listener()

    def _start_xfr_query_listener(self):
        '''Start a new thread to accept xfr query. '''
        self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
                                                  self._shutdown_event, self._config_data);
        listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
        listener.start()

    def config_handler(self, new_config):
        '''Update config data. TODO. Do error check'''
        answer = create_answer(0)
        for key in new_config:
            if key not in self._config_data:
                answer = create_answer(1, "Unknown config data: " + str(key))
                continue
            self._config_data[key] = new_config[key]

        if self._unix_socket_server:
            self._unix_socket_server.update_config_data(self._config_data)

        return answer

    def shutdown(self):
        ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
        terminated.
        '''
        global xfrout_server
        xfrout_server = None #Avoid shutdown is called twice
        self._shutdown_event.set()
        if self._unix_socket_server:
            self._unix_socket_server.shutdown()

        # Wait for every other thread; the calling thread must not join
        # itself.
        main_thread = threading.current_thread()
        for th in threading.enumerate():
            if th is main_thread:
                continue
            th.join()

    def command_handler(self, cmd, args):
        '''Handle a command from the command channel.

        Only "shutdown" is recognised; any other command gets an error
        answer.
        '''
        if cmd == "shutdown":
            if verbose_mode:
                _log_msg("Received shutdown command")
            self.shutdown()
            answer = create_answer(0)
        else:
            answer = create_answer(1, "Unknown command:" + str(cmd))

        return answer

    def run(self):
        '''Get and process all commands sent from cfgmgr or other modules. '''
        while not self._shutdown_event.is_set():
            self._cc.check_command()


xfrout_server = None

def signal_handler(signal, frame):
    '''Shut the server down (if it is running) and exit the process.'''
    if xfrout_server:
        xfrout_server.shutdown()
    sys.exit(0)

def set_signal_handler():
    '''Install signal_handler for SIGTERM and SIGINT.'''
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

def set_cmd_options(parser):
    '''Register the command line options of b10-xfrout on `parser`.'''
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
            help="display more about what is going on")

if '__main__' == __name__:
    try:
        parser = OptionParser()
        set_cmd_options(parser)
        (options, args) = parser.parse_args()
        verbose_mode = options.verbose

        set_signal_handler()
        xfrout_server = XfroutServer()
        xfrout_server.run()
    except KeyboardInterrupt:
        print("[b10-xfrout] exit xfrout process")
    except SessionError as e:
        print('[b10-xfrout] Error creating xfrout, '
              'is the command channel daemon running?' )
    except ModuleCCSessionError as e:
        print('[b10-xfrout] exit xfrout process:', e)

    if xfrout_server:
        xfrout_server.shutdown()