12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043 |
- #!@PYTHON@
- # Copyright (C) 2009-2011 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- import os
- import signal
- import isc
- import asyncore
- import struct
- import threading
- import socket
- import random
- from optparse import OptionParser, OptionValueError
- from isc.config.ccsession import *
- from isc.notify import notify_out
- import isc.util.process
- from isc.datasrc import DataSourceClient, ZoneFinder
- import isc.net.parse
- from isc.xfrin.diff import Diff
- from isc.log_messages.xfrin_messages import *
- isc.log.init("b10-xfrin")
- logger = isc.log.Logger("xfrin")
- try:
- from pydnspp import *
- except ImportError as e:
- # C++ loadable module may not be installed; even so the xfrin process
- # must keep running, so we warn about it and move forward.
- logger.error(XFRIN_IMPORT_DNS, str(e))
- isc.util.process.rename()
- # If B10_FROM_BUILD is set in the environment, we use data files
- # from a directory relative to that, otherwise we use the ones
- # installed on the system
- if "B10_FROM_BUILD" in os.environ:
- SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrin"
- AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
- else:
- PREFIX = "@prefix@"
- DATAROOTDIR = "@datarootdir@"
- SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
- AUTH_SPECFILE_PATH = SPECFILE_PATH
- SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"
- AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"
- XFROUT_MODULE_NAME = 'Xfrout'
- ZONE_MANAGER_MODULE_NAME = 'Zonemgr'
- REFRESH_FROM_ZONEMGR = 'refresh_from_zonemgr'
- ZONE_XFRIN_FAILED = 'zone_xfrin_failed'
- # Constants for debug levels, to be removed when we have #1074.
- DBG_XFRIN_TRACE = 3
- # These two default are currently hard-coded. For config this isn't
- # necessary, but we need these defaults for optional command arguments
- # (TODO: have similar support to get default values for command
- # arguments as we do for config options)
- DEFAULT_MASTER_PORT = 53
- DEFAULT_ZONE_CLASS = RRClass.IN()
- __version__ = 'BIND10'
- # define xfrin rcode
- XFRIN_OK = 0
- XFRIN_FAIL = 1
- class XfrinException(Exception):
- pass
- class XfrinProtocolError(Exception):
- '''An exception raised for errors encountered in xfrin protocol handling.
- '''
- pass
- class XfrinZoneInfoException(Exception):
- """This exception is raised if there is an error in the given
- configuration (part), or when a command does not have a required
- argument or has bad arguments, for instance when the zone's master
- address is not a valid IP address, when the zone does not
- have a name, or when multiple settings are given for the same
- zone."""
- pass
- def _check_zone_name(zone_name_str):
- """Checks if the given zone name is a valid domain name, and returns
- it as a Name object. Raises an XfrinException if it is not."""
- try:
- # In the _zones dict, part of the key is the zone name,
- # but due to a limitation in the Name class, we
- # cannot directly use it as a dict key, and we use to_text()
- #
- # Downcase the name here for that reason.
- return Name(zone_name_str, True)
- except (EmptyLabel, TooLongLabel, BadLabelType, BadEscape,
- TooLongName, IncompleteName) as ne:
- raise XfrinZoneInfoException("bad zone name: " + zone_name_str + " (" + str(ne) + ")")
- def _check_zone_class(zone_class_str):
- """If the given argument is a string: checks if the given class is
- a valid one, and returns an RRClass object if so.
- Raises XfrinZoneInfoException if not.
- If it is None, this function returns the default RRClass.IN()"""
- if zone_class_str is None:
- return DEFAULT_ZONE_CLASS
- try:
- return RRClass(zone_class_str)
- except InvalidRRClass as irce:
- raise XfrinZoneInfoException("bad zone class: " + zone_class_str + " (" + str(irce) + ")")
- def get_soa_serial(soa_rdata):
- '''Extract the serial field of an SOA RDATA and returns it as an intger.
- We don't have to be very efficient here, so we first dump the entire RDATA
- as a string and convert the first corresponding field. This should be
- sufficient in practice, but may not always work when the MNAME or RNAME
- contains an (escaped) space character in their labels. Ideally there
- should be a more direct and convenient way to get access to the SOA
- fields.
- '''
- return int(soa_rdata.to_text().split()[2])
- class XfrinState:
- '''
- The states of the incomding *XFR state machine.
- '''
- def set_xfrstate(self, conn, new_state):
- '''Set the XfrConnection to a given new state
- As a "friend" class, this method intentionally gets access to the
- connection's "private" method.
- '''
- conn._XfrinConnection__set_xfrstate(new_state)
- class XfrinInitialSOA(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() != RRType.SOA():
- raise XfrinProtocolError('First RR in zone transfer must be SOA ('
- + rr.get_type().to_text() + ' given)')
- conn._end_serial = get_soa_serial(rr.get_rdata()[0])
- # FIXME: we need to check the serial is actually greater than ours.
- # To do so, however, we need a way to find records from datasource.
- # Complete that part later as a separate task. (Always performing
- # xfr could be inefficient, but shouldn't do any harm otherwise)
- self.set_xfrstate(conn, XfrinFirstData())
- return True
- class XfrinFirstData(XfrinState):
- def handle_rr(self, conn, rr):
- # If the transfer begins with one SOA record, it is an AXFR,
- # if it begins with two SOAs (the serial of the second one being
- # equal to our serial), it is an IXFR.
- if conn._request_type == RRType.IXFR() and \
- rr.get_type() == RRType.SOA() and \
- conn._request_serial == get_soa_serial(rr.get_rdata()[0]):
- logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_INCREMENTAL_RESP,
- conn.zone_str())
- self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
- else:
- logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_NONINCREMENTAL_RESP,
- conn.zone_str())
- self.set_xfrstate(conn, XfrinAXFR())
- return False # need to revisit this RR in an update context
- class XfrinIXFRDeleteSOA(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() != RRType.SOA():
- # this shouldn't happen; should this occur it means an internal
- # bug.
- raise XfrinException(rr.get_type().to_text() + \
- ' RR is given in IXFRDeleteSOA state')
- conn._diff.remove_data(rr)
- self.set_xfrstate(conn, XfrinIXFRDelete())
- return True
- class XfrinIXFRDelete(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() == RRType.SOA():
- # This is the only place where current_serial is set
- conn._current_serial = get_soa_serial(rr.get_rdata()[0])
- self.set_xfrstate(conn, XfrinIXFRAddSOA())
- return False
- conn._diff.remove_data(rr)
- return True
- class XfrinIXFRAddSOA(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() != RRType.SOA():
- # this shouldn't happen; should this occur it means an internal
- # bug.
- raise XfrinException(rr.get_type().to_text() + \
- ' RR is given in IXFRAddSOA state')
- conn._diff.add_data(rr)
- self.set_xfrstate(conn, XfrinIXFRAdd())
- return True
- class XfrinIXFRAdd(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() == RRType.SOA():
- soa_serial = get_soa_serial(rr.get_rdata()[0])
- if soa_serial == conn._end_serial:
- conn._diff.commit()
- self.set_xfrstate(conn, XfrinIXFREnd())
- return True
- elif soa_serial != conn._current_serial:
- raise XfrinProtocolError('IXFR out of sync: expected ' + \
- 'serial ' + \
- str(conn._current_serial) + \
- ', got ' + str(soa_serial))
- else:
- conn._diff.commit()
- self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
- return False
- conn._diff.add_data(rr)
- return True
- class XfrinIXFREnd(XfrinState):
- def handle_rr(self, conn, rr):
- raise XfrinProtocolError('Extra data after the end of IXFR diffs: ' + \
- rr.to_text())
- class XfrinAXFR(XfrinState):
- def handle_rr(self, conn, rr):
- raise XfrinException('Falling back from IXFR to AXFR not ' + \
- 'supported yet')
- class XfrinConnection(asyncore.dispatcher):
- '''Do xfrin in this class. '''
- def __init__(self,
- sock_map, zone_name, rrclass, db_file, shutdown_event,
- master_addrinfo, tsig_key = None, verbose = False,
- idle_timeout = 60):
- ''' idle_timeout: max idle time for read data from socket.
- db_file: specify the data source file.
- check_soa: when it's true, check soa first before sending xfr query
- '''
- asyncore.dispatcher.__init__(self, map=sock_map)
- self.__state = None
- # Requested transfer type (RRType.AXFR or RRType.IXFR). The actual
- # transfer type may differ due to IXFR->AXFR fallback:
- self._request_type = None
- self._end_serial = None # essentially private
- # Zone parameters
- self._zone_name = zone_name
- self._rrclass = rrclass
- # Data source handlers
- self._db_file = db_file # temporary for sqlite3 specific code
- self._datasrc_client = self._get_datasrc_client(rrclass)
- self.create_socket(master_addrinfo[0], master_addrinfo[1])
- self._sock_map = sock_map
- self._soa_rr_count = 0
- self._idle_timeout = idle_timeout
- self.setblocking(1)
- self._shutdown_event = shutdown_event
- self._verbose = verbose
- self._master_address = master_addrinfo[2]
- self._tsig_key = tsig_key
- self._tsig_ctx = None
- # tsig_ctx_creator is introduced to allow tests to use a mock class for
- # easier tests (in normal case we always use the default)
- self._tsig_ctx_creator = self.__create_tsig_ctx
- def __create_tsig_ctx(self, key):
- return TSIGContext(key)
- def _get_datasrc_client(self, rrclass):
- # Create a client here once #1206 is done
- return None
- def __set_xfrstate(self, new_state):
- self.__state = new_state
- def get_xfrstate(self):
- return self.__state
- def zone_str(self):
- '''A convenient function for logging to include zone name and class'''
- return self._zone_name.to_text() + '/' + str(self._rrclass)
- def connect_to_master(self):
- '''Connect to master in TCP.'''
- try:
- self.connect(self._master_address)
- return True
- except socket.error as e:
- logger.error(XFRIN_CONNECT_MASTER, self._master_address, str(e))
- return False
- def _create_query(self, query_type):
- '''Create an XFR-related query message.
- query_type is either SOA, AXFR or IXFR. For type IXFR, it searches
- the associated data source for the current SOA record to include
- it in the query. If the corresponding zone or the SOA record
- cannot be found, it raises an XfrinException exception. Note that
- this may not necessarily a broken configuration; for the first attempt
- of transfer the secondary may not have any boot-strap zone
- information, in which case IXFR simply won't work. The xfrin
- should then fall back to AXFR. _request_serial is recorded for
- later use.
- '''
- msg = Message(Message.RENDER)
- query_id = random.randint(0, 0xFFFF)
- self._query_id = query_id
- msg.set_qid(query_id)
- msg.set_opcode(Opcode.QUERY())
- msg.set_rcode(Rcode.NOERROR())
- msg.add_question(Question(self._zone_name, self._rrclass, query_type))
- if query_type == RRType.IXFR():
- # get the zone finder. this must be SUCCESS (not even
- # PARTIALMATCH) because we are specifying the zone origin name.
- result, finder = self._datasrc_client.find_zone(self._zone_name)
- if result != DataSourceClient.SUCCESS:
- raise XfrinException('Zone not found in the given data ' +
- 'source: ' + self.zone_str())
- result, soa_rrset = finder.find(self._zone_name, RRType.SOA(),
- None, ZoneFinder.FIND_DEFAULT)
- if result != ZoneFinder.SUCCESS:
- raise XfrinException('SOA RR not found in zone: ' +
- self.zone_str())
- # Especially for database-based zones, working zones may be in
- # a broken state where it has more than one SOA RR. We proactively
- # check the condition and abort the xfr attempt if we identify it.
- if soa_rrset.get_rdata_count() != 1:
- raise XfrinException('Invalid number of SOA RRs for ' +
- self.zone_str() + ': ' +
- str(soa_rrset.get_rdata_count()))
- msg.add_rrset(Message.SECTION_AUTHORITY, soa_rrset)
- self._request_serial = get_soa_serial(soa_rrset.get_rdata()[0])
- return msg
- def _send_data(self, data):
- size = len(data)
- total_count = 0
- while total_count < size:
- count = self.send(data[total_count:])
- total_count += count
- def _send_query(self, query_type):
- '''Send query message over TCP. '''
- msg = self._create_query(query_type)
- render = MessageRenderer()
- # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
- # we should remove the if statement and use a universal interface later.
- if self._tsig_key is not None:
- self._tsig_ctx = self._tsig_ctx_creator(self._tsig_key)
- msg.to_wire(render, self._tsig_ctx)
- else:
- msg.to_wire(render)
- header_len = struct.pack('H', socket.htons(render.get_length()))
- self._send_data(header_len)
- self._send_data(render.get_data())
- def _asyncore_loop(self):
- '''
- This method is a trivial wrapper for asyncore.loop(). It's extracted from
- _get_request_response so that we can test the rest of the code without
- involving actual communication with a remote server.'''
- asyncore.loop(self._idle_timeout, map=self._sock_map, count=1)
- def _get_request_response(self, size):
- recv_size = 0
- data = b''
- while recv_size < size:
- self._recv_time_out = True
- self._need_recv_size = size - recv_size
- self._asyncore_loop()
- if self._recv_time_out:
- raise XfrinException('receive data from socket time out.')
- recv_size += self._recvd_size
- data += self._recvd_data
- return data
- def _check_response_tsig(self, msg, response_data):
- tsig_record = msg.get_tsig_record()
- if self._tsig_ctx is not None:
- tsig_error = self._tsig_ctx.verify(tsig_record, response_data)
- if tsig_error != TSIGError.NOERROR:
- raise XfrinException('TSIG verify fail: %s' % str(tsig_error))
- elif tsig_record is not None:
- # If the response includes a TSIG while we didn't sign the query,
- # we treat it as an error. RFC doesn't say anything about this
- # case, but it clearly states the server must not sign a response
- # to an unsigned request. Although we could be flexible, no sane
- # implementation would return such a response, and since this is
- # part of security mechanism, it's probably better to be more
- # strict.
- raise XfrinException('Unexpected TSIG in response')
- def _check_soa_serial(self):
- ''' Compare the soa serial, if soa serial in master is less than
- the soa serial in local, Finish xfrin.
- False: soa serial in master is less or equal to the local one.
- True: soa serial in master is bigger
- '''
- self._send_query(RRType.SOA())
- data_len = self._get_request_response(2)
- msg_len = socket.htons(struct.unpack('H', data_len)[0])
- soa_response = self._get_request_response(msg_len)
- msg = Message(Message.PARSE)
- msg.from_wire(soa_response)
- # TSIG related checks, including an unexpected signed response
- self._check_response_tsig(msg, soa_response)
- # perform some minimal level validation. It's an open issue how
- # strict we should be (see the comment in _check_response_header())
- self._check_response_header(msg)
- # TODO, need select soa record from data source then compare the two
- # serial, current just return OK, since this function hasn't been used
- # now.
- return XFRIN_OK
- def do_xfrin(self, check_soa, ixfr_first=False):
- '''Do an xfr session by sending xfr request and parsing responses.'''
- try:
- ret = XFRIN_OK
- if check_soa:
- logstr = 'SOA check for \'%s\' ' % self.zone_str()
- ret = self._check_soa_serial()
- if ret == XFRIN_OK:
- if not ixfr_first:
- logger.info(XFRIN_AXFR_TRANSFER_STARTED, self.zone_str())
- self._send_query(RRType.AXFR())
- isc.datasrc.sqlite3_ds.load(self._db_file,
- self._zone_name.to_text(),
- self._handle_axfrin_response)
- logger.info(XFRIN_AXFR_TRANSFER_SUCCESS, self.zone_str())
- except XfrinException as e:
- logger.error(XFRIN_AXFR_TRANSFER_FAILURE, self.zone_str(), str(e))
- ret = XFRIN_FAIL
- #TODO, recover data source.
- except isc.datasrc.sqlite3_ds.Sqlite3DSError as e:
- logger.error(XFRIN_AXFR_DATABASE_FAILURE, self.zone_str(), str(e))
- ret = XFRIN_FAIL
- except UserWarning as e:
- # XXX: this is an exception from our C++ library via the
- # Boost.Python binding. It would be better to have more more
- # specific exceptions, but at this moment this is the finest
- # granularity.
- logger.error(XFRIN_AXFR_INTERNAL_FAILURE, self.zone_str(), str(e))
- ret = XFRIN_FAIL
- finally:
- self.close()
- return ret
- def _check_response_header(self, msg):
- '''Perform minimal validation on responses'''
- # It's not clear how strict we should be about response validation.
- # BIND 9 ignores some cases where it would normally be considered a
- # bogus response. For example, it accepts a response even if its
- # opcode doesn't match that of the corresponding request.
- # According to an original developer of BIND 9 some of the missing
- # checks are deliberate to be kind to old implementations that would
- # cause interoperability trouble with stricter checks.
- msg_rcode = msg.get_rcode()
- if msg_rcode != Rcode.NOERROR():
- raise XfrinException('error response: %s' % msg_rcode.to_text())
- if not msg.get_header_flag(Message.HEADERFLAG_QR):
- raise XfrinException('response is not a response')
- if msg.get_qid() != self._query_id:
- raise XfrinException('bad query id')
- def _check_response_status(self, msg):
- '''Check validation of xfr response. '''
- self._check_response_header(msg)
- if msg.get_rr_count(Message.SECTION_ANSWER) == 0:
- raise XfrinException('answer section is empty')
- if msg.get_rr_count(Message.SECTION_QUESTION) > 1:
- raise XfrinException('query section count greater than 1')
- def _handle_answer_section(self, answer_section):
- '''Return a generator for the reponse in one tcp package to a zone transfer.'''
- for rrset in answer_section:
- rrset_name = rrset.get_name().to_text()
- rrset_ttl = int(rrset.get_ttl().to_text())
- rrset_class = rrset.get_class().to_text()
- rrset_type = rrset.get_type().to_text()
- for rdata in rrset.get_rdata():
- # Count the soa record count
- if rrset.get_type() == RRType.SOA():
- self._soa_rr_count += 1
- # XXX: the current DNS message parser can't preserve the
- # RR order or separete the beginning and ending SOA RRs.
- # As a short term workaround, we simply ignore the second
- # SOA, and ignore the erroneous case where the transfer
- # session doesn't end with an SOA.
- if (self._soa_rr_count == 2):
- # Avoid inserting soa record twice
- break
- rdata_text = rdata.to_text()
- yield (rrset_name, rrset_ttl, rrset_class, rrset_type,
- rdata_text)
- def _handle_axfrin_response(self):
- '''Return a generator for the response to a zone transfer. '''
- while True:
- data_len = self._get_request_response(2)
- msg_len = socket.htons(struct.unpack('H', data_len)[0])
- recvdata = self._get_request_response(msg_len)
- msg = Message(Message.PARSE)
- msg.from_wire(recvdata)
- # TSIG related checks, including an unexpected signed response
- self._check_response_tsig(msg, recvdata)
- # Perform response status validation
- self._check_response_status(msg)
- answer_section = msg.get_section(Message.SECTION_ANSWER)
- for rr in self._handle_answer_section(answer_section):
- yield rr
- if self._soa_rr_count == 2:
- break
- if self._shutdown_event.is_set():
- raise XfrinException('xfrin is forced to stop')
- def handle_read(self):
- '''Read query's response from socket. '''
- self._recvd_data = self.recv(self._need_recv_size)
- self._recvd_size = len(self._recvd_data)
- self._recv_time_out = False
- def writable(self):
- '''Ignore the writable socket. '''
- return False
- def log_info(self, msg, type='info'):
- # Overwrite the log function, log nothing
- pass
- def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file,
- shutdown_event, master_addrinfo, check_soa, verbose,
- tsig_key):
- xfrin_recorder.increment(zone_name)
- sock_map = {}
- conn = XfrinConnection(sock_map, zone_name, rrclass, db_file,
- shutdown_event, master_addrinfo,
- tsig_key, verbose)
- ret = XFRIN_FAIL
- if conn.connect_to_master():
- ret = conn.do_xfrin(check_soa)
- # Publish the zone transfer result news, so zonemgr can reset the
- # zone timer, and xfrout can notify the zone's slaves if the result
- # is success.
- server.publish_xfrin_news(zone_name, rrclass, ret)
- xfrin_recorder.decrement(zone_name)
- class XfrinRecorder:
- def __init__(self):
- self._lock = threading.Lock()
- self._zones = []
- def increment(self, zone_name):
- self._lock.acquire()
- self._zones.append(zone_name)
- self._lock.release()
- def decrement(self, zone_name):
- self._lock.acquire()
- if zone_name in self._zones:
- self._zones.remove(zone_name)
- self._lock.release()
- def xfrin_in_progress(self, zone_name):
- self._lock.acquire()
- ret = zone_name in self._zones
- self._lock.release()
- return ret
- def count(self):
- self._lock.acquire()
- ret = len(self._zones)
- self._lock.release()
- return ret
- class ZoneInfo:
- def __init__(self, config_data, module_cc):
- """Creates a zone_info with the config data element as
- specified by the 'zones' list in xfrin.spec. Module_cc is
- needed to get the defaults from the specification"""
- self._module_cc = module_cc
- self.set_name(config_data.get('name'))
- self.set_master_addr(config_data.get('master_addr'))
- self.set_master_port(config_data.get('master_port'))
- self.set_zone_class(config_data.get('class'))
- self.set_tsig_key(config_data.get('tsig_key'))
- self.set_ixfr_disabled(config_data.get('ixfr_disabled'))
- def set_name(self, name_str):
- """Set the name for this zone given a name string.
- Raises XfrinZoneInfoException if name_str is None or if it
- cannot be parsed."""
- if name_str is None:
- raise XfrinZoneInfoException("Configuration zones list "
- "element does not contain "
- "'name' attribute")
- else:
- self.name = _check_zone_name(name_str)
- def set_master_addr(self, master_addr_str):
- """Set the master address for this zone given an IP address
- string. Raises XfrinZoneInfoException if master_addr_str is
- None or if it cannot be parsed."""
- if master_addr_str is None:
- raise XfrinZoneInfoException("master address missing from config data")
- else:
- try:
- self.master_addr = isc.net.parse.addr_parse(master_addr_str)
- except ValueError:
- logger.error(XFRIN_BAD_MASTER_ADDR_FORMAT, master_addr_str)
- errmsg = "bad format for zone's master: " + master_addr_str
- raise XfrinZoneInfoException(errmsg)
- def set_master_port(self, master_port_str):
- """Set the master port given a port number string. If
- master_port_str is None, the default from the specification
- for this module will be used. Raises XfrinZoneInfoException if
- the string contains an invalid port number"""
- if master_port_str is None:
- self.master_port = self._module_cc.get_default_value("zones/master_port")
- else:
- try:
- self.master_port = isc.net.parse.port_parse(master_port_str)
- except ValueError:
- logger.error(XFRIN_BAD_MASTER_PORT_FORMAT, master_port_str)
- errmsg = "bad format for zone's master port: " + master_port_str
- raise XfrinZoneInfoException(errmsg)
- def set_zone_class(self, zone_class_str):
- """Set the zone class given an RR class str (e.g. "IN"). If
- zone_class_str is None, it will default to what is specified
- in the specification file for this module. Raises
- XfrinZoneInfoException if the string cannot be parsed."""
- # TODO: remove _str
- self.class_str = zone_class_str or self._module_cc.get_default_value("zones/class")
- if zone_class_str == None:
- #TODO rrclass->zone_class
- self.rrclass = RRClass(self._module_cc.get_default_value("zones/class"))
- else:
- try:
- self.rrclass = RRClass(zone_class_str)
- except InvalidRRClass:
- logger.error(XFRIN_BAD_ZONE_CLASS, zone_class_str)
- errmsg = "invalid zone class: " + zone_class_str
- raise XfrinZoneInfoException(errmsg)
- def set_tsig_key(self, tsig_key_str):
- """Set the tsig_key for this zone, given a TSIG key string
- representation. If tsig_key_str is None, no TSIG key will
- be set. Raises XfrinZoneInfoException if tsig_key_str cannot
- be parsed."""
- if tsig_key_str is None:
- self.tsig_key = None
- else:
- try:
- self.tsig_key = TSIGKey(tsig_key_str)
- except InvalidParameter as ipe:
- logger.error(XFRIN_BAD_TSIG_KEY_STRING, tsig_key_str)
- errmsg = "bad TSIG key string: " + tsig_key_str
- raise XfrinZoneInfoException(errmsg)
- def set_ixfr_disabled(self, ixfr_disabled):
- """Set ixfr_disabled. If set to False (the default), it will use
- IXFR for incoming transfers. If set to True, it will use AXFR.
- At this moment there is no automatic fallback"""
- # don't care what type it is; if evaluates to true, set to True
- if ixfr_disabled:
- self.ixfr_disabled = True
- else:
- self.ixfr_disabled = False
- def get_master_addr_info(self):
- return (self.master_addr.family, socket.SOCK_STREAM,
- (str(self.master_addr), self.master_port))
- class Xfrin:
- def __init__(self, verbose = False):
- self._max_transfers_in = 10
- self._zones = {}
- self._cc_setup()
- self.recorder = XfrinRecorder()
- self._shutdown_event = threading.Event()
- self._verbose = verbose
- def _cc_setup(self):
- '''This method is used only as part of initialization, but is
- implemented separately for convenience of unit tests; by letting
- the test code override this method we can test most of this class
- without requiring a command channel.'''
- # Create one session for sending command to other modules, because the
- # listening session will block the send operation.
- self._send_cc_session = isc.cc.Session()
- self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
- self.config_handler,
- self.command_handler)
- self._module_cc.start()
- config_data = self._module_cc.get_full_config()
- self.config_handler(config_data)
- def _cc_check_command(self):
- '''This is a straightforward wrapper for cc.check_command,
- but provided as a separate method for the convenience
- of unit tests.'''
- self._module_cc.check_command(False)
- def _get_zone_info(self, name, rrclass):
- """Returns the ZoneInfo object containing the configured data
- for the given zone name. If the zone name did not have any
- data, returns None"""
- return self._zones.get((name.to_text(), rrclass.to_text()))
- def _add_zone_info(self, zone_info):
- """Add the zone info. Raises a XfrinZoneInfoException if a zone
- with the same name and class is already configured"""
- key = (zone_info.name.to_text(), zone_info.class_str)
- if key in self._zones:
- raise XfrinZoneInfoException("zone " + str(key) +
- " configured multiple times")
- self._zones[key] = zone_info
- def _clear_zone_info(self):
- self._zones = {}
- def config_handler(self, new_config):
- # backup all config data (should there be a problem in the new
- # data)
- old_max_transfers_in = self._max_transfers_in
- old_zones = self._zones
- self._max_transfers_in = new_config.get("transfers_in") or self._max_transfers_in
- if 'zones' in new_config:
- self._clear_zone_info()
- for zone_config in new_config.get('zones'):
- try:
- zone_info = ZoneInfo(zone_config, self._module_cc)
- self._add_zone_info(zone_info)
- except XfrinZoneInfoException as xce:
- self._zones = old_zones
- self._max_transfers_in = old_max_transfers_in
- return create_answer(1, str(xce))
- return create_answer(0)
- def shutdown(self):
- ''' shutdown the xfrin process. the thread which is doing xfrin should be
- terminated.
- '''
- self._shutdown_event.set()
- main_thread = threading.currentThread()
- for th in threading.enumerate():
- if th is main_thread:
- continue
- th.join()
- def command_handler(self, command, args):
- answer = create_answer(0)
- try:
- if command == 'shutdown':
- self._shutdown_event.set()
- elif command == 'notify' or command == REFRESH_FROM_ZONEMGR:
- # Xfrin receives the refresh/notify command from zone manager.
- # notify command maybe has the parameters which
- # specify the notifyfrom address and port, according the RFC1996, zone
- # transfer should starts first from the notifyfrom, but now, let 'TODO' it.
- # (using the value now, while we can only set one master address, would be
- # a security hole. Once we add the ability to have multiple master addresses,
- # we should check if it matches one of them, and then use it.)
- (zone_name, rrclass) = self._parse_zone_name_and_class(args)
- zone_info = self._get_zone_info(zone_name, rrclass)
- if zone_info is None:
- # TODO what to do? no info known about zone. defaults?
- errmsg = "Got notification to retransfer unknown zone " + zone_name.to_text()
- logger.error(XFRIN_RETRANSFER_UNKNOWN_ZONE, zone_name.to_text())
- answer = create_answer(1, errmsg)
- else:
- master_addr = zone_info.get_master_addr_info()
- ret = self.xfrin_start(zone_name,
- rrclass,
- self._get_db_file(),
- master_addr,
- zone_info.tsig_key,
- True)
- answer = create_answer(ret[0], ret[1])
- elif command == 'retransfer' or command == 'refresh':
- # Xfrin receives the retransfer/refresh from cmdctl(sent by bindctl).
- # If the command has specified master address, do transfer from the
- # master address, or else do transfer from the configured masters.
- (zone_name, rrclass) = self._parse_zone_name_and_class(args)
- master_addr = self._parse_master_and_port(args, zone_name,
- rrclass)
- zone_info = self._get_zone_info(zone_name, rrclass)
- tsig_key = None
- if zone_info:
- tsig_key = zone_info.tsig_key
- db_file = args.get('db_file') or self._get_db_file()
- ret = self.xfrin_start(zone_name,
- rrclass,
- db_file,
- master_addr,
- tsig_key,
- (False if command == 'retransfer' else True))
- answer = create_answer(ret[0], ret[1])
- else:
- answer = create_answer(1, 'unknown command: ' + command)
- except XfrinException as err:
- logger.error(XFRIN_COMMAND_ERROR, command, str(err))
- answer = create_answer(1, str(err))
- return answer
- def _parse_zone_name_and_class(self, args):
- zone_name_str = args.get('zone_name')
- if zone_name_str is None:
- raise XfrinException('zone name should be provided')
- return (_check_zone_name(zone_name_str), _check_zone_class(args.get('zone_class')))
- def _parse_master_and_port(self, args, zone_name, zone_class):
- """
- Return tuple (family, socktype, sockaddr) for address and port in given
- args dict.
- IPv4 and IPv6 are the only supported addresses now, so sockaddr will be
- (address, port). The socktype is socket.SOCK_STREAM for now.
- """
- # check if we have configured info about this zone, in case
- # port or master are not specified
- zone_info = self._get_zone_info(zone_name, zone_class)
- addr_str = args.get('master')
- if addr_str is None:
- if zone_info is not None:
- addr = zone_info.master_addr
- else:
- raise XfrinException("Master address not given or "
- "configured for " + zone_name.to_text())
- else:
- try:
- addr = isc.net.parse.addr_parse(addr_str)
- except ValueError as err:
- raise XfrinException("failed to resolve master address %s: %s" %
- (addr_str, str(err)))
- port_str = args.get('port')
- if port_str is None:
- if zone_info is not None:
- port = zone_info.master_port
- else:
- port = DEFAULT_MASTER_PORT
- else:
- try:
- port = isc.net.parse.port_parse(port_str)
- except ValueError as err:
- raise XfrinException("failed to parse port=%s: %s" %
- (port_str, str(err)))
- return (addr.family, socket.SOCK_STREAM, (str(addr), port))
- def _get_db_file(self):
- #TODO, the db file path should be got in auth server's configuration
- # if we need access to this configuration more often, we
- # should add it on start, and not remove it here
- # (or, if we have writable ds, we might not need this in
- # the first place)
- self._module_cc.add_remote_config(AUTH_SPECFILE_LOCATION)
- db_file, is_default = self._module_cc.get_remote_config_value("Auth", "database_file")
- if is_default and "B10_FROM_BUILD" in os.environ:
- # this too should be unnecessary, but currently the
- # 'from build' override isn't stored in the config
- # (and we don't have writable datasources yet)
- db_file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
- self._module_cc.remove_remote_config(AUTH_SPECFILE_LOCATION)
- return db_file
- def publish_xfrin_news(self, zone_name, zone_class, xfr_result):
- '''Send command to xfrout/zone manager module.
- If xfrin has finished successfully for one zone, tell the good
- news(command: zone_new_data_ready) to zone manager and xfrout.
- if xfrin failed, just tell the bad news to zone manager, so that
- it can reset the refresh timer for that zone. '''
- param = {'zone_name': zone_name, 'zone_class': zone_class.to_text()}
- if xfr_result == XFRIN_OK:
- msg = create_command(notify_out.ZONE_NEW_DATA_READY_CMD, param)
- # catch the exception, in case msgq has been killed.
- try:
- seq = self._send_cc_session.group_sendmsg(msg,
- XFROUT_MODULE_NAME)
- try:
- answer, env = self._send_cc_session.group_recvmsg(False,
- seq)
- except isc.cc.session.SessionTimeout:
- pass # for now we just ignore the failure
- seq = self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
- try:
- answer, env = self._send_cc_session.group_recvmsg(False,
- seq)
- except isc.cc.session.SessionTimeout:
- pass # for now we just ignore the failure
- except socket.error as err:
- logger.error(XFRIN_MSGQ_SEND_ERROR, XFROUT_MODULE_NAME, ZONE_MANAGER_MODULE_NAME)
- else:
- msg = create_command(ZONE_XFRIN_FAILED, param)
- # catch the exception, in case msgq has been killed.
- try:
- seq = self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
- try:
- answer, env = self._send_cc_session.group_recvmsg(False,
- seq)
- except isc.cc.session.SessionTimeout:
- pass # for now we just ignore the failure
- except socket.error as err:
- logger.error(XFRIN_MSGQ_SEND_ERROR_ZONE_MANAGER, ZONE_MANAGER_MODULE_NAME)
- def startup(self):
- while not self._shutdown_event.is_set():
- self._cc_check_command()
- def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo, tsig_key,
- check_soa = True):
- if "pydnspp" not in sys.modules:
- return (1, "xfrin failed, can't load dns message python library: 'pydnspp'")
- # check max_transfer_in, else return quota error
- if self.recorder.count() >= self._max_transfers_in:
- return (1, 'xfrin quota error')
- if self.recorder.xfrin_in_progress(zone_name):
- return (1, 'zone xfrin is in progress')
- xfrin_thread = threading.Thread(target = process_xfrin,
- args = (self,
- self.recorder,
- zone_name,
- rrclass,
- db_file,
- self._shutdown_event,
- master_addrinfo, check_soa,
- self._verbose,
- tsig_key))
- xfrin_thread.start()
- return (0, 'zone xfrin is started')
- xfrind = None
- def signal_handler(signal, frame):
- if xfrind:
- xfrind.shutdown()
- sys.exit(0)
- def set_signal_handler():
- signal.signal(signal.SIGTERM, signal_handler)
- signal.signal(signal.SIGINT, signal_handler)
- def set_cmd_options(parser):
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
- help="display more about what is going on")
- def main(xfrin_class, use_signal = True):
- """The main loop of the Xfrin daemon.
- @param xfrin_class: A class of the Xfrin object. This is normally Xfrin,
- but can be a subclass of it for customization.
- @param use_signal: True if this process should catch signals. This is
- normally True, but may be disabled when this function is called in a
- testing context."""
- global xfrind
- try:
- parser = OptionParser(version = __version__)
- set_cmd_options(parser)
- (options, args) = parser.parse_args()
- if use_signal:
- set_signal_handler()
- xfrind = xfrin_class(verbose = options.verbose)
- xfrind.startup()
- except KeyboardInterrupt:
- logger.info(XFRIN_STOPPED_BY_KEYBOARD)
- except isc.cc.session.SessionError as e:
- logger.error(XFRIN_CC_SESSION_ERROR, str(e))
- except Exception as e:
- logger.error(XFRIN_UNKNOWN_ERROR, str(e))
- if xfrind:
- xfrind.shutdown()
- if __name__ == '__main__':
- main(Xfrin)
|