123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824 |
- #!@PYTHON@
- # Copyright (C) 2010 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- import isc
- import isc.cc
- import threading
- import struct
- import signal
- from isc.datasrc import sqlite3_ds
- from socketserver import *
- import os
- from isc.config.ccsession import *
- from isc.cc import SessionError, SessionTimeout
- from isc.notify import notify_out
- import isc.util.process
- import socket
- import select
- import errno
- from optparse import OptionParser, OptionValueError
- from isc.util import socketserver_mixin
- from isc.log_messages.xfrout_messages import *
- isc.log.init("b10-xfrout")
- logger = isc.log.Logger("xfrout")
- try:
- from libutil_io_python import *
- from pydnspp import *
- except ImportError as e:
- # C++ loadable module may not be installed; even so the xfrout process
- # must keep running, so we warn about it and move forward.
- log.error(XFROUT_IMPORT, str(e))
- from isc.acl.acl import ACCEPT, REJECT, DROP, LoaderError
- from isc.acl.dns import REQUEST_LOADER
- isc.util.process.rename()
- class XfroutConfigError(Exception):
- """An exception indicating an error in updating xfrout configuration.
- This exception is raised when the xfrout process encouters an error in
- handling configuration updates. Not all syntax error can be caught
- at the module-CC layer, so xfrout needs to (explicitly or implicitly)
- validate the given configuration data itself. When it finds an error
- it raises this exception (either directly or by converting an exception
- from other modules) as a unified error in configuration.
- """
- pass
- def init_paths():
- global SPECFILE_PATH
- global AUTH_SPECFILE_PATH
- global UNIX_SOCKET_FILE
- if "B10_FROM_BUILD" in os.environ:
- SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
- AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
- if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
- UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
- "/auth_xfrout_conn"
- else:
- UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
- else:
- PREFIX = "@prefix@"
- DATAROOTDIR = "@datarootdir@"
- SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
- AUTH_SPECFILE_PATH = SPECFILE_PATH
- if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
- UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
- else:
- UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/@PACKAGE_NAME@/auth_xfrout_conn"
- init_paths()
- SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
- AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
- VERBOSE_MODE = False
- # tsig sign every N axfr packets.
- TSIG_SIGN_EVERY_NTH = 96
- XFROUT_MAX_MESSAGE_SIZE = 65535
- def get_rrset_len(rrset):
- """Returns the wire length of the given RRset"""
- bytes = bytearray()
- rrset.to_wire(bytes)
- return len(bytes)
- class XfroutSession():
- def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
- default_acl, zone_config):
- self._sock_fd = sock_fd
- self._request_data = request_data
- self._server = server
- self._tsig_key_ring = tsig_key_ring
- self._tsig_ctx = None
- self._tsig_len = 0
- self._remote = remote
- self._acl = default_acl
- self._zone_config = zone_config
- self.handle()
- def create_tsig_ctx(self, tsig_record, tsig_key_ring):
- return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(),
- tsig_key_ring)
- def handle(self):
- ''' Handle a xfrout query, send xfrout response '''
- try:
- self.dns_xfrout_start(self._sock_fd, self._request_data)
- #TODO, avoid catching all exceptions
- except Exception as e:
- logger.error(XFROUT_HANDLE_QUERY_ERROR, e)
- pass
- os.close(self._sock_fd)
- def _check_request_tsig(self, msg, request_data):
- ''' If request has a tsig record, perform tsig related checks '''
- tsig_record = msg.get_tsig_record()
- if tsig_record is not None:
- self._tsig_len = tsig_record.get_length()
- self._tsig_ctx = self.create_tsig_ctx(tsig_record, self._tsig_key_ring)
- tsig_error = self._tsig_ctx.verify(tsig_record, request_data)
- if tsig_error != TSIGError.NOERROR:
- return Rcode.NOTAUTH()
- return Rcode.NOERROR()
- def _parse_query_message(self, mdata):
- ''' parse query message to [socket,message]'''
- #TODO, need to add parseHeader() in case the message header is invalid
- try:
- msg = Message(Message.PARSE)
- Message.from_wire(msg, mdata)
- except Exception as err: # Exception is too broad
- logger.error(XFROUT_PARSE_QUERY_ERROR, err)
- return Rcode.FORMERR(), None
- # TSIG related checks
- rcode = self._check_request_tsig(msg, mdata)
- if rcode == Rcode.NOERROR():
- # ACL checks
- zone_name = msg.get_question()[0].get_name()
- zone_class = msg.get_question()[0].get_class()
- acl = self._get_transfer_acl(zone_name, zone_class)
- acl_result = acl.execute(
- isc.acl.dns.RequestContext(self._remote,
- msg.get_tsig_record()))
- if acl_result == DROP:
- logger.info(XFROUT_QUERY_DROPPED, zone_name, zone_class,
- self._remote[0], self._remote[1])
- return None, None
- elif acl_result == REJECT:
- logger.info(XFROUT_QUERY_REJECTED, zone_name, zone_class,
- self._remote[0], self._remote[1])
- return Rcode.REFUSED(), msg
- return rcode, msg
- def _get_transfer_acl(self, zone_name, zone_class):
- '''Return the ACL that should be applied for a given zone.
- The zone is identified by a tuple of name and RR class.
- If a per zone configuration for the zone exists and contains
- transfer_acl, that ACL will be used; otherwise, the default
- ACL will be used.
- '''
- # Internally zone names are managed in lower cased label characters,
- # so we first need to convert the name.
- zone_name_lower = Name(zone_name.to_text(), True)
- config_key = (zone_class.to_text(), zone_name_lower.to_text())
- if config_key in self._zone_config and \
- 'transfer_acl' in self._zone_config[config_key]:
- return self._zone_config[config_key]['transfer_acl']
- return self._acl
- def _get_query_zone_name(self, msg):
- question = msg.get_question()[0]
- return question.get_name().to_text()
- def _get_query_zone_class(self, msg):
- question = msg.get_question()[0]
- return question.get_class().to_text()
- def _send_data(self, sock_fd, data):
- size = len(data)
- total_count = 0
- while total_count < size:
- count = os.write(sock_fd, data[total_count:])
- total_count += count
- def _send_message(self, sock_fd, msg, tsig_ctx=None):
- render = MessageRenderer()
- # As defined in RFC5936 section3.4, perform case-preserving name
- # compression for AXFR message.
- render.set_compress_mode(MessageRenderer.CASE_SENSITIVE)
- render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
- # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
- # we should remove the if statement and use a universal interface later.
- if tsig_ctx is not None:
- msg.to_wire(render, tsig_ctx)
- else:
- msg.to_wire(render)
- header_len = struct.pack('H', socket.htons(render.get_length()))
- self._send_data(sock_fd, header_len)
- self._send_data(sock_fd, render.get_data())
- def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
- if not msg:
- return # query message is invalid. send nothing back.
- msg.make_response()
- msg.set_rcode(rcode_)
- self._send_message(sock_fd, msg, self._tsig_ctx)
- def _zone_has_soa(self, zone):
- '''Judge if the zone has an SOA record.'''
- # In some sense, the SOA defines a zone.
- # If the current name server has authority for the
- # specific zone, we need to judge if the zone has an SOA record;
- # if not, we consider the zone has incomplete data, so xfrout can't
- # serve for it.
- if sqlite3_ds.get_zone_soa(zone, self._server.get_db_file()):
- return True
- return False
- def _zone_exist(self, zonename):
- '''Judge if the zone is configured by config manager.'''
- # Currently, if we find the zone in datasource successfully, we
- # consider the zone is configured, and the current name server has
- # authority for the specific zone.
- # TODO: should get zone's configuration from cfgmgr or other place
- # in future.
- return sqlite3_ds.zone_exist(zonename, self._server.get_db_file())
- def _check_xfrout_available(self, zone_name):
- '''Check if xfr request can be responsed.
- TODO, Get zone's configuration from cfgmgr or some other place
- eg. check allow_transfer setting,
- '''
- # If the current name server does not have authority for the
- # zone, xfrout can't serve for it, return rcode NOTAUTH.
- if not self._zone_exist(zone_name):
- return Rcode.NOTAUTH()
- # If we are an authoritative name server for the zone, but fail
- # to find the zone's SOA record in datasource, xfrout can't
- # provide zone transfer for it.
- if not self._zone_has_soa(zone_name):
- return Rcode.SERVFAIL()
- #TODO, check allow_transfer
- if not self._server.increase_transfers_counter():
- return Rcode.REFUSED()
- return Rcode.NOERROR()
- def dns_xfrout_start(self, sock_fd, msg_query):
- rcode_, msg = self._parse_query_message(msg_query)
- #TODO. create query message and parse header
- if rcode_ is None: # Dropped by ACL
- return
- elif rcode_ == Rcode.NOTAUTH() or rcode_ == Rcode.REFUSED():
- return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
- elif rcode_ != Rcode.NOERROR():
- return self._reply_query_with_error_rcode(msg, sock_fd,
- Rcode.FORMERR())
- zone_name = self._get_query_zone_name(msg)
- zone_class_str = self._get_query_zone_class(msg)
- # TODO: should we not also include class in the check?
- rcode_ = self._check_xfrout_available(zone_name)
- if rcode_ != Rcode.NOERROR():
- logger.info(XFROUT_AXFR_TRANSFER_FAILED, zone_name,
- zone_class_str, rcode_.to_text())
- return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
- try:
- logger.info(XFROUT_AXFR_TRANSFER_STARTED, zone_name, zone_class_str)
- self._reply_xfrout_query(msg, sock_fd, zone_name)
- except Exception as err:
- logger.error(XFROUT_AXFR_TRANSFER_ERROR, zone_name,
- zone_class_str, str(err))
- pass
- logger.info(XFROUT_AXFR_TRANSFER_DONE, zone_name, zone_class_str)
- self._server.decrease_transfers_counter()
- return
- def _clear_message(self, msg):
- qid = msg.get_qid()
- opcode = msg.get_opcode()
- rcode = msg.get_rcode()
- msg.clear(Message.RENDER)
- msg.set_qid(qid)
- msg.set_opcode(opcode)
- msg.set_rcode(rcode)
- msg.set_header_flag(Message.HEADERFLAG_AA)
- msg.set_header_flag(Message.HEADERFLAG_QR)
- return msg
- def _create_rrset_from_db_record(self, record):
- '''Create one rrset from one record of datasource, if the schema of record is changed,
- This function should be updated first.
- '''
- rrtype_ = RRType(record[5])
- rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
- rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
- rrset_.add_rdata(rdata_)
- return rrset_
- def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, message_upper_len,
- count_since_last_tsig_sign):
- '''Add the SOA record to the end of message. If it can't be
- added, a new message should be created to send out the last soa .
- '''
- rrset_len = get_rrset_len(rrset_soa)
- if (count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH and
- message_upper_len + rrset_len >= XFROUT_MAX_MESSAGE_SIZE):
- # If tsig context exist, sign the packet with serial number TSIG_SIGN_EVERY_NTH
- self._send_message(sock_fd, msg, self._tsig_ctx)
- msg = self._clear_message(msg)
- elif (count_since_last_tsig_sign != TSIG_SIGN_EVERY_NTH and
- message_upper_len + rrset_len + self._tsig_len >= XFROUT_MAX_MESSAGE_SIZE):
- self._send_message(sock_fd, msg)
- msg = self._clear_message(msg)
- # If tsig context exist, sign the last packet
- msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
- self._send_message(sock_fd, msg, self._tsig_ctx)
- def _reply_xfrout_query(self, msg, sock_fd, zone_name):
- #TODO, there should be a better way to insert rrset.
- count_since_last_tsig_sign = TSIG_SIGN_EVERY_NTH
- msg.make_response()
- msg.set_header_flag(Message.HEADERFLAG_AA)
- soa_record = sqlite3_ds.get_zone_soa(zone_name, self._server.get_db_file())
- rrset_soa = self._create_rrset_from_db_record(soa_record)
- msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
- message_upper_len = get_rrset_len(rrset_soa) + self._tsig_len
- for rr_data in sqlite3_ds.get_zone_datas(zone_name, self._server.get_db_file()):
- if self._server._shutdown_event.is_set(): # Check if xfrout is shutdown
- logger.info(XFROUT_STOPPING)
- return
- # TODO: RRType.SOA() ?
- if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
- continue
- rrset_ = self._create_rrset_from_db_record(rr_data)
- # We calculate the maximum size of the RRset (i.e. the
- # size without compression) and use that to see if we
- # may have reached the limit
- rrset_len = get_rrset_len(rrset_)
- if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
- msg.add_rrset(Message.SECTION_ANSWER, rrset_)
- message_upper_len += rrset_len
- continue
- # If tsig context exist, sign every N packets
- if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
- count_since_last_tsig_sign = 0
- self._send_message(sock_fd, msg, self._tsig_ctx)
- else:
- self._send_message(sock_fd, msg)
- count_since_last_tsig_sign += 1
- msg = self._clear_message(msg)
- msg.add_rrset(Message.SECTION_ANSWER, rrset_) # Add the rrset to the new message
- # Reserve tsig space for signed packet
- if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
- message_upper_len = rrset_len + self._tsig_len
- else:
- message_upper_len = rrset_len
- self._send_message_with_last_soa(msg, sock_fd, rrset_soa, message_upper_len,
- count_since_last_tsig_sign)
- class UnixSockServer(socketserver_mixin.NoPollMixIn,
- ThreadingUnixStreamServer):
- '''The unix domain socket server which accept xfr query sent from auth server.'''
- def __init__(self, sock_file, handle_class, shutdown_event, config_data,
- cc):
- self._remove_unused_sock_file(sock_file)
- self._sock_file = sock_file
- socketserver_mixin.NoPollMixIn.__init__(self)
- ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
- self._shutdown_event = shutdown_event
- self._write_sock, self._read_sock = socket.socketpair()
- self._common_init()
- self._cc = cc
- self.update_config_data(config_data)
- def _common_init(self):
- '''Initialization shared with the mock server class used for tests'''
- self._lock = threading.Lock()
- self._transfers_counter = 0
- self._zone_config = {}
- self._acl = None # this will be initialized in update_config_data()
- def _receive_query_message(self, sock):
- ''' receive request message from sock'''
- # receive data length
- data_len = sock.recv(2)
- if not data_len:
- return None
- msg_len = struct.unpack('!H', data_len)[0]
- # receive data
- recv_size = 0
- msgdata = b''
- while recv_size < msg_len:
- data = sock.recv(msg_len - recv_size)
- if not data:
- return None
- recv_size += len(data)
- msgdata += data
- return msgdata
- def handle_request(self):
- ''' Enable server handle a request until shutdown or auth is closed.'''
- try:
- request, client_address = self.get_request()
- except socket.error:
- logger.error(XFROUT_FETCH_REQUEST_ERROR)
- return
- # Check self._shutdown_event to ensure the real shutdown comes.
- # Linux could trigger a spurious readable event on the _read_sock
- # due to a bug, so we need perform a double check.
- while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
- try:
- (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
- except select.error as e:
- if e.args[0] == errno.EINTR:
- (rlist, wlist, xlist) = ([], [], [])
- continue
- else:
- logger.error(XFROUT_SOCKET_SELECT_ERROR, str(e))
- break
- # self.server._shutdown_event will be set by now, if it is not a false
- # alarm
- if self._read_sock in rlist:
- continue
- try:
- self.process_request(request)
- except Exception as pre:
- log.error(XFROUT_PROCESS_REQUEST_ERROR, str(pre))
- break
- def _handle_request_noblock(self):
- """Override the function _handle_request_noblock(), it creates a new
- thread to handle requests for each auth"""
- td = threading.Thread(target=self.handle_request)
- td.setDaemon(True)
- td.start()
- def process_request(self, request):
- """Receive socket fd and query message from auth, then
- start a new thread to process the request."""
- sock_fd = recv_fd(request.fileno())
- if sock_fd < 0:
- # This may happen when one xfrout process try to connect to
- # xfrout unix socket server, to check whether there is another
- # xfrout running.
- if sock_fd == FD_COMM_ERROR:
- logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
- return
- # receive request msg
- request_data = self._receive_query_message(request)
- if not request_data:
- return
- t = threading.Thread(target=self.finish_request,
- args = (sock_fd, request_data))
- if self.daemon_threads:
- t.daemon = True
- t.start()
- def _guess_remote(self, sock_fd):
- """
- Guess remote address and port of the socket. The sock_fd must be a
- socket
- """
- # This uses a trick. If the socket is IPv4 in reality and we pretend
- # it to be IPv6, it returns IPv4 address anyway. This doesn't seem
- # to care about the SOCK_STREAM parameter at all (which it really is,
- # except for testing)
- if socket.has_ipv6:
- sock = socket.fromfd(sock_fd, socket.AF_INET6, socket.SOCK_STREAM)
- else:
- # To make it work even on hosts without IPv6 support
- # (Any idea how to simulate this in test?)
- sock = socket.fromfd(sock_fd, socket.AF_INET, socket.SOCK_STREAM)
- return sock.getpeername()
- def finish_request(self, sock_fd, request_data):
- '''Finish one request by instantiating RequestHandlerClass.
- This method creates a XfroutSession object.
- '''
- self._lock.acquire()
- acl = self._acl
- zone_config = self._zone_config
- self._lock.release()
- self.RequestHandlerClass(sock_fd, request_data, self,
- self.tsig_key_ring,
- self._guess_remote(sock_fd), acl, zone_config)
- def _remove_unused_sock_file(self, sock_file):
- '''Try to remove the socket file. If the file is being used
- by one running xfrout process, exit from python.
- If it's not a socket file or nobody is listening
- , it will be removed. If it can't be removed, exit from python. '''
- if self._sock_file_in_use(sock_file):
- logger.error(XFROUT_UNIX_SOCKET_FILE_IN_USE, sock_file)
- sys.exit(0)
- else:
- if not os.path.exists(sock_file):
- return
- try:
- os.unlink(sock_file)
- except OSError as err:
- logger.error(XFROUT_REMOVE_OLD_UNIX_SOCKET_FILE_ERROR, sock_file, str(err))
- sys.exit(0)
- def _sock_file_in_use(self, sock_file):
- '''Check whether the socket file 'sock_file' exists and
- is being used by one running xfrout process. If it is,
- return True, or else return False. '''
- try:
- sock = socket.socket(socket.AF_UNIX)
- sock.connect(sock_file)
- except socket.error as err:
- return False
- else:
- return True
- def shutdown(self):
- self._write_sock.send(b"shutdown") #terminate the xfrout session thread
- super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
- try:
- os.unlink(self._sock_file)
- except Exception as e:
- logger.error(XFROUT_REMOVE_UNIX_SOCKET_FILE_ERROR, self._sock_file, str(e))
- pass
- def update_config_data(self, new_config):
- '''Apply the new config setting of xfrout module.
- '''
- self._lock.acquire()
- try:
- logger.info(XFROUT_NEW_CONFIG)
- new_acl = self._acl
- if 'transfer_acl' in new_config:
- try:
- new_acl = REQUEST_LOADER.load(new_config['transfer_acl'])
- except LoaderError as e:
- raise XfroutConfigError('Failed to parse transfer_acl: ' +
- str(e))
- new_zone_config = self._zone_config
- zconfig_data = new_config.get('zone_config')
- if zconfig_data is not None:
- new_zone_config = self.__create_zone_config(zconfig_data)
- self._acl = new_acl
- self._zone_config = new_zone_config
- self._max_transfers_out = new_config.get('transfers_out')
- self.set_tsig_key_ring(new_config.get('tsig_key_ring'))
- except Exception as e:
- self._lock.release()
- raise e
- self._lock.release()
- logger.info(XFROUT_NEW_CONFIG_DONE)
- def __create_zone_config(self, zone_config_list):
- new_config = {}
- for zconf in zone_config_list:
- # convert the class, origin (name) pair. First build pydnspp
- # object to reject invalid input.
- zclass_str = zconf.get('class')
- if zclass_str is None:
- #zclass_str = 'IN' # temporary
- zclass_str = self._cc.get_default_value('zone_config/class')
- zclass = RRClass(zclass_str)
- zorigin = Name(zconf['origin'], True)
- config_key = (zclass.to_text(), zorigin.to_text())
- # reject duplicate config
- if config_key in new_config:
- raise XfroutConfigError('Duplicate zone_config for ' +
- str(zorigin) + '/' + str(zclass))
- # create a new config entry, build any given (and known) config
- new_config[config_key] = {}
- if 'transfer_acl' in zconf:
- try:
- new_config[config_key]['transfer_acl'] = \
- REQUEST_LOADER.load(zconf['transfer_acl'])
- except LoaderError as e:
- raise XfroutConfigError('Failed to parse transfer_acl ' +
- 'for ' + zorigin.to_text() + '/' +
- zclass_str + ': ' + str(e))
- return new_config
- def set_tsig_key_ring(self, key_list):
- """Set the tsig_key_ring , given a TSIG key string list representation. """
- # XXX add values to configure zones/tsig options
- self.tsig_key_ring = TSIGKeyRing()
- # If key string list is empty, create a empty tsig_key_ring
- if not key_list:
- return
- for key_item in key_list:
- try:
- self.tsig_key_ring.add(TSIGKey(key_item))
- except InvalidParameter as ipe:
- logger.error(XFROUT_BAD_TSIG_KEY_STRING, str(key_item))
- def get_db_file(self):
- file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
- # this too should be unnecessary, but currently the
- # 'from build' override isn't stored in the config
- # (and we don't have indirect python access to datasources yet)
- if is_default and "B10_FROM_BUILD" in os.environ:
- file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
- return file
- def increase_transfers_counter(self):
- '''Return False, if counter + 1 > max_transfers_out, or else
- return True
- '''
- ret = False
- self._lock.acquire()
- if self._transfers_counter < self._max_transfers_out:
- self._transfers_counter += 1
- ret = True
- self._lock.release()
- return ret
- def decrease_transfers_counter(self):
- self._lock.acquire()
- self._transfers_counter -= 1
- self._lock.release()
- class XfroutServer:
- def __init__(self):
- self._unix_socket_server = None
- self._listen_sock_file = UNIX_SOCKET_FILE
- self._shutdown_event = threading.Event()
- self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
- self._config_data = self._cc.get_full_config()
- self._cc.start()
- self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
- self._start_xfr_query_listener()
- self._start_notifier()
- def _start_xfr_query_listener(self):
- '''Start a new thread to accept xfr query. '''
- self._unix_socket_server = UnixSockServer(self._listen_sock_file,
- XfroutSession,
- self._shutdown_event,
- self._config_data,
- self._cc)
- listener = threading.Thread(target=self._unix_socket_server.serve_forever)
- listener.start()
- def _start_notifier(self):
- datasrc = self._unix_socket_server.get_db_file()
- self._notifier = notify_out.NotifyOut(datasrc)
- self._notifier.dispatcher()
- def send_notify(self, zone_name, zone_class):
- self._notifier.send_notify(zone_name, zone_class)
- def config_handler(self, new_config):
- '''Update config data. TODO. Do error check'''
- answer = create_answer(0)
- for key in new_config:
- if key not in self._config_data:
- answer = create_answer(1, "Unknown config data: " + str(key))
- continue
- self._config_data[key] = new_config[key]
- if self._unix_socket_server:
- try:
- self._unix_socket_server.update_config_data(self._config_data)
- except Exception as e:
- answer = create_answer(1,
- "Failed to handle new configuration: " +
- str(e))
- return answer
- def shutdown(self):
- ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
- terminated.
- '''
- global xfrout_server
- xfrout_server = None #Avoid shutdown is called twice
- self._shutdown_event.set()
- self._notifier.shutdown()
- if self._unix_socket_server:
- self._unix_socket_server.shutdown()
- # Wait for all threads to terminate
- main_thread = threading.currentThread()
- for th in threading.enumerate():
- if th is main_thread:
- continue
- th.join()
- def command_handler(self, cmd, args):
- if cmd == "shutdown":
- logger.info(XFROUT_RECEIVED_SHUTDOWN_COMMAND)
- self.shutdown()
- answer = create_answer(0)
- elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
- zone_name = args.get('zone_name')
- zone_class = args.get('zone_class')
- if zone_name and zone_class:
- logger.info(XFROUT_NOTIFY_COMMAND, zone_name, zone_class)
- self.send_notify(zone_name, zone_class)
- answer = create_answer(0)
- else:
- answer = create_answer(1, "Bad command parameter:" + str(args))
- else:
- answer = create_answer(1, "Unknown command:" + str(cmd))
- return answer
- def run(self):
- '''Get and process all commands sent from cfgmgr or other modules. '''
- while not self._shutdown_event.is_set():
- self._cc.check_command(False)
- xfrout_server = None
- def signal_handler(signal, frame):
- if xfrout_server:
- xfrout_server.shutdown()
- sys.exit(0)
- def set_signal_handler():
- signal.signal(signal.SIGTERM, signal_handler)
- signal.signal(signal.SIGINT, signal_handler)
- def set_cmd_options(parser):
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
- help="display more about what is going on")
- if '__main__' == __name__:
- try:
- parser = OptionParser()
- set_cmd_options(parser)
- (options, args) = parser.parse_args()
- VERBOSE_MODE = options.verbose
- set_signal_handler()
- xfrout_server = XfroutServer()
- xfrout_server.run()
- except KeyboardInterrupt:
- logger.INFO(XFROUT_STOPPED_BY_KEYBOARD)
- except SessionError as e:
- logger.error(XFROUT_CC_SESSION_ERROR, str(e))
- except ModuleCCSessionError as e:
- logger.error(XFROUT_MODULECC_SESSION_ERROR, str(e))
- except XfroutConfigError as e:
- logger.error(XFROUT_CONFIG_ERROR, str(e))
- except SessionTimeout as e:
- logger.error(XFROUT_CC_SESSION_TIMEOUT_ERROR)
- if xfrout_server:
- xfrout_server.shutdown()
|