123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- #!@PYTHON@
- # Copyright (C) 2010 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- import isc
- import isc.cc
- import threading
- import struct
- import signal
- from isc.datasrc import sqlite3_ds
- from socketserver import *
- import os
- from isc.config.ccsession import *
- from isc.cc import SessionError
- import socket
- from optparse import OptionParser, OptionValueError
- try:
- from bind10_xfr import *
- from bind10_dns import *
- except ImportError as e:
- # C++ loadable module may not be installed; even so the xfrout process
- # must keep running, so we warn about it and move forward.
- sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))
- if "B10_FROM_SOURCE" in os.environ:
- SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrout"
- else:
- PREFIX = "@prefix@"
- DATAROOTDIR = "@datarootdir@"
- SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
- SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
- UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
- MAX_TRANSFERS_OUT = 10
- verbose_mode = False
- class XfroutException(Exception): pass
- class XfroutSession(BaseRequestHandler):
- def handle(self):
- fd = recv_fd(self.request.fileno())
- if fd < 0:
- raise XfroutException("failed to receive the FD for XFR connection")
- data_len = self.request.recv(2)
- msg_len = struct.unpack('!H', data_len)[0]
- msgdata = self.request.recv(msg_len)
- sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
- try:
- self.dns_xfrout_start(sock, msgdata)
- except Exception as e:
- if verbose_mode:
- self.log_msg(str(e))
- sock.close()
- def _parse_query_message(self, mdata):
- ''' parse query message to [socket,message]'''
- #TODO, need to add parseHeader() in case the message header is invalid
- try:
- msg = message(message_mode.PARSE)
- msg.from_wire(input_buffer(mdata))
- except Exception as err:
- if verbose_mode:
- self.log_msg(str(err))
- return rcode.FORMERR(), None
- return rcode.NOERROR(), msg
- def _get_query_zone_name(self, msg):
- q_iter = question_iter(msg)
- question = q_iter.get_question()
- return question.get_name().to_text()
- def _send_data(self, sock, data):
- size = len(data)
- total_count = 0
- while total_count < size:
- count = sock.send(data[total_count:])
- total_count += count
- def _send_message(self, sock, msg):
- obuf = output_buffer(0)
- render = message_render(obuf)
- msg.to_wire(render)
- header_len = struct.pack('H', socket.htons(obuf.get_length()))
- self._send_data(sock, header_len)
- self._send_data(sock, obuf.get_data())
- def _reply_query_with_error_rcode(self, msg, sock, rcode_):
- msg.make_response()
- msg.set_rcode(rcode_)
- self._send_message(sock, msg)
- def _reply_query_with_format_error(self, msg, sock):
- '''query message format isn't legal.'''
- if not msg:
- return # query message is invalid. send nothing back.
- msg.make_response()
- msg.set_rcode(rcode.FORMERR())
- self._send_message(sock, msg)
- def _zone_is_empty(self, zone):
- if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
- return False
- return True
- def _zone_exist(self, zonename):
- # Find zone in datasource, should this works? maybe should ask
- # config manager.
- soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
- if soa:
- return True
- return False
-
- def _check_xfrout_available(self, zone_name):
- '''Check if xfr request can be responsed.
- TODO, Get zone's configuration from cfgmgr or some other place
- eg. check allow_transfer setting,
- '''
- if not self._zone_exist(zone_name):
- return rcode.NOTAUTH()
- if self._zone_is_empty(zone_name):
- return rcode.SERVFAIL()
- #TODO, check allow_transfer
- if not self.server.increase_transfers_counter():
- return rcode.REFUSED()
- return rcode.NOERROR()
- def dns_xfrout_start(self, sock, msg_query):
- rcode_, msg = self._parse_query_message(msg_query)
- #TODO. create query message and parse header
- if rcode_ != rcode.NOERROR():
- return self._reply_query_with_format_error(msg, sock)
- zone_name = self._get_query_zone_name(msg)
- rcode_ = self._check_xfrout_available(zone_name)
- if rcode_ != rcode.NOERROR():
- return self. _reply_query_with_error_rcode(msg, sock, rcode_)
- try:
- if verbose_mode:
- self.log_msg("transfer of '%s/IN': AXFR started" % zone_name)
- self._reply_xfrout_query(msg, sock, zone_name)
- if verbose_mode:
- self.log_msg("transfer of '%s/IN': AXFR end" % zone_name)
- except Exception as err:
- if verbose_mode:
- sys.stderr.write(str(err))
- self.server.decrease_transfers_counter()
- return
- def _clear_message(self, msg):
- qid = msg.get_qid()
- opcode = msg.get_opcode()
- rcode = msg.get_rcode()
-
- msg.clear(message_mode.RENDER)
- msg.set_qid(qid)
- msg.set_opcode(opcode)
- msg.set_rcode(rcode)
- msg.set_header_flag(message_flag.AA())
- msg.set_header_flag(message_flag.QR())
- return msg
- def _create_rrset_from_db_record(self, record):
- '''Create one rrset from one record of datasource, if the schema of record is changed,
- This function should be updated first.
- '''
- rrtype_ = rr_type(record[5])
- rdata_ = create_rdata(rrtype_, rr_class.IN(), " ".join(record[7:]))
- rrset_ = rrset(name(record[2]), rr_class.IN(), rrtype_, rr_ttl( int(record[4])))
- rrset_.add_rdata(rdata_)
- return rrset_
-
- def _send_message_with_last_soa(self, msg, sock, rrset_soa):
- '''Add the SOA record to the end of message. If it can't be
- added, a new message should be created to send out the last soa .
- '''
- obuf = output_buffer(0)
- render = message_render(obuf)
- msg.to_wire(render)
- old_message_len = obuf.get_length()
- msg.add_rrset(section.ANSWER(), rrset_soa)
- msg.to_wire(render)
- message_len = obuf.get_length()
- if message_len != old_message_len:
- self._send_message(sock, msg)
- else:
- msg = self._clear_message(msg)
- msg.add_rrset(section.ANSWER(), rrset_soa)
- self._send_message(sock, msg)
- def _get_message_len(self, msg):
- '''Get message length, every time need do like this? Actually there should be
- a better way, I need check with jinmei later.
- '''
- obuf = output_buffer(0)
- render = message_render(obuf)
- msg.to_wire(render)
- return obuf.get_length()
- def _reply_xfrout_query(self, msg, sock, zone_name):
- #TODO, there should be a better way to insert rrset.
- msg.make_response()
- msg.set_header_flag(message_flag.AA())
- soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
- rrset_soa = self._create_rrset_from_db_record(soa_record)
- msg.add_rrset(section.ANSWER(), rrset_soa)
- old_message_len = 0
- # TODO, Since add_rrset() return nothing when rrset can't be added, so I have to compare
- # the message length to know if the rrset has been added sucessfully.
- for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
- if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
- raise XfroutException("shutdown!")
- if rr_type(rr_data[5]) == rr_type.SOA(): #ignore soa record
- continue
-
- rrset_ = self._create_rrset_from_db_record(rr_data)
- msg.add_rrset(section.ANSWER(), rrset_)
- message_len = self._get_message_len(msg)
- if message_len != old_message_len:
- old_message_len = message_len
- continue
- self._send_message(sock, msg)
- msg = self._clear_message(msg)
- msg.add_rrset(section.ANSWER(), rrset_) # Add the rrset to the new message
- old_message_len = 0
- self._send_message_with_last_soa(msg, sock, rrset_soa)
- def log_msg(self, msg):
- print('[b10-xfrout] ', msg)
-
- class UnixSockServer(ThreadingUnixStreamServer):
- '''The unix domain socket server which accept xfr query sent from auth server.'''
- def __init__(self, sock_file, handle_class, shutdown_event, config_data):
- try:
- os.unlink(sock_file)
- except:
- pass
-
- self._sock_file = sock_file
- ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
- self._lock = threading.Lock()
- self._transfers_counter = 0
- self._shutdown_event = shutdown_event
- self.update_config_data(config_data)
-
- def shutdown(self):
- ThreadingUnixStreamServer.shutdown(self)
- try:
- os.unlink(self._sock_file)
- except:
- pass
- def update_config_data(self, new_config):
- '''Apply the new config setting of xfrout module. '''
- self._lock.acquire()
- self._max_transfers_out = new_config.get('transfers_out')
- self._db_file = new_config.get('db_file')
- self._lock.release()
- def get_db_file(self):
- self._lock.acquire()
- file = self._db_file
- self._lock.release()
- return file
- def increase_transfers_counter(self):
- '''Return False, if counter + 1 > max_transfers_out, or else
- return True
- '''
- ret = False
- self._lock.acquire()
- if self._transfers_counter < self._max_transfers_out:
- self._transfers_counter += 1
- ret = True
- self._lock.release()
- return ret
- def decrease_transfers_counter(self):
- self._lock.acquire()
- self._transfers_counter -= 1
- self._lock.release()
- def listen_on_xfr_query(unix_socket_server):
- '''Listen xfr query in one single thread. Polls for shutdown
- every 0.1 seconds, is there a better time?
- '''
- unix_socket_server.serve_forever(poll_interval = 0.1)
-
- class XfroutServer:
- def __init__(self):
- self._unix_socket_server = None
- self._listen_sock_file = UNIX_SOCKET_FILE
- self._shutdown_event = threading.Event()
- self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
- self._config_data = self._cc.get_full_config()
- self._cc.start()
- self._start_xfr_query_listener()
- def _start_xfr_query_listener(self):
- '''Start a new thread to accept xfr query. '''
-
- self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
- self._shutdown_event, self._config_data);
- listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
- listener.start()
- def config_handler(self, new_config):
- '''Update config data. TODO. Do error check'''
- answer = create_answer(0)
- for key in new_config:
- if key not in self._config_data:
- answer = create_answer(1, "Unknown config data: " + str(key))
- continue
- self._config_data[key] = new_config[key]
-
- if self._unix_socket_server:
- self._unix_socket_server.update_config_data(self._config_data)
- return answer
- def shutdown(self):
- ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
- terminated.
- '''
- global xfrout_server
- xfrout_server = None #Avoid shutdown is called twice
- self._shutdown_event.set()
- if self._unix_socket_server:
- self._unix_socket_server.shutdown()
- main_thread = threading.currentThread()
- for th in threading.enumerate():
- if th is main_thread:
- continue
- th.join()
- def command_handler(self, cmd, args):
- if cmd == "shutdown":
- if verbose_mode:
- log_msg("Received shutdown command")
- self.shutdown()
- answer = create_answer(0)
- else:
- answer = create_answer(1, "Unknown command:" + str(cmd))
- return answer
-
- def run(self):
- '''Get and process all commands sent from cfgmgr or other modules. '''
- while not self._shutdown_event.is_set():
- self._cc.check_command()
- xfrout_server = None
- def signal_handler(signal, frame):
- if xfrout_server:
- xfrout_server.shutdown()
- sys.exit(0)
- def set_signal_handler():
- signal.signal(signal.SIGTERM, signal_handler)
- signal.signal(signal.SIGINT, signal_handler)
- def set_cmd_options(parser):
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
- help="display more about what is going on")
- if '__main__' == __name__:
- try:
- parser = OptionParser()
- set_cmd_options(parser)
- (options, args) = parser.parse_args()
- verbose_mode = options.verbose
- set_signal_handler()
- xfrout_server = XfroutServer()
- xfrout_server.run()
- except KeyboardInterrupt:
- print("[b10-xfrout] exit xfrout process")
- except SessionError as e:
- print('[b10-xfrout] Error creating xfrout, '
- 'is the command channel daemon running?' )
- except ModuleCCSessionError as e:
- print('[b10-xfrout] exit xfrout process:', e)
- if xfrout_server:
- xfrout_server.shutdown()
|