12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049 |
- #!@PYTHON@
- # Copyright (C) 2010 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- import isc
- import isc.cc
- import threading
- import struct
- import signal
- from isc.datasrc import DataSourceClient, ZoneFinder, ZoneJournalReader
- from socketserver import *
- import os
- from isc.config.ccsession import *
- from isc.cc import SessionError, SessionTimeout
- from isc.notify import notify_out
- import isc.util.process
- import socket
- import select
- import errno
- from optparse import OptionParser, OptionValueError
- from isc.util import socketserver_mixin
- from isc.log_messages.xfrout_messages import *
- isc.log.init("b10-xfrout")
- logger = isc.log.Logger("xfrout")
- DBG_XFROUT_TRACE = logger.DBGLVL_TRACE_BASIC
- try:
- from libutil_io_python import *
- from pydnspp import *
- except ImportError as e:
- # C++ loadable module may not be installed; even so the xfrout process
- # must keep running, so we warn about it and move forward.
- logger.error(XFROUT_IMPORT, str(e))
- from isc.acl.acl import ACCEPT, REJECT, DROP, LoaderError
- from isc.acl.dns import REQUEST_LOADER
- isc.util.process.rename()
- class XfroutConfigError(Exception):
- """An exception indicating an error in updating xfrout configuration.
- This exception is raised when the xfrout process encouters an error in
- handling configuration updates. Not all syntax error can be caught
- at the module-CC layer, so xfrout needs to (explicitly or implicitly)
- validate the given configuration data itself. When it finds an error
- it raises this exception (either directly or by converting an exception
- from other modules) as a unified error in configuration.
- """
- pass
- class XfroutSessionError(Exception):
- '''An exception raised for some unexpected events during an xfrout session.
- '''
- pass
- def init_paths():
- global SPECFILE_PATH
- global AUTH_SPECFILE_PATH
- global UNIX_SOCKET_FILE
- if "B10_FROM_BUILD" in os.environ:
- SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
- AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
- if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
- UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
- "/auth_xfrout_conn"
- else:
- UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
- else:
- PREFIX = "@prefix@"
- DATAROOTDIR = "@datarootdir@"
- SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
- AUTH_SPECFILE_PATH = SPECFILE_PATH
- if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
- UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
- else:
- UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/@PACKAGE_NAME@/auth_xfrout_conn"
- init_paths()
- SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
- AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
- VERBOSE_MODE = False
- XFROUT_DNS_HEADER_SIZE = 12 # protocol constant
- XFROUT_MAX_MESSAGE_SIZE = 65535 # ditto
- # borrowed from xfrin.py @ #1298. We should eventually unify it.
- def format_zone_str(zone_name, zone_class):
- """Helper function to format a zone name and class as a string of
- the form '<name>/<class>'.
- Parameters:
- zone_name (isc.dns.Name) name to format
- zone_class (isc.dns.RRClass) class to format
- """
- return zone_name.to_text(True) + '/' + str(zone_class)
- # borrowed from xfrin.py @ #1298.
- def format_addrinfo(addrinfo):
- """Helper function to format the addrinfo as a string of the form
- <addr>:<port> (for IPv4) or [<addr>]:port (for IPv6). For unix domain
- sockets, and unknown address families, it returns a basic string
- conversion of the third element of the passed tuple.
- Parameters:
- addrinfo: a 3-tuple consisting of address family, socket type, and,
- depending on the family, either a 2-tuple with the address
- and port, or a filename
- """
- try:
- if addrinfo[0] == socket.AF_INET:
- return str(addrinfo[2][0]) + ":" + str(addrinfo[2][1])
- elif addrinfo[0] == socket.AF_INET6:
- return "[" + str(addrinfo[2][0]) + "]:" + str(addrinfo[2][1])
- else:
- return str(addrinfo[2])
- except IndexError:
- raise TypeError("addrinfo argument to format_addrinfo() does not "
- "appear to be consisting of (family, socktype, (addr, port))")
- def get_rrset_len(rrset):
- """Returns the wire length of the given RRset"""
- bytes = bytearray()
- rrset.to_wire(bytes)
- return len(bytes)
- def get_soa_serial(soa_rdata):
- '''Extract the serial field of an SOA RDATA and returns it as an Serial object.
- '''
- return Serial(int(soa_rdata.to_text().split()[2]))
- class XfroutSession():
- def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
- default_acl, zone_config, client_class=DataSourceClient):
- self._sock_fd = sock_fd
- self._request_data = request_data
- self._server = server
- self._tsig_key_ring = tsig_key_ring
- self._tsig_ctx = None
- self._tsig_len = 0
- self._remote = remote
- self._request_type = None
- self._request_typestr = None
- self._acl = default_acl
- self._zone_config = zone_config
- self.ClientClass = client_class # parameterize this for testing
- self._soa = None # will be set in _xfrout_setup or in tests
- self._jnl_reader = None # will be set to a reader for IXFR
- self._handle()
- def create_tsig_ctx(self, tsig_record, tsig_key_ring):
- return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(),
- tsig_key_ring)
- def _handle(self):
- ''' Handle a xfrout query, send xfrout response(s).
- This is separated from the constructor so that we can override
- it from tests.
- '''
- # Check the xfrout quota. We do both increase/decrease in this
- # method so it's clear we always release it once acuired.
- quota_ok = self._server.increase_transfers_counter()
- ex = None
- try:
- self.dns_xfrout_start(self._sock_fd, self._request_data, quota_ok)
- except Exception as e:
- # To avoid resource leak we need catch all possible exceptions
- # We log it later to exclude the case where even logger raises
- # an exception.
- ex = e
- # Release any critical resources
- if quota_ok:
- self._server.decrease_transfers_counter()
- self._close_socket()
- if ex is not None:
- logger.error(XFROUT_HANDLE_QUERY_ERROR, ex)
- def _close_socket(self):
- '''Simply close the socket via the given FD.
- This is a dedicated subroutine of handle() and is sepsarated from it
- for the convenience of tests.
- '''
- os.close(self._sock_fd)
- def _check_request_tsig(self, msg, request_data):
- ''' If request has a tsig record, perform tsig related checks '''
- tsig_record = msg.get_tsig_record()
- if tsig_record is not None:
- self._tsig_len = tsig_record.get_length()
- self._tsig_ctx = self.create_tsig_ctx(tsig_record,
- self._tsig_key_ring)
- tsig_error = self._tsig_ctx.verify(tsig_record, request_data)
- if tsig_error != TSIGError.NOERROR:
- return Rcode.NOTAUTH()
- return Rcode.NOERROR()
- def _parse_query_message(self, mdata):
- ''' parse query message to [socket,message]'''
- #TODO, need to add parseHeader() in case the message header is invalid
- try:
- msg = Message(Message.PARSE)
- Message.from_wire(msg, mdata)
- except Exception as err: # Exception is too broad
- logger.error(XFROUT_PARSE_QUERY_ERROR, err)
- return Rcode.FORMERR(), None
- # TSIG related checks
- rcode = self._check_request_tsig(msg, mdata)
- if rcode != Rcode.NOERROR():
- return rcode, msg
- # Make sure the question is valid. This should be ensured by
- # the auth server, but since it's far from xfrout itself, we check
- # it by ourselves. A viloation would be an internal bug, so we
- # raise and stop here rather than returning a FORMERR or SERVFAIL.
- if msg.get_rr_count(Message.SECTION_QUESTION) != 1:
- raise RuntimeError('Invalid number of question for XFR: ' +
- str(msg.get_rr_count(Message.SECTION_QUESTION)))
- question = msg.get_question()[0]
- # Identify the request type
- self._request_type = question.get_type()
- if self._request_type == RRType.AXFR():
- self._request_typestr = 'AXFR'
- elif self._request_type == RRType.IXFR():
- self._request_typestr = 'IXFR'
- else:
- # Likewise, this should be impossible.
- raise RuntimeError('Unexpected XFR type: ' +
- str(self._request_type))
- # ACL checks
- zone_name = question.get_name()
- zone_class = question.get_class()
- acl = self._get_transfer_acl(zone_name, zone_class)
- acl_result = acl.execute(
- isc.acl.dns.RequestContext(self._remote[2], msg.get_tsig_record()))
- if acl_result == DROP:
- logger.debug(DBG_XFROUT_TRACE, XFROUT_QUERY_DROPPED,
- self._request_type, format_addrinfo(self._remote),
- format_zone_str(zone_name, zone_class))
- return None, None
- elif acl_result == REJECT:
- logger.debug(DBG_XFROUT_TRACE, XFROUT_QUERY_REJECTED,
- self._request_type, format_addrinfo(self._remote),
- format_zone_str(zone_name, zone_class))
- return Rcode.REFUSED(), msg
- return rcode, msg
- def _get_transfer_acl(self, zone_name, zone_class):
- '''Return the ACL that should be applied for a given zone.
- The zone is identified by a tuple of name and RR class.
- If a per zone configuration for the zone exists and contains
- transfer_acl, that ACL will be used; otherwise, the default
- ACL will be used.
- '''
- # Internally zone names are managed in lower cased label characters,
- # so we first need to convert the name.
- zone_name_lower = Name(zone_name.to_text(), True)
- config_key = (zone_class.to_text(), zone_name_lower.to_text())
- if config_key in self._zone_config and \
- 'transfer_acl' in self._zone_config[config_key]:
- return self._zone_config[config_key]['transfer_acl']
- return self._acl
- def _send_data(self, sock_fd, data):
- size = len(data)
- total_count = 0
- while total_count < size:
- count = os.write(sock_fd, data[total_count:])
- total_count += count
- def _send_message(self, sock_fd, msg, tsig_ctx=None):
- render = MessageRenderer()
- # As defined in RFC5936 section3.4, perform case-preserving name
- # compression for AXFR message.
- render.set_compress_mode(MessageRenderer.CASE_SENSITIVE)
- render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
- # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
- # we should remove the if statement and use a universal interface later.
- if tsig_ctx is not None:
- msg.to_wire(render, tsig_ctx)
- else:
- msg.to_wire(render)
- header_len = struct.pack('H', socket.htons(render.get_length()))
- self._send_data(sock_fd, header_len)
- self._send_data(sock_fd, render.get_data())
- def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
- if not msg:
- return # query message is invalid. send nothing back.
- msg.make_response()
- msg.set_rcode(rcode_)
- self._send_message(sock_fd, msg, self._tsig_ctx)
- def _get_zone_soa(self, zone_name):
- '''Retrieve the SOA RR of the given zone.
- It returns a pair of RCODE and the SOA (in the form of RRset).
- On success RCODE is NOERROR and returned SOA is not None;
- on failure RCODE indicates the appropriate code in the context of
- xfr processing, and the returned SOA is None.
- '''
- result, finder = self._datasrc_client.find_zone(zone_name)
- if result != DataSourceClient.SUCCESS:
- return (Rcode.NOTAUTH(), None)
- result, soa_rrset, _ = finder.find(zone_name, RRType.SOA())
- if result != ZoneFinder.SUCCESS:
- return (Rcode.SERVFAIL(), None)
- # Especially for database-based zones, a working zone may be in
- # a broken state where it has more than one SOA RR. We proactively
- # check the condition and abort the xfr attempt if we identify it.
- if soa_rrset.get_rdata_count() != 1:
- return (Rcode.SERVFAIL(), None)
- return (Rcode.NOERROR(), soa_rrset)
- def __axfr_setup(self, zone_name):
- '''Setup a zone iterator for AXFR or AXFR-style IXFR.
- '''
- try:
- # Note that we enable 'separate_rrs'. In xfr-out we need to
- # preserve as many things as possible (even if it's half broken)
- # stored in the zone.
- self._iterator = self._datasrc_client.get_iterator(zone_name,
- True)
- except isc.datasrc.Error:
- # If the current name server does not have authority for the
- # zone, xfrout can't serve for it, return rcode NOTAUTH.
- # Note: this exception can happen for other reasons. We should
- # update get_iterator() API so that we can distinguish "no such
- # zone" and other cases (#1373). For now we consider all these
- # cases as NOTAUTH.
- return Rcode.NOTAUTH()
- # If we are an authoritative name server for the zone, but fail
- # to find the zone's SOA record in datasource, xfrout can't
- # provide zone transfer for it.
- self._soa = self._iterator.get_soa()
- if self._soa is None or self._soa.get_rdata_count() != 1:
- return Rcode.SERVFAIL()
- return Rcode.NOERROR()
- def __ixfr_setup(self, request_msg, zone_name, zone_class):
- '''Setup a zone journal reader for IXFR.
- If the underlying data source does not know the requested range
- of zone differences it automatically falls back to AXFR-style
- IXFR by setting up a zone iterator instead of a journal reader.
- '''
- # Check the authority section. Look for a SOA record with
- # the same name and class as the question.
- remote_soa = None
- for auth_rrset in request_msg.get_section(Message.SECTION_AUTHORITY):
- # Ignore data whose owner name is not the zone apex, and
- # ignore non-SOA or different class of records.
- if auth_rrset.get_name() != zone_name or \
- auth_rrset.get_type() != RRType.SOA() or \
- auth_rrset.get_class() != zone_class:
- continue
- if auth_rrset.get_rdata_count() != 1:
- logger.info(XFROUT_IXFR_MULTIPLE_SOA,
- format_addrinfo(self._remote))
- return Rcode.FORMERR()
- remote_soa = auth_rrset
- if remote_soa is None:
- logger.info(XFROUT_IXFR_NO_SOA, format_addrinfo(self._remote))
- return Rcode.FORMERR()
- # Retrieve the local SOA
- rcode, self._soa = self._get_zone_soa(zone_name)
- if rcode != Rcode.NOERROR():
- return rcode
- # RFC1995 says "If an IXFR query with the same or newer version
- # number than that of the server is received, it is replied to with
- # a single SOA record of the server's current version, just as
- # in AXFR". The claim about AXFR is incorrect, but other than that,
- # we do as the RFC says.
- begin_serial = get_soa_serial(remote_soa.get_rdata()[0])
- end_serial = get_soa_serial(self._soa.get_rdata()[0])
- if begin_serial >= end_serial:
- # clear both iterator and jnl_reader to signal we won't do
- # iteration in response generation
- self._iterator = None
- self._jnl_reader = None
- logger.info(XFROUT_IXFR_UPTODATE, format_addrinfo(self._remote),
- format_zone_str(zone_name, zone_class),
- begin_serial, end_serial)
- return Rcode.NOERROR()
- # Set up the journal reader or fall back to AXFR-style IXFR
- try:
- code, self._jnl_reader = self._datasrc_client.get_journal_reader(
- zone_name, begin_serial.get_value(), end_serial.get_value())
- except isc.datasrc.NotImplemented as ex:
- # The underlying data source doesn't support journaling.
- # Fall back to AXFR-style IXFR.
- logger.info(XFROUT_IXFR_NO_JOURNAL_SUPPORT,
- format_addrinfo(self._remote),
- format_zone_str(zone_name, zone_class))
- return self.__axfr_setup(zone_name)
- if code == ZoneJournalReader.NO_SUCH_VERSION:
- logger.info(XFROUT_IXFR_NO_VERSION, format_addrinfo(self._remote),
- format_zone_str(zone_name, zone_class),
- begin_serial, end_serial)
- return self.__axfr_setup(zone_name)
- if code == ZoneJournalReader.NO_SUCH_ZONE:
- # this is quite unexpected as we know zone's SOA exists.
- # It might be a bug or the data source is somehow broken,
- # but it can still happen if someone has removed the zone
- # between these two operations. We treat it as NOTAUTH.
- logger.warn(XFROUT_IXFR_NO_ZONE, format_addrinfo(self._remote),
- format_zone_str(zone_name, zone_class))
- return Rcode.NOTAUTH()
- # Use the reader as the iterator to generate the response.
- self._iterator = self._jnl_reader
- return Rcode.NOERROR()
- def _xfrout_setup(self, request_msg, zone_name, zone_class):
- '''Setup a context for xfr responses according to the request type.
- This method identifies the most appropriate data source for the
- request and set up a zone iterator or journal reader depending on
- whether the request is AXFR or IXFR. If it identifies any protocol
- level error it returns an RCODE other than NOERROR.
- '''
- # Identify the data source for the requested zone and see if it has
- # SOA while initializing objects used for request processing later.
- # We should eventually generalize this so that we can choose the
- # appropriate data source from (possible) multiple candidates.
- # We should eventually take into account the RR class here.
- # For now, we hardcode a particular type (SQLite3-based), and only
- # consider that one.
- datasrc_config = '{ "database_file": "' + \
- self._server.get_db_file() + '"}'
- self._datasrc_client = self.ClientClass('sqlite3', datasrc_config)
- if self._request_type == RRType.AXFR():
- return self.__axfr_setup(zone_name)
- else:
- return self.__ixfr_setup(request_msg, zone_name, zone_class)
- def dns_xfrout_start(self, sock_fd, msg_query, quota_ok=True):
- rcode_, msg = self._parse_query_message(msg_query)
- #TODO. create query message and parse header
- if rcode_ is None: # Dropped by ACL
- return
- elif rcode_ == Rcode.NOTAUTH() or rcode_ == Rcode.REFUSED():
- return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
- elif rcode_ != Rcode.NOERROR():
- return self._reply_query_with_error_rcode(msg, sock_fd,
- Rcode.FORMERR())
- elif not quota_ok:
- logger.warn(XFROUT_QUERY_QUOTA_EXCCEEDED, self._request_typestr,
- format_addrinfo(self._remote),
- self._server._max_transfers_out)
- return self._reply_query_with_error_rcode(msg, sock_fd,
- Rcode.REFUSED())
- question = msg.get_question()[0]
- zone_name = question.get_name()
- zone_class = question.get_class()
- zone_str = format_zone_str(zone_name, zone_class) # for logging
- try:
- rcode_ = self._xfrout_setup(msg, zone_name, zone_class)
- except Exception as ex:
- logger.error(XFROUT_XFR_TRANSFER_CHECK_ERROR, self._request_typestr,
- format_addrinfo(self._remote), zone_str, ex)
- rcode_ = Rcode.SERVFAIL()
- if rcode_ != Rcode.NOERROR():
- logger.info(XFROUT_XFR_TRANSFER_FAILED, self._request_typestr,
- format_addrinfo(self._remote), zone_str, rcode_)
- return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
- try:
- logger.info(XFROUT_XFR_TRANSFER_STARTED, self._request_typestr,
- format_addrinfo(self._remote), zone_str)
- self._reply_xfrout_query(msg, sock_fd)
- except Exception as err:
- logger.error(XFROUT_XFR_TRANSFER_ERROR, self._request_typestr,
- format_addrinfo(self._remote), zone_str, err)
- logger.info(XFROUT_XFR_TRANSFER_DONE, self._request_typestr,
- format_addrinfo(self._remote), zone_str)
- def _clear_message(self, msg):
- qid = msg.get_qid()
- opcode = msg.get_opcode()
- rcode = msg.get_rcode()
- msg.clear(Message.RENDER)
- msg.set_qid(qid)
- msg.set_opcode(opcode)
- msg.set_rcode(rcode)
- msg.set_header_flag(Message.HEADERFLAG_AA)
- msg.set_header_flag(Message.HEADERFLAG_QR)
- return msg
- def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa,
- message_upper_len):
- '''Add the SOA record to the end of message.
- If it would exceed the maximum allowable size of a message, a new
- message will be created to send out the last SOA.
- We assume a message with a single SOA can always fit the buffer
- with or without TSIG. In theory this could be wrong if TSIG is
- stupidly large, but in practice this assumption should be reasonable.
- '''
- if message_upper_len + get_rrset_len(rrset_soa) > \
- XFROUT_MAX_MESSAGE_SIZE:
- self._send_message(sock_fd, msg, self._tsig_ctx)
- msg = self._clear_message(msg)
- msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
- self._send_message(sock_fd, msg, self._tsig_ctx)
- def _reply_xfrout_query(self, msg, sock_fd):
- msg.make_response()
- msg.set_header_flag(Message.HEADERFLAG_AA)
- # Reserved space for the fixed header size, the size of the question
- # section, and TSIG size (when included). The size of the question
- # section is the sum of the qname length and the size of the
- # fixed-length fields (type and class, 2 bytes each).
- message_upper_len = XFROUT_DNS_HEADER_SIZE + \
- msg.get_question()[0].get_name().get_length() + 4 + \
- self._tsig_len
- # If the iterator is None, we are responding to IXFR with a single
- # SOA RR.
- if self._iterator is None:
- self._send_message_with_last_soa(msg, sock_fd, self._soa,
- message_upper_len)
- return
- # Add the beginning SOA
- msg.add_rrset(Message.SECTION_ANSWER, self._soa)
- message_upper_len += get_rrset_len(self._soa)
- # Add the rest of the zone/diff contets
- for rrset in self._iterator:
- # Check if xfrout is shutdown
- if self._server._shutdown_event.is_set():
- logger.info(XFROUT_STOPPING)
- return
- # For AXFR (or AXFR-style IXFR), in which case _jnl_reader is None,
- # we should skip SOAs from the iterator.
- if self._jnl_reader is None and rrset.get_type() == RRType.SOA():
- continue
- # We calculate the maximum size of the RRset (i.e. the
- # size without compression) and use that to see if we
- # may have reached the limit
- rrset_len = get_rrset_len(rrset)
- if message_upper_len + rrset_len <= XFROUT_MAX_MESSAGE_SIZE:
- msg.add_rrset(Message.SECTION_ANSWER, rrset)
- message_upper_len += rrset_len
- continue
- # RR would not fit. If there are other RRs in the buffer, send
- # them now and leave this RR to the next message.
- self._send_message(sock_fd, msg, self._tsig_ctx)
- # Create a new message and reserve space for the carried-over
- # RR (and TSIG space in case it's to be TSIG signed)
- msg = self._clear_message(msg)
- message_upper_len = XFROUT_DNS_HEADER_SIZE + rrset_len + \
- self._tsig_len
- # If this RR overflows the buffer all by itself, fail. In theory
- # some RRs might fit in a TCP message when compressed even if they
- # do not fit when uncompressed, but surely we don't want to send
- # such monstrosities to an unsuspecting slave.
- if message_upper_len > XFROUT_MAX_MESSAGE_SIZE:
- raise XfroutSessionError('RR too large for zone transfer (' +
- str(rrset_len) + ' bytes)')
- # Add the RRset to the new message
- msg.add_rrset(Message.SECTION_ANSWER, rrset)
- # Add and send the trailing SOA
- self._send_message_with_last_soa(msg, sock_fd, self._soa,
- message_upper_len)
- class UnixSockServer(socketserver_mixin.NoPollMixIn,
- ThreadingUnixStreamServer):
- '''The unix domain socket server which accept xfr query sent from auth server.'''
- def __init__(self, sock_file, handle_class, shutdown_event, config_data,
- cc):
- self._remove_unused_sock_file(sock_file)
- self._sock_file = sock_file
- socketserver_mixin.NoPollMixIn.__init__(self)
- ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
- self._shutdown_event = shutdown_event
- self._write_sock, self._read_sock = socket.socketpair()
- self._common_init()
- self._cc = cc
- self.update_config_data(config_data)
- def _common_init(self):
- '''Initialization shared with the mock server class used for tests'''
- self._lock = threading.Lock()
- self._transfers_counter = 0
- self._zone_config = {}
- self._acl = None # this will be initialized in update_config_data()
- def _receive_query_message(self, sock):
- ''' receive request message from sock'''
- # receive data length
- data_len = sock.recv(2)
- if not data_len:
- return None
- msg_len = struct.unpack('!H', data_len)[0]
- # receive data
- recv_size = 0
- msgdata = b''
- while recv_size < msg_len:
- data = sock.recv(msg_len - recv_size)
- if not data:
- return None
- recv_size += len(data)
- msgdata += data
- return msgdata
- def handle_request(self):
- ''' Enable server handle a request until shutdown or auth is closed.'''
- try:
- request, client_address = self.get_request()
- except socket.error:
- logger.error(XFROUT_FETCH_REQUEST_ERROR)
- return
- # Check self._shutdown_event to ensure the real shutdown comes.
- # Linux could trigger a spurious readable event on the _read_sock
- # due to a bug, so we need perform a double check.
- while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
- try:
- (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
- except select.error as e:
- if e.args[0] == errno.EINTR:
- (rlist, wlist, xlist) = ([], [], [])
- continue
- else:
- logger.error(XFROUT_SOCKET_SELECT_ERROR, str(e))
- break
- # self.server._shutdown_event will be set by now, if it is not a false
- # alarm
- if self._read_sock in rlist:
- continue
- try:
- self.process_request(request)
- except Exception as pre:
- logger.error(XFROUT_PROCESS_REQUEST_ERROR, str(pre))
- break
- def _handle_request_noblock(self):
- """Override the function _handle_request_noblock(), it creates a new
- thread to handle requests for each auth"""
- td = threading.Thread(target=self.handle_request)
- td.setDaemon(True)
- td.start()
- def process_request(self, request):
- """Receive socket fd and query message from auth, then
- start a new thread to process the request."""
- sock_fd = recv_fd(request.fileno())
- if sock_fd < 0:
- # This may happen when one xfrout process try to connect to
- # xfrout unix socket server, to check whether there is another
- # xfrout running.
- if sock_fd == FD_SYSTEM_ERROR:
- logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
- return
- # receive request msg
- request_data = self._receive_query_message(request)
- if not request_data:
- return
- t = threading.Thread(target=self.finish_request,
- args = (sock_fd, request_data))
- if self.daemon_threads:
- t.daemon = True
- t.start()
- def _guess_remote(self, sock_fd):
- """Guess remote address and port of the socket.
- The sock_fd must be a file descriptor of a socket.
- This method retuns a 3-tuple consisting of address family,
- socket type, and a 2-tuple with the address (string) and port (int).
- """
- # This uses a trick. If the socket is IPv4 in reality and we pretend
- # it to be IPv6, it returns IPv4 address anyway. This doesn't seem
- # to care about the SOCK_STREAM parameter at all (which it really is,
- # except for testing)
- if socket.has_ipv6:
- sock = socket.fromfd(sock_fd, socket.AF_INET6, socket.SOCK_STREAM)
- else:
- # To make it work even on hosts without IPv6 support
- # (Any idea how to simulate this in test?)
- sock = socket.fromfd(sock_fd, socket.AF_INET, socket.SOCK_STREAM)
- peer = sock.getpeername()
- # Identify the correct socket family. Due to the above "trick",
- # we cannot simply use sock.family.
- family = socket.AF_INET6
- try:
- socket.inet_pton(socket.AF_INET6, peer[0])
- except socket.error:
- family = socket.AF_INET
- return (family, socket.SOCK_STREAM, peer)
- def finish_request(self, sock_fd, request_data):
- '''Finish one request by instantiating RequestHandlerClass.
- This is an entry point of a separate thread spawned in
- UnixSockServer.process_request().
- This method creates a XfroutSession object.
- '''
- self._lock.acquire()
- acl = self._acl
- zone_config = self._zone_config
- self._lock.release()
- self.RequestHandlerClass(sock_fd, request_data, self,
- self.tsig_key_ring,
- self._guess_remote(sock_fd), acl, zone_config)
- def _remove_unused_sock_file(self, sock_file):
- '''Try to remove the socket file. If the file is being used
- by one running xfrout process, exit from python.
- If it's not a socket file or nobody is listening
- , it will be removed. If it can't be removed, exit from python. '''
- if self._sock_file_in_use(sock_file):
- logger.error(XFROUT_UNIX_SOCKET_FILE_IN_USE, sock_file)
- sys.exit(0)
- else:
- if not os.path.exists(sock_file):
- return
- try:
- os.unlink(sock_file)
- except OSError as err:
- logger.error(XFROUT_REMOVE_OLD_UNIX_SOCKET_FILE_ERROR, sock_file, str(err))
- sys.exit(0)
- def _sock_file_in_use(self, sock_file):
- '''Check whether the socket file 'sock_file' exists and
- is being used by one running xfrout process. If it is,
- return True, or else return False. '''
- try:
- sock = socket.socket(socket.AF_UNIX)
- sock.connect(sock_file)
- except socket.error as err:
- return False
- else:
- return True
- def shutdown(self):
- self._write_sock.send(b"shutdown") #terminate the xfrout session thread
- super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
- try:
- os.unlink(self._sock_file)
- except Exception as e:
- logger.error(XFROUT_REMOVE_UNIX_SOCKET_FILE_ERROR, self._sock_file, str(e))
- def update_config_data(self, new_config):
- '''Apply the new config setting of xfrout module.
- '''
- self._lock.acquire()
- try:
- logger.info(XFROUT_NEW_CONFIG)
- new_acl = self._acl
- if 'transfer_acl' in new_config:
- try:
- new_acl = REQUEST_LOADER.load(new_config['transfer_acl'])
- except LoaderError as e:
- raise XfroutConfigError('Failed to parse transfer_acl: ' +
- str(e))
- new_zone_config = self._zone_config
- zconfig_data = new_config.get('zone_config')
- if zconfig_data is not None:
- new_zone_config = self.__create_zone_config(zconfig_data)
- self._acl = new_acl
- self._zone_config = new_zone_config
- self._max_transfers_out = new_config.get('transfers_out')
- self.set_tsig_key_ring(new_config.get('tsig_key_ring'))
- except Exception as e:
- self._lock.release()
- raise e
- self._lock.release()
- logger.info(XFROUT_NEW_CONFIG_DONE)
- def __create_zone_config(self, zone_config_list):
- new_config = {}
- for zconf in zone_config_list:
- # convert the class, origin (name) pair. First build pydnspp
- # object to reject invalid input.
- zclass_str = zconf.get('class')
- if zclass_str is None:
- #zclass_str = 'IN' # temporary
- zclass_str = self._cc.get_default_value('zone_config/class')
- zclass = RRClass(zclass_str)
- zorigin = Name(zconf['origin'], True)
- config_key = (zclass.to_text(), zorigin.to_text())
- # reject duplicate config
- if config_key in new_config:
- raise XfroutConfigError('Duplicate zone_config for ' +
- str(zorigin) + '/' + str(zclass))
- # create a new config entry, build any given (and known) config
- new_config[config_key] = {}
- if 'transfer_acl' in zconf:
- try:
- new_config[config_key]['transfer_acl'] = \
- REQUEST_LOADER.load(zconf['transfer_acl'])
- except LoaderError as e:
- raise XfroutConfigError('Failed to parse transfer_acl ' +
- 'for ' + zorigin.to_text() + '/' +
- zclass_str + ': ' + str(e))
- return new_config
- def set_tsig_key_ring(self, key_list):
- """Set the tsig_key_ring , given a TSIG key string list representation. """
- # XXX add values to configure zones/tsig options
- self.tsig_key_ring = TSIGKeyRing()
- # If key string list is empty, create a empty tsig_key_ring
- if not key_list:
- return
- for key_item in key_list:
- try:
- self.tsig_key_ring.add(TSIGKey(key_item))
- except InvalidParameter as ipe:
- logger.error(XFROUT_BAD_TSIG_KEY_STRING, str(key_item))
- def get_db_file(self):
- file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
- # this too should be unnecessary, but currently the
- # 'from build' override isn't stored in the config
- # (and we don't have indirect python access to datasources yet)
- if is_default and "B10_FROM_BUILD" in os.environ:
- file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
- return file
- def increase_transfers_counter(self):
- '''Return False, if counter + 1 > max_transfers_out, or else
- return True
- '''
- ret = False
- self._lock.acquire()
- if self._transfers_counter < self._max_transfers_out:
- self._transfers_counter += 1
- ret = True
- self._lock.release()
- return ret
- def decrease_transfers_counter(self):
- self._lock.acquire()
- self._transfers_counter -= 1
- self._lock.release()
- class XfroutServer:
- def __init__(self):
- self._unix_socket_server = None
- self._listen_sock_file = UNIX_SOCKET_FILE
- self._shutdown_event = threading.Event()
- self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
- self._config_data = self._cc.get_full_config()
- self._cc.start()
- self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
- self._start_xfr_query_listener()
- self._start_notifier()
- def _start_xfr_query_listener(self):
- '''Start a new thread to accept xfr query. '''
- self._unix_socket_server = UnixSockServer(self._listen_sock_file,
- XfroutSession,
- self._shutdown_event,
- self._config_data,
- self._cc)
- listener = threading.Thread(target=self._unix_socket_server.serve_forever)
- listener.start()
- def _start_notifier(self):
- datasrc = self._unix_socket_server.get_db_file()
- self._notifier = notify_out.NotifyOut(datasrc)
- self._notifier.dispatcher()
- def send_notify(self, zone_name, zone_class):
- self._notifier.send_notify(zone_name, zone_class)
- def config_handler(self, new_config):
- '''Update config data. TODO. Do error check'''
- answer = create_answer(0)
- for key in new_config:
- if key not in self._config_data:
- answer = create_answer(1, "Unknown config data: " + str(key))
- continue
- self._config_data[key] = new_config[key]
- if self._unix_socket_server:
- try:
- self._unix_socket_server.update_config_data(self._config_data)
- except Exception as e:
- answer = create_answer(1,
- "Failed to handle new configuration: " +
- str(e))
- return answer
- def shutdown(self):
- ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
- terminated.
- '''
- global xfrout_server
- xfrout_server = None #Avoid shutdown is called twice
- self._shutdown_event.set()
- self._notifier.shutdown()
- if self._unix_socket_server:
- self._unix_socket_server.shutdown()
- # Wait for all threads to terminate
- main_thread = threading.currentThread()
- for th in threading.enumerate():
- if th is main_thread:
- continue
- th.join()
- def command_handler(self, cmd, args):
- if cmd == "shutdown":
- logger.info(XFROUT_RECEIVED_SHUTDOWN_COMMAND)
- self.shutdown()
- answer = create_answer(0)
- elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
- zone_name = args.get('zone_name')
- zone_class = args.get('zone_class')
- if zone_name and zone_class:
- logger.info(XFROUT_NOTIFY_COMMAND, zone_name, zone_class)
- self.send_notify(zone_name, zone_class)
- answer = create_answer(0)
- else:
- answer = create_answer(1, "Bad command parameter:" + str(args))
- else:
- answer = create_answer(1, "Unknown command:" + str(cmd))
- return answer
- def run(self):
- '''Get and process all commands sent from cfgmgr or other modules. '''
- while not self._shutdown_event.is_set():
- self._cc.check_command(False)
- xfrout_server = None
- def signal_handler(signal, frame):
- if xfrout_server:
- xfrout_server.shutdown()
- sys.exit(0)
- def set_signal_handler():
- signal.signal(signal.SIGTERM, signal_handler)
- signal.signal(signal.SIGINT, signal_handler)
- def set_cmd_options(parser):
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
- help="display more about what is going on")
- if '__main__' == __name__:
- try:
- parser = OptionParser()
- set_cmd_options(parser)
- (options, args) = parser.parse_args()
- VERBOSE_MODE = options.verbose
- set_signal_handler()
- xfrout_server = XfroutServer()
- xfrout_server.run()
- except KeyboardInterrupt:
- logger.INFO(XFROUT_STOPPED_BY_KEYBOARD)
- except SessionError as e:
- logger.error(XFROUT_CC_SESSION_ERROR, str(e))
- except ModuleCCSessionError as e:
- logger.error(XFROUT_MODULECC_SESSION_ERROR, str(e))
- except XfroutConfigError as e:
- logger.error(XFROUT_CONFIG_ERROR, str(e))
- except SessionTimeout as e:
- logger.error(XFROUT_CC_SESSION_TIMEOUT_ERROR)
- if xfrout_server:
- xfrout_server.shutdown()
|