123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263 |
- #!@PYTHON@
- # Copyright (C) 2009-2011 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- import os
- import signal
- import isc
- import asyncore
- import struct
- import threading
- import socket
- import random
- from optparse import OptionParser, OptionValueError
- from isc.config.ccsession import *
- from isc.notify import notify_out
- import isc.util.process
- from isc.datasrc import DataSourceClient, ZoneFinder
- import isc.net.parse
- from isc.xfrin.diff import Diff
- from isc.log_messages.xfrin_messages import *
- isc.log.init("b10-xfrin")
- logger = isc.log.Logger("xfrin")
- try:
- from pydnspp import *
- except ImportError as e:
- # C++ loadable module may not be installed; even so the xfrin process
- # must keep running, so we warn about it and move forward.
- logger.error(XFRIN_IMPORT_DNS, str(e))
- isc.util.process.rename()
- # If B10_FROM_BUILD is set in the environment, we use data files
- # from a directory relative to that, otherwise we use the ones
- # installed on the system
- if "B10_FROM_BUILD" in os.environ:
- SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrin"
- AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
- else:
- PREFIX = "@prefix@"
- DATAROOTDIR = "@datarootdir@"
- SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
- AUTH_SPECFILE_PATH = SPECFILE_PATH
- SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"
- AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"
- XFROUT_MODULE_NAME = 'Xfrout'
- ZONE_MANAGER_MODULE_NAME = 'Zonemgr'
- REFRESH_FROM_ZONEMGR = 'refresh_from_zonemgr'
- ZONE_XFRIN_FAILED = 'zone_xfrin_failed'
- # Constants for debug levels, to be removed when we have #1074.
- DBG_XFRIN_TRACE = 3
- # These two default are currently hard-coded. For config this isn't
- # necessary, but we need these defaults for optional command arguments
- # (TODO: have similar support to get default values for command
- # arguments as we do for config options)
- DEFAULT_MASTER_PORT = 53
- DEFAULT_ZONE_CLASS = RRClass.IN()
- __version__ = 'BIND10'
- # define xfrin rcode
- XFRIN_OK = 0
- XFRIN_FAIL = 1
- class XfrinException(Exception):
- pass
- class XfrinProtocolError(Exception):
- '''An exception raised for errors encountered in xfrin protocol handling.
- '''
- pass
- class XfrinZoneInfoException(Exception):
- """This exception is raised if there is an error in the given
- configuration (part), or when a command does not have a required
- argument or has bad arguments, for instance when the zone's master
- address is not a valid IP address, when the zone does not
- have a name, or when multiple settings are given for the same
- zone."""
- pass
- def _check_zone_name(zone_name_str):
- """Checks if the given zone name is a valid domain name, and returns
- it as a Name object. Raises an XfrinException if it is not."""
- try:
- # In the _zones dict, part of the key is the zone name,
- # but due to a limitation in the Name class, we
- # cannot directly use it as a dict key, and we use to_text()
- #
- # Downcase the name here for that reason.
- return Name(zone_name_str, True)
- except (EmptyLabel, TooLongLabel, BadLabelType, BadEscape,
- TooLongName, IncompleteName) as ne:
- raise XfrinZoneInfoException("bad zone name: " + zone_name_str + " (" + str(ne) + ")")
- def _check_zone_class(zone_class_str):
- """If the given argument is a string: checks if the given class is
- a valid one, and returns an RRClass object if so.
- Raises XfrinZoneInfoException if not.
- If it is None, this function returns the default RRClass.IN()"""
- if zone_class_str is None:
- return DEFAULT_ZONE_CLASS
- try:
- return RRClass(zone_class_str)
- except InvalidRRClass as irce:
- raise XfrinZoneInfoException("bad zone class: " + zone_class_str + " (" + str(irce) + ")")
- def get_soa_serial(soa_rdata):
- '''Extract the serial field of an SOA RDATA and returns it as an intger.
- We don't have to be very efficient here, so we first dump the entire RDATA
- as a string and convert the first corresponding field. This should be
- sufficient in practice, but may not always work when the MNAME or RNAME
- contains an (escaped) space character in their labels. Ideally there
- should be a more direct and convenient way to get access to the SOA
- fields.
- '''
- return int(soa_rdata.to_text().split()[2])
- class XfrinState:
- '''
- The states of the incomding *XFR state machine.
- We (will) handle both IXFR and AXFR with a single integrated state
- machine because they cannot be distinguished immediately - an AXFR
- response to an IXFR request can only be detected when the first two (2)
- response RRs have already been received.
- The following diagram summarizes the state transition. After sending
- the query, xfrin starts the process with the InitialSOA state (all
- IXFR/AXFR response begins with an SOA). When it reaches IXFREnd
- or AXFREnd, the process successfully completes.
- (AXFR or
- (recv SOA) AXFR-style IXFR) (SOA, add)
- InitialSOA------->FirstData------------->AXFR--------->AXFREnd
- | | ^ (post xfr
- | | | checks, then
- | +--+ commit)
- | (non SOA, add)
- |
- | (non SOA, delete)
- (pure IXFR,| +-------+
- keep handling)| (Delete SOA) V |
- + ->IXFRDeleteSOA------>IXFRDelete--+
- ^ |
- (see SOA, not end, | (see SOA)|
- commit, keep handling) | |
- | V
- +---------IXFRAdd<----------+IXFRAddSOA
- (non SOA, add)| ^ | (Add SOA)
- ----------+ |
- |(see SOA w/ end serial, commit changes)
- V
- IXFREnd
- Note that changes are committed for every "difference sequence"
- (i.e. changes for one SOA update). This means when an IXFR response
- contains multiple difference sequences and something goes wrong
- after several commits, these changes have been published and visible
- to clients even if the IXFR session is subsequently aborted.
- It is not clear if this is valid in terms of the protocol specification.
- Section 4 of RFC 1995 states:
- An IXFR client, should only replace an older version with a newer
- version after all the differences have been successfully processed.
- If this "replacement" is for the changes of one difference sequence
- and "all the differences" mean the changes for that sequence, this
- implementation strictly follows what RFC states. If this is for
- the entire IXFR response (that may contain multiple sequences),
- we should implement it with one big transaction and one final commit
- at the very end.
- For now, we implement it with multiple smaller commits for two
- reasons. First, this is what BIND 9 does, and we generally port
- the implementation logic here. BIND 9 has been supporting IXFR
- for many years, so the fact that it still behaves this way
- probably means it at least doesn't cause a severe operational
- problem in practice. Second, especially because BIND 10 would
- often uses a database backend, a larger transaction could cause an
- undesirable effects, e.g. suspending normal lookups for a longer
- period depending on the characteristics of the database. Even if
- we find something wrong in a later sequeunce and abort the
- session, we can start another incremental update from what has
- been validated, or we can switch to AXFR to replace the zone
- completely.
- This implementation uses the state design pattern, where each state
- is represented as a subclass of the base XfrinState class. Each concrete
- subclass of XfrinState is assumed to define two methods: handle_rr() and
- finish_message(). These methods handle specific part of XFR protocols
- and (if necessary) perform the state transition.
- Conceptually, XfrinState and its subclasses are a "friend" of
- XfrinConnection and are assumed to be allowed to access its internal
- information (even though Python does not have a strict access control
- between different classes).
- The XfrinState and its subclasses are designed to be stateless, and
- can be used as singleton objects. For now, however, we always instantiate
- a new object for every state transition, partly because the introduction
- of singleton will make a code bit complicated, and partly because
- the overhead of object instantiotion wouldn't be significant for xfrin.
- '''
- def set_xfrstate(self, conn, new_state):
- '''Set the XfrConnection to a given new state.
- As a "friend" class, this method intentionally gets access to the
- connection's "private" method.
- '''
- conn._XfrinConnection__set_xfrstate(new_state)
- def handle_rr(self, conn):
- '''Handle one RR of an XFR response message.
- Depending on the state, the RR is generally added or deleted in the
- corresponding data source, or in some special cases indicates
- a specifi transition, such as starting a new IXFR difference
- sequence or completing the session.
- All subclass has their specific behaviors for this method, so
- there is no default definition. If the base class version
- is called, it's a bug of the caller, and it's notified via
- an XfrinException exception.
- This method returns a boolean value: True if the given RR was
- fully handled and the caller should go to the next RR; False
- if the caller needs to call this method with the (possibly) new
- state for the same RR again.
- '''
- raise XfrinException("Internal bug: " +
- "XfrinState.handle_rr() called directly")
- def finish_message(self, conn):
- '''Perform any final processing after handling all RRs of a response.
- This method then returns a boolean indicating whether to continue
- receiving the message. Unless it's in the end of the entire XFR
- session, we should continue, so this default method simply returns
- True.
- '''
- return True
- class XfrinInitialSOA(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() != RRType.SOA():
- raise XfrinProtocolError('First RR in zone transfer must be SOA ('
- + rr.get_type().to_text() + ' received)')
- conn._end_serial = get_soa_serial(rr.get_rdata()[0])
- # FIXME: we need to check the serial is actually greater than ours.
- # To do so, however, we need to implement serial number arithmetic.
- # Although it wouldn't be a big task, we'll leave it for a separate
- # task for now. (Always performing xfr could be inefficient, but
- # shouldn't do any harm otherwise)
- self.set_xfrstate(conn, XfrinFirstData())
- return True
- class XfrinFirstData(XfrinState):
- def handle_rr(self, conn, rr):
- '''Handle the first RR after initial SOA in an XFR session.
- This state happens exactly once in an XFR session, where
- we decide whether it's incremental update ("real" IXFR) or
- non incremental update (AXFR or AXFR-style IXFR).
- If we initiated IXFR and the transfer begins with two SOAs
- (the serial of the second one being equal to our serial),
- it's incremental; otherwise it's non incremental.
- This method always return False (unlike many other handle_rr()
- methods) because this first RR must be examined again in the
- determined update context.
- Note that in the non incremental case the RR should normally be
- something other SOA, but it's still possible it's an SOA with a
- different serial than ours. The only possible interpretation at
- this point is that it's non incremental update that only consists
- of the SOA RR. It will result in broken zone (for example, it
- wouldn't even contain an apex NS) and should be rejected at post
- XFR processing, but in terms of the XFR session processing we
- accept it and move forward.
- Note further that, in the half-broken SOA-only transfer case,
- these two SOAs are supposed to be the same as stated in Section 2.2
- of RFC 5936. We don't check that condition here, either; we'll
- leave whether and how to deal with that situation to the end of
- the processing of non incremental update. See also a related
- discussion at the IETF dnsext wg:
- http://www.ietf.org/mail-archive/web/dnsext/current/msg07908.html
- '''
- if conn._request_type == RRType.IXFR() and \
- rr.get_type() == RRType.SOA() and \
- conn._request_serial == get_soa_serial(rr.get_rdata()[0]):
- logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_INCREMENTAL_RESP,
- conn.zone_str())
- self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
- else:
- logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_NONINCREMENTAL_RESP,
- conn.zone_str())
- # We are now going to add RRs to the new zone. We need create
- # a Diff object. It will be used throughtout the XFR session.
- conn._diff = Diff(conn._datasrc_client, conn._zone_name, True)
- self.set_xfrstate(conn, XfrinAXFR())
- return False
- class XfrinIXFRDeleteSOA(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() != RRType.SOA():
- # this shouldn't happen; should this occur it means an internal
- # bug.
- raise XfrinException(rr.get_type().to_text() +
- ' RR is given in IXFRDeleteSOA state')
- # This is the beginning state of one difference sequence (changes
- # for one SOA update). We need to create a new Diff object now.
- conn._diff = Diff(conn._datasrc_client, conn._zone_name)
- conn._diff.delete_data(rr)
- self.set_xfrstate(conn, XfrinIXFRDelete())
- return True
- class XfrinIXFRDelete(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() == RRType.SOA():
- # This is the only place where current_serial is set
- conn._current_serial = get_soa_serial(rr.get_rdata()[0])
- self.set_xfrstate(conn, XfrinIXFRAddSOA())
- return False
- conn._diff.delete_data(rr)
- return True
- class XfrinIXFRAddSOA(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() != RRType.SOA():
- # this shouldn't happen; should this occur it means an internal
- # bug.
- raise XfrinException(rr.get_type().to_text() +
- ' RR is given in IXFRAddSOA state')
- conn._diff.add_data(rr)
- self.set_xfrstate(conn, XfrinIXFRAdd())
- return True
- class XfrinIXFRAdd(XfrinState):
- def handle_rr(self, conn, rr):
- if rr.get_type() == RRType.SOA():
- soa_serial = get_soa_serial(rr.get_rdata()[0])
- if soa_serial == conn._end_serial:
- conn._diff.commit()
- self.set_xfrstate(conn, XfrinIXFREnd())
- return True
- elif soa_serial != conn._current_serial:
- raise XfrinProtocolError('IXFR out of sync: expected ' +
- 'serial ' +
- str(conn._current_serial) +
- ', got ' + str(soa_serial))
- else:
- conn._diff.commit()
- self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
- return False
- conn._diff.add_data(rr)
- return True
- class XfrinIXFREnd(XfrinState):
- def handle_rr(self, conn, rr):
- raise XfrinProtocolError('Extra data after the end of IXFR diffs: ' +
- rr.to_text())
- def finish_message(self, conn):
- '''Final processing after processing an entire IXFR session.
- There will be more actions here, but for now we simply return False,
- indicating there will be no more message to receive.
- '''
- return False
- class XfrinAXFR(XfrinState):
- def handle_rr(self, conn, rr):
- """
- Handle the RR by putting it into the zone.
- """
- conn._diff.add_data(rr)
- if rr.get_type() == RRType.SOA():
- # SOA means end. Don't commit it yet - we need to perform
- # post-transfer checks
- soa_serial = get_soa_serial(rr.get_rdata()[0])
- if conn._end_serial != soa_serial:
- logger.warn(XFRIN_AXFR_INCONSISTENT_SOA, conn.zone_str(),
- conn._end_serial, soa_serial)
- self.set_xfrstate(conn, XfrinAXFREnd())
- # Yes, we've eaten this RR.
- return True
- class XfrinAXFREnd(XfrinState):
- def handle_rr(self, conn, rr):
- raise XfrinProtocolError('Extra data after the end of AXFR: ' +
- rr.to_text())
- def finish_message(self, conn):
- """
- Final processing after processing an entire AXFR session.
- In this process all the AXFR changes are committed to the
- data source.
- There might be more actions here, but for now we simply return False,
- indicating there will be no more message to receive.
- """
- conn._diff.commit()
- return False
- class XfrinConnection(asyncore.dispatcher):
- '''Do xfrin in this class. '''
- def __init__(self,
- sock_map, zone_name, rrclass, datasrc_client,
- shutdown_event, master_addrinfo, tsig_key=None,
- idle_timeout=60):
- '''Constructor of the XfirnConnection class.
- idle_timeout: max idle time for read data from socket.
- datasrc_client: the data source client object used for the XFR session.
- This will eventually replace db_file completely.
- '''
- asyncore.dispatcher.__init__(self, map=sock_map)
- # The XFR state. Conceptually this is purely private, so we emphasize
- # the fact by the double underscore. Other classes are assumed to
- # get access to this via get_xfrstate(), and only XfrinState classes
- # are assumed to be allowed to modify it via __set_xfrstate().
- self.__state = None
- # Requested transfer type (RRType.AXFR or RRType.IXFR). The actual
- # transfer type may differ due to IXFR->AXFR fallback:
- self._request_type = None
- # Zone parameters
- self._zone_name = zone_name
- self._rrclass = rrclass
- # Data source handler
- self._datasrc_client = datasrc_client
- self.create_socket(master_addrinfo[0], master_addrinfo[1])
- self._sock_map = sock_map
- self._soa_rr_count = 0
- self._idle_timeout = idle_timeout
- self.setblocking(1)
- self._shutdown_event = shutdown_event
- self._master_address = master_addrinfo[2]
- self._tsig_key = tsig_key
- self._tsig_ctx = None
- # tsig_ctx_creator is introduced to allow tests to use a mock class for
- # easier tests (in normal case we always use the default)
- self._tsig_ctx_creator = self.__create_tsig_ctx
- def __create_tsig_ctx(self, key):
- return TSIGContext(key)
- def __set_xfrstate(self, new_state):
- self.__state = new_state
- def get_xfrstate(self):
- return self.__state
- def zone_str(self):
- '''A convenient function for logging to include zone name and class'''
- return self._zone_name.to_text() + '/' + str(self._rrclass)
- def connect_to_master(self):
- '''Connect to master in TCP.'''
- try:
- self.connect(self._master_address)
- return True
- except socket.error as e:
- logger.error(XFRIN_CONNECT_MASTER, self._master_address, str(e))
- return False
- def _create_query(self, query_type):
- '''Create an XFR-related query message.
- query_type is either SOA, AXFR or IXFR. For type IXFR, it searches
- the associated data source for the current SOA record to include
- it in the query. If the corresponding zone or the SOA record
- cannot be found, it raises an XfrinException exception. Note that
- this may not necessarily a broken configuration; for the first attempt
- of transfer the secondary may not have any boot-strap zone
- information, in which case IXFR simply won't work. The xfrin
- should then fall back to AXFR. _request_serial is recorded for
- later use.
- '''
- msg = Message(Message.RENDER)
- query_id = random.randint(0, 0xFFFF)
- self._query_id = query_id
- msg.set_qid(query_id)
- msg.set_opcode(Opcode.QUERY())
- msg.set_rcode(Rcode.NOERROR())
- msg.add_question(Question(self._zone_name, self._rrclass, query_type))
- if query_type == RRType.IXFR():
- # get the zone finder. this must be SUCCESS (not even
- # PARTIALMATCH) because we are specifying the zone origin name.
- result, finder = self._datasrc_client.find_zone(self._zone_name)
- if result != DataSourceClient.SUCCESS:
- raise XfrinException('Zone not found in the given data ' +
- 'source: ' + self.zone_str())
- result, soa_rrset = finder.find(self._zone_name, RRType.SOA(),
- None, ZoneFinder.FIND_DEFAULT)
- if result != ZoneFinder.SUCCESS:
- raise XfrinException('SOA RR not found in zone: ' +
- self.zone_str())
- # Especially for database-based zones, a working zone may be in
- # a broken state where it has more than one SOA RR. We proactively
- # check the condition and abort the xfr attempt if we identify it.
- if soa_rrset.get_rdata_count() != 1:
- raise XfrinException('Invalid number of SOA RRs for ' +
- self.zone_str() + ': ' +
- str(soa_rrset.get_rdata_count()))
- msg.add_rrset(Message.SECTION_AUTHORITY, soa_rrset)
- self._request_serial = get_soa_serial(soa_rrset.get_rdata()[0])
- return msg
- def _send_data(self, data):
- size = len(data)
- total_count = 0
- while total_count < size:
- count = self.send(data[total_count:])
- total_count += count
- def _send_query(self, query_type):
- '''Send query message over TCP. '''
- msg = self._create_query(query_type)
- render = MessageRenderer()
- # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
- # we should remove the if statement and use a universal interface later.
- if self._tsig_key is not None:
- self._tsig_ctx = self._tsig_ctx_creator(self._tsig_key)
- msg.to_wire(render, self._tsig_ctx)
- else:
- msg.to_wire(render)
- header_len = struct.pack('H', socket.htons(render.get_length()))
- self._send_data(header_len)
- self._send_data(render.get_data())
- def _asyncore_loop(self):
- '''
- This method is a trivial wrapper for asyncore.loop(). It's extracted from
- _get_request_response so that we can test the rest of the code without
- involving actual communication with a remote server.'''
- asyncore.loop(self._idle_timeout, map=self._sock_map, count=1)
- def _get_request_response(self, size):
- recv_size = 0
- data = b''
- while recv_size < size:
- self._recv_time_out = True
- self._need_recv_size = size - recv_size
- self._asyncore_loop()
- if self._recv_time_out:
- raise XfrinException('receive data from socket time out.')
- recv_size += self._recvd_size
- data += self._recvd_data
- return data
- def _check_response_tsig(self, msg, response_data):
- tsig_record = msg.get_tsig_record()
- if self._tsig_ctx is not None:
- tsig_error = self._tsig_ctx.verify(tsig_record, response_data)
- if tsig_error != TSIGError.NOERROR:
- raise XfrinException('TSIG verify fail: %s' % str(tsig_error))
- elif tsig_record is not None:
- # If the response includes a TSIG while we didn't sign the query,
- # we treat it as an error. RFC doesn't say anything about this
- # case, but it clearly states the server must not sign a response
- # to an unsigned request. Although we could be flexible, no sane
- # implementation would return such a response, and since this is
- # part of security mechanism, it's probably better to be more
- # strict.
- raise XfrinException('Unexpected TSIG in response')
- def _check_soa_serial(self):
- ''' Compare the soa serial, if soa serial in master is less than
- the soa serial in local, Finish xfrin.
- False: soa serial in master is less or equal to the local one.
- True: soa serial in master is bigger
- '''
- self._send_query(RRType.SOA())
- data_len = self._get_request_response(2)
- msg_len = socket.htons(struct.unpack('H', data_len)[0])
- soa_response = self._get_request_response(msg_len)
- msg = Message(Message.PARSE)
- msg.from_wire(soa_response)
- # TSIG related checks, including an unexpected signed response
- self._check_response_tsig(msg, soa_response)
- # perform some minimal level validation. It's an open issue how
- # strict we should be (see the comment in _check_response_header())
- self._check_response_header(msg)
- # TODO, need select soa record from data source then compare the two
- # serial, current just return OK, since this function hasn't been used
- # now.
- return XFRIN_OK
- def do_xfrin(self, check_soa, request_type=RRType.AXFR()):
- '''Do an xfr session by sending xfr request and parsing responses.'''
- try:
- ret = XFRIN_OK
- self._request_type = request_type
- # Right now RRType.[IA]XFR().to_text() is 'TYPExxx', so we need
- # to hardcode here.
- request_str = 'IXFR' if request_type == RRType.IXFR() else 'AXFR'
- if check_soa:
- ret = self._check_soa_serial()
- if ret == XFRIN_OK:
- logger.info(XFRIN_XFR_TRANSFER_STARTED, request_str,
- self.zone_str())
- self._send_query(self._request_type)
- self.__state = XfrinInitialSOA()
- self._handle_xfrin_responses()
- logger.info(XFRIN_XFR_TRANSFER_SUCCESS, request_str,
- self.zone_str())
- except (XfrinException, XfrinProtocolError) as e:
- logger.error(XFRIN_XFR_TRANSFER_FAILURE, request_str,
- self.zone_str(), str(e))
- ret = XFRIN_FAIL
- except Exception as e:
- # Catching all possible exceptions like this is generally not a
- # good practice, but handling an xfr session could result in
- # so many types of exceptions, including ones from the DNS library
- # or from the data source library. Eventually we'd introduce a
- # hierarchy for exception classes from a base "ISC exception" and
- # catch it here, but until then we need broadest coverage so that
- # we won't miss anything.
- logger.error(XFRIN_XFR_OTHER_FAILURE, request_str,
- self.zone_str(), str(e))
- ret = XFRIN_FAIL
- finally:
- # Make sure any remaining transaction in the diff is closed
- # (if not yet - possible in case of xfr-level exception) as soon
- # as possible
- self._diff = None
- self.close()
- return ret
- def _check_response_header(self, msg):
- '''Perform minimal validation on responses'''
- # It's not clear how strict we should be about response validation.
- # BIND 9 ignores some cases where it would normally be considered a
- # bogus response. For example, it accepts a response even if its
- # opcode doesn't match that of the corresponding request.
- # According to an original developer of BIND 9 some of the missing
- # checks are deliberate to be kind to old implementations that would
- # cause interoperability trouble with stricter checks.
- msg_rcode = msg.get_rcode()
- if msg_rcode != Rcode.NOERROR():
- raise XfrinException('error response: %s' % msg_rcode.to_text())
- if not msg.get_header_flag(Message.HEADERFLAG_QR):
- raise XfrinException('response is not a response')
- if msg.get_qid() != self._query_id:
- raise XfrinException('bad query id')
- def _check_response_status(self, msg):
- '''Check validation of xfr response. '''
- self._check_response_header(msg)
- if msg.get_rr_count(Message.SECTION_QUESTION) > 1:
- raise XfrinException('query section count greater than 1')
- def _handle_answer_section(self, answer_section):
- '''Return a generator for the reponse in one tcp package to a zone transfer.'''
- for rrset in answer_section:
- rrset_name = rrset.get_name().to_text()
- rrset_ttl = int(rrset.get_ttl().to_text())
- rrset_class = rrset.get_class().to_text()
- rrset_type = rrset.get_type().to_text()
- for rdata in rrset.get_rdata():
- # Count the soa record count
- if rrset.get_type() == RRType.SOA():
- self._soa_rr_count += 1
- # XXX: the current DNS message parser can't preserve the
- # RR order or separete the beginning and ending SOA RRs.
- # As a short term workaround, we simply ignore the second
- # SOA, and ignore the erroneous case where the transfer
- # session doesn't end with an SOA.
- if (self._soa_rr_count == 2):
- # Avoid inserting soa record twice
- break
- rdata_text = rdata.to_text()
- yield (rrset_name, rrset_ttl, rrset_class, rrset_type,
- rdata_text)
- def _handle_xfrin_responses(self):
- read_next_msg = True
- while read_next_msg:
- data_len = self._get_request_response(2)
- msg_len = socket.htons(struct.unpack('H', data_len)[0])
- recvdata = self._get_request_response(msg_len)
- msg = Message(Message.PARSE)
- msg.from_wire(recvdata, Message.PRESERVE_ORDER)
- # TSIG related checks, including an unexpected signed response
- self._check_response_tsig(msg, recvdata)
- # Perform response status validation
- self._check_response_status(msg)
- for rr in msg.get_section(Message.SECTION_ANSWER):
- rr_handled = False
- while not rr_handled:
- rr_handled = self.__state.handle_rr(self, rr)
- read_next_msg = self.__state.finish_message(self)
- if self._shutdown_event.is_set():
- raise XfrinException('xfrin is forced to stop')
- def handle_read(self):
- '''Read query's response from socket. '''
- self._recvd_data = self.recv(self._need_recv_size)
- self._recvd_size = len(self._recvd_data)
- self._recv_time_out = False
- def writable(self):
- '''Ignore the writable socket. '''
- return False
- def log_info(self, msg, type='info'):
- # Overwrite the log function, log nothing
- pass
- def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file,
- shutdown_event, master_addrinfo, check_soa, tsig_key,
- request_type):
- xfrin_recorder.increment(zone_name)
- # Create a data source client used in this XFR session. Right now we
- # still assume an sqlite3-based data source, and use both the old and new
- # data source APIs. We also need to use a mock client for tests.
- # For a temporary workaround to deal with these situations, we skip the
- # creation when the given file is none (the test case). Eventually
- # this code will be much cleaner.
- datasrc_client = None
- if db_file is not None:
- # temporary hardcoded sqlite initialization. Once we decide on
- # the config specification, we need to update this (TODO)
- # this may depend on #1207, or any followup ticket created for #1207
- datasrc_type = "sqlite3"
- datasrc_config = "{ \"database_file\": \"" + db_file + "\"}"
- datasrc_client = DataSourceClient(datasrc_type, datasrc_config)
- # Create a TCP connection for the XFR session and perform the operation.
- sock_map = {}
- conn = XfrinConnection(sock_map, zone_name, rrclass, datasrc_client,
- shutdown_event, master_addrinfo, tsig_key)
- ret = XFRIN_FAIL
- if conn.connect_to_master():
- ret = conn.do_xfrin(check_soa, request_type)
- # Publish the zone transfer result news, so zonemgr can reset the
- # zone timer, and xfrout can notify the zone's slaves if the result
- # is success.
- server.publish_xfrin_news(zone_name, rrclass, ret)
- xfrin_recorder.decrement(zone_name)
- class XfrinRecorder:
- def __init__(self):
- self._lock = threading.Lock()
- self._zones = []
- def increment(self, zone_name):
- self._lock.acquire()
- self._zones.append(zone_name)
- self._lock.release()
- def decrement(self, zone_name):
- self._lock.acquire()
- if zone_name in self._zones:
- self._zones.remove(zone_name)
- self._lock.release()
- def xfrin_in_progress(self, zone_name):
- self._lock.acquire()
- ret = zone_name in self._zones
- self._lock.release()
- return ret
- def count(self):
- self._lock.acquire()
- ret = len(self._zones)
- self._lock.release()
- return ret
- class ZoneInfo:
- def __init__(self, config_data, module_cc):
- """Creates a zone_info with the config data element as
- specified by the 'zones' list in xfrin.spec. Module_cc is
- needed to get the defaults from the specification"""
- self._module_cc = module_cc
- self.set_name(config_data.get('name'))
- self.set_master_addr(config_data.get('master_addr'))
- self.set_master_port(config_data.get('master_port'))
- self.set_zone_class(config_data.get('class'))
- self.set_tsig_key(config_data.get('tsig_key'))
- self.set_ixfr_disabled(config_data.get('ixfr_disabled'))
- def set_name(self, name_str):
- """Set the name for this zone given a name string.
- Raises XfrinZoneInfoException if name_str is None or if it
- cannot be parsed."""
- if name_str is None:
- raise XfrinZoneInfoException("Configuration zones list "
- "element does not contain "
- "'name' attribute")
- else:
- self.name = _check_zone_name(name_str)
- def set_master_addr(self, master_addr_str):
- """Set the master address for this zone given an IP address
- string. Raises XfrinZoneInfoException if master_addr_str is
- None or if it cannot be parsed."""
- if master_addr_str is None:
- raise XfrinZoneInfoException("master address missing from config data")
- else:
- try:
- self.master_addr = isc.net.parse.addr_parse(master_addr_str)
- except ValueError:
- logger.error(XFRIN_BAD_MASTER_ADDR_FORMAT, master_addr_str)
- errmsg = "bad format for zone's master: " + master_addr_str
- raise XfrinZoneInfoException(errmsg)
- def set_master_port(self, master_port_str):
- """Set the master port given a port number string. If
- master_port_str is None, the default from the specification
- for this module will be used. Raises XfrinZoneInfoException if
- the string contains an invalid port number"""
- if master_port_str is None:
- self.master_port = self._module_cc.get_default_value("zones/master_port")
- else:
- try:
- self.master_port = isc.net.parse.port_parse(master_port_str)
- except ValueError:
- logger.error(XFRIN_BAD_MASTER_PORT_FORMAT, master_port_str)
- errmsg = "bad format for zone's master port: " + master_port_str
- raise XfrinZoneInfoException(errmsg)
- def set_zone_class(self, zone_class_str):
- """Set the zone class given an RR class str (e.g. "IN"). If
- zone_class_str is None, it will default to what is specified
- in the specification file for this module. Raises
- XfrinZoneInfoException if the string cannot be parsed."""
- # TODO: remove _str
- self.class_str = zone_class_str or self._module_cc.get_default_value("zones/class")
- if zone_class_str == None:
- #TODO rrclass->zone_class
- self.rrclass = RRClass(self._module_cc.get_default_value("zones/class"))
- else:
- try:
- self.rrclass = RRClass(zone_class_str)
- except InvalidRRClass:
- logger.error(XFRIN_BAD_ZONE_CLASS, zone_class_str)
- errmsg = "invalid zone class: " + zone_class_str
- raise XfrinZoneInfoException(errmsg)
- def set_tsig_key(self, tsig_key_str):
- """Set the tsig_key for this zone, given a TSIG key string
- representation. If tsig_key_str is None, no TSIG key will
- be set. Raises XfrinZoneInfoException if tsig_key_str cannot
- be parsed."""
- if tsig_key_str is None:
- self.tsig_key = None
- else:
- try:
- self.tsig_key = TSIGKey(tsig_key_str)
- except InvalidParameter as ipe:
- logger.error(XFRIN_BAD_TSIG_KEY_STRING, tsig_key_str)
- errmsg = "bad TSIG key string: " + tsig_key_str
- raise XfrinZoneInfoException(errmsg)
- def set_ixfr_disabled(self, ixfr_disabled):
- """Set ixfr_disabled. If set to False (the default), it will use
- IXFR for incoming transfers. If set to True, it will use AXFR.
- At this moment there is no automatic fallback"""
- # don't care what type it is; if evaluates to true, set to True
- if ixfr_disabled:
- self.ixfr_disabled = True
- else:
- self.ixfr_disabled = False
- def get_master_addr_info(self):
- return (self.master_addr.family, socket.SOCK_STREAM,
- (str(self.master_addr), self.master_port))
- class Xfrin:
- def __init__(self):
- self._max_transfers_in = 10
- self._zones = {}
- self._cc_setup()
- self.recorder = XfrinRecorder()
- self._shutdown_event = threading.Event()
- def _cc_setup(self):
- '''This method is used only as part of initialization, but is
- implemented separately for convenience of unit tests; by letting
- the test code override this method we can test most of this class
- without requiring a command channel.'''
- # Create one session for sending command to other modules, because the
- # listening session will block the send operation.
- self._send_cc_session = isc.cc.Session()
- self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
- self.config_handler,
- self.command_handler)
- self._module_cc.start()
- config_data = self._module_cc.get_full_config()
- self.config_handler(config_data)
- def _cc_check_command(self):
- '''This is a straightforward wrapper for cc.check_command,
- but provided as a separate method for the convenience
- of unit tests.'''
- self._module_cc.check_command(False)
- def _get_zone_info(self, name, rrclass):
- """Returns the ZoneInfo object containing the configured data
- for the given zone name. If the zone name did not have any
- data, returns None"""
- return self._zones.get((name.to_text(), rrclass.to_text()))
- def _add_zone_info(self, zone_info):
- """Add the zone info. Raises a XfrinZoneInfoException if a zone
- with the same name and class is already configured"""
- key = (zone_info.name.to_text(), zone_info.class_str)
- if key in self._zones:
- raise XfrinZoneInfoException("zone " + str(key) +
- " configured multiple times")
- self._zones[key] = zone_info
- def _clear_zone_info(self):
- self._zones = {}
- def config_handler(self, new_config):
- # backup all config data (should there be a problem in the new
- # data)
- old_max_transfers_in = self._max_transfers_in
- old_zones = self._zones
- self._max_transfers_in = new_config.get("transfers_in") or self._max_transfers_in
- if 'zones' in new_config:
- self._clear_zone_info()
- for zone_config in new_config.get('zones'):
- try:
- zone_info = ZoneInfo(zone_config, self._module_cc)
- self._add_zone_info(zone_info)
- except XfrinZoneInfoException as xce:
- self._zones = old_zones
- self._max_transfers_in = old_max_transfers_in
- return create_answer(1, str(xce))
- return create_answer(0)
- def shutdown(self):
- ''' shutdown the xfrin process. the thread which is doing xfrin should be
- terminated.
- '''
- self._shutdown_event.set()
- main_thread = threading.currentThread()
- for th in threading.enumerate():
- if th is main_thread:
- continue
- th.join()
- def command_handler(self, command, args):
- answer = create_answer(0)
- try:
- if command == 'shutdown':
- self._shutdown_event.set()
- elif command == 'notify' or command == REFRESH_FROM_ZONEMGR:
- # Xfrin receives the refresh/notify command from zone manager.
- # notify command maybe has the parameters which
- # specify the notifyfrom address and port, according the RFC1996, zone
- # transfer should starts first from the notifyfrom, but now, let 'TODO' it.
- # (using the value now, while we can only set one master address, would be
- # a security hole. Once we add the ability to have multiple master addresses,
- # we should check if it matches one of them, and then use it.)
- (zone_name, rrclass) = self._parse_zone_name_and_class(args)
- zone_info = self._get_zone_info(zone_name, rrclass)
- if zone_info is None:
- # TODO what to do? no info known about zone. defaults?
- errmsg = "Got notification to retransfer unknown zone " + zone_name.to_text()
- logger.error(XFRIN_RETRANSFER_UNKNOWN_ZONE, zone_name.to_text())
- answer = create_answer(1, errmsg)
- else:
- master_addr = zone_info.get_master_addr_info()
- ret = self.xfrin_start(zone_name,
- rrclass,
- self._get_db_file(),
- master_addr,
- zone_info.tsig_key, RRType.AXFR(),
- True)
- answer = create_answer(ret[0], ret[1])
- elif command == 'retransfer' or command == 'refresh':
- # Xfrin receives the retransfer/refresh from cmdctl(sent by bindctl).
- # If the command has specified master address, do transfer from the
- # master address, or else do transfer from the configured masters.
- (zone_name, rrclass) = self._parse_zone_name_and_class(args)
- master_addr = self._parse_master_and_port(args, zone_name,
- rrclass)
- zone_info = self._get_zone_info(zone_name, rrclass)
- tsig_key = None
- request_type = RRType.AXFR()
- if zone_info:
- tsig_key = zone_info.tsig_key
- if not zone_info.ixfr_disabled:
- request_type = RRType.IXFR()
- db_file = args.get('db_file') or self._get_db_file()
- ret = self.xfrin_start(zone_name,
- rrclass,
- db_file,
- master_addr,
- tsig_key, request_type,
- (False if command == 'retransfer' else True))
- answer = create_answer(ret[0], ret[1])
- else:
- answer = create_answer(1, 'unknown command: ' + command)
- except XfrinException as err:
- logger.error(XFRIN_COMMAND_ERROR, command, str(err))
- answer = create_answer(1, str(err))
- return answer
- def _parse_zone_name_and_class(self, args):
- zone_name_str = args.get('zone_name')
- if zone_name_str is None:
- raise XfrinException('zone name should be provided')
- return (_check_zone_name(zone_name_str), _check_zone_class(args.get('zone_class')))
- def _parse_master_and_port(self, args, zone_name, zone_class):
- """
- Return tuple (family, socktype, sockaddr) for address and port in given
- args dict.
- IPv4 and IPv6 are the only supported addresses now, so sockaddr will be
- (address, port). The socktype is socket.SOCK_STREAM for now.
- """
- # check if we have configured info about this zone, in case
- # port or master are not specified
- zone_info = self._get_zone_info(zone_name, zone_class)
- addr_str = args.get('master')
- if addr_str is None:
- if zone_info is not None:
- addr = zone_info.master_addr
- else:
- raise XfrinException("Master address not given or "
- "configured for " + zone_name.to_text())
- else:
- try:
- addr = isc.net.parse.addr_parse(addr_str)
- except ValueError as err:
- raise XfrinException("failed to resolve master address %s: %s" %
- (addr_str, str(err)))
- port_str = args.get('port')
- if port_str is None:
- if zone_info is not None:
- port = zone_info.master_port
- else:
- port = DEFAULT_MASTER_PORT
- else:
- try:
- port = isc.net.parse.port_parse(port_str)
- except ValueError as err:
- raise XfrinException("failed to parse port=%s: %s" %
- (port_str, str(err)))
- return (addr.family, socket.SOCK_STREAM, (str(addr), port))
- def _get_db_file(self):
- #TODO, the db file path should be got in auth server's configuration
- # if we need access to this configuration more often, we
- # should add it on start, and not remove it here
- # (or, if we have writable ds, we might not need this in
- # the first place)
- self._module_cc.add_remote_config(AUTH_SPECFILE_LOCATION)
- db_file, is_default = self._module_cc.get_remote_config_value("Auth", "database_file")
- if is_default and "B10_FROM_BUILD" in os.environ:
- # this too should be unnecessary, but currently the
- # 'from build' override isn't stored in the config
- # (and we don't have writable datasources yet)
- db_file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
- self._module_cc.remove_remote_config(AUTH_SPECFILE_LOCATION)
- return db_file
- def publish_xfrin_news(self, zone_name, zone_class, xfr_result):
- '''Send command to xfrout/zone manager module.
- If xfrin has finished successfully for one zone, tell the good
- news(command: zone_new_data_ready) to zone manager and xfrout.
- if xfrin failed, just tell the bad news to zone manager, so that
- it can reset the refresh timer for that zone. '''
- param = {'zone_name': zone_name.to_text(),
- 'zone_class': zone_class.to_text()}
- if xfr_result == XFRIN_OK:
- msg = create_command(notify_out.ZONE_NEW_DATA_READY_CMD, param)
- # catch the exception, in case msgq has been killed.
- try:
- seq = self._send_cc_session.group_sendmsg(msg,
- XFROUT_MODULE_NAME)
- try:
- answer, env = self._send_cc_session.group_recvmsg(False,
- seq)
- except isc.cc.session.SessionTimeout:
- pass # for now we just ignore the failure
- seq = self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
- try:
- answer, env = self._send_cc_session.group_recvmsg(False,
- seq)
- except isc.cc.session.SessionTimeout:
- pass # for now we just ignore the failure
- except socket.error as err:
- logger.error(XFRIN_MSGQ_SEND_ERROR, XFROUT_MODULE_NAME, ZONE_MANAGER_MODULE_NAME)
- else:
- msg = create_command(ZONE_XFRIN_FAILED, param)
- # catch the exception, in case msgq has been killed.
- try:
- seq = self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
- try:
- answer, env = self._send_cc_session.group_recvmsg(False,
- seq)
- except isc.cc.session.SessionTimeout:
- pass # for now we just ignore the failure
- except socket.error as err:
- logger.error(XFRIN_MSGQ_SEND_ERROR_ZONE_MANAGER, ZONE_MANAGER_MODULE_NAME)
- def startup(self):
- while not self._shutdown_event.is_set():
- self._cc_check_command()
- def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo,
- tsig_key, request_type, check_soa=True):
- if "pydnspp" not in sys.modules:
- return (1, "xfrin failed, can't load dns message python library: 'pydnspp'")
- # check max_transfer_in, else return quota error
- if self.recorder.count() >= self._max_transfers_in:
- return (1, 'xfrin quota error')
- if self.recorder.xfrin_in_progress(zone_name):
- return (1, 'zone xfrin is in progress')
- xfrin_thread = threading.Thread(target = process_xfrin,
- args = (self,
- self.recorder,
- zone_name,
- rrclass,
- db_file,
- self._shutdown_event,
- master_addrinfo, check_soa,
- tsig_key, request_type))
- xfrin_thread.start()
- return (0, 'zone xfrin is started')
- xfrind = None
- def signal_handler(signal, frame):
- if xfrind:
- xfrind.shutdown()
- sys.exit(0)
- def set_signal_handler():
- signal.signal(signal.SIGTERM, signal_handler)
- signal.signal(signal.SIGINT, signal_handler)
- def set_cmd_options(parser):
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
- help="This option is obsolete and has no effect.")
- def main(xfrin_class, use_signal=True):
- """The main loop of the Xfrin daemon.
- @param xfrin_class: A class of the Xfrin object. This is normally Xfrin,
- but can be a subclass of it for customization.
- @param use_signal: True if this process should catch signals. This is
- normally True, but may be disabled when this function is called in a
- testing context."""
- global xfrind
- try:
- parser = OptionParser(version = __version__)
- set_cmd_options(parser)
- (options, args) = parser.parse_args()
- if use_signal:
- set_signal_handler()
- xfrind = xfrin_class()
- xfrind.startup()
- except KeyboardInterrupt:
- logger.info(XFRIN_STOPPED_BY_KEYBOARD)
- except isc.cc.session.SessionError as e:
- logger.error(XFRIN_CC_SESSION_ERROR, str(e))
- except Exception as e:
- logger.error(XFRIN_UNKNOWN_ERROR, str(e))
- if xfrind:
- xfrind.shutdown()
- if __name__ == '__main__':
- main(Xfrin)
|