12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707 |
- #!@PYTHON@
- # Copyright (C) 2009-2011 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import sys; sys.path.append ('@@PYTHONPATH@@')
- import os
- import signal
- import isc
- import asyncore
- import struct
- import threading
- import socket
- import random
- import time
- from functools import reduce
- from optparse import OptionParser, OptionValueError
- from isc.config.ccsession import *
- from isc.notify import notify_out
- import isc.util.process
- from isc.datasrc import DataSourceClient, ZoneFinder
- import isc.net.parse
- from isc.xfrin.diff import Diff
- from isc.log_messages.xfrin_messages import *
# Logging has to be initialized before anything below can report a problem.
isc.log.init("b10-xfrin")
logger = isc.log.Logger("xfrin")

# Pending system-wide debug level definitions, the ones we
# use here are hardcoded for now
DBG_PROCESS = logger.DBGLVL_TRACE_BASIC
DBG_COMMANDS = logger.DBGLVL_TRACE_DETAIL

try:
    from pydnspp import *
except ImportError as e:
    # The C++ loadable module may not be installed; even so the xfrin
    # process must keep running, so we report the problem and move forward.
    logger.error(XFRIN_IMPORT_DNS, str(e))

isc.util.process.rename()

# Without B10_FROM_BUILD in the environment we use the data files installed
# on the system; with it, we use the ones found relative to that directory.
if "B10_FROM_BUILD" not in os.environ:
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_PATH = ("@datadir@/@PACKAGE@"
                     .replace("${datarootdir}", DATAROOTDIR)
                     .replace("${prefix}", PREFIX))
    AUTH_SPECFILE_PATH = SPECFILE_PATH
else:
    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrin"
    AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"

SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"
AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"

# Names of the other modules and of the commands referred to by xfrin.
AUTH_MODULE_NAME = 'Auth'
XFROUT_MODULE_NAME = 'Xfrout'
ZONE_MANAGER_MODULE_NAME = 'Zonemgr'
REFRESH_FROM_ZONEMGR = 'refresh_from_zonemgr'
ZONE_XFRIN_FAILED = 'zone_xfrin_failed'

# Constants for debug levels.
DBG_XFRIN_TRACE = logger.DBGLVL_TRACE_BASIC

# These two defaults are currently hard-coded.  For config this isn't
# necessary, but we need these defaults for optional command arguments
# (TODO: have similar support to get default values for command
# arguments as we do for config options)
DEFAULT_MASTER_PORT = 53
DEFAULT_ZONE_CLASS = RRClass.IN()

__version__ = 'BIND10'

# Internal result codes of an xfr session
XFRIN_OK = 0                    # normal success
XFRIN_FAIL = 1                  # general failure (internal/external)
class XfrinException(Exception):
    '''General error raised by xfrin, e.g. for internal bugs, a missing SOA
    when building an IXFR query, or a receive timeout.
    '''
class XfrinProtocolError(Exception):
    '''Raised when an error is encountered in xfrin protocol handling,
    such as an unexpected RR in a zone transfer response.
    '''
class XfrinZoneUptodate(Exception):
    '''Raised at the end of an IXFR response consisting of a single SOA
    that is not newer than the serial we requested with, i.e. when there
    is nothing to transfer.
    '''
class XfrinZoneInfoException(Exception):
    """Raised for errors in the given configuration (part) or in a command.

    Typical causes are a missing required command argument or a bad
    argument value: for instance a zone master address that is not a
    valid IP address, a zone without a name, or multiple settings given
    for the same zone.
    """
def _check_zone_name(zone_name_str):
    """Validate a zone name given as a string and return it as a Name.

    Parameters:
      zone_name_str (str) textual domain name of the zone

    Raises XfrinZoneInfoException if the string is not a valid domain name.
    """
    try:
        # Part of the key of the _zones dict is the zone name.  Because of
        # a limitation in the Name class we cannot use a Name directly as a
        # dict key, so to_text() is used instead; the name is downcased
        # here for that reason.
        zone_name = Name(zone_name_str, True)
    except (EmptyLabel, TooLongLabel, BadLabelType, BadEscape,
            TooLongName, IncompleteName) as ne:
        reason = str(ne)
        raise XfrinZoneInfoException("bad zone name: " + zone_name_str +
                                     " (" + reason + ")")
    return zone_name
def _check_zone_class(zone_class_str):
    """Convert a textual RR class into an RRClass object.

    Parameters:
      zone_class_str (str or None) textual RR class; None selects the
                     default class (DEFAULT_ZONE_CLASS).

    Raises XfrinZoneInfoException if the string is not a valid RR class.
    """
    if zone_class_str is not None:
        try:
            return RRClass(zone_class_str)
        except InvalidRRClass as irce:
            reason = str(irce)
            raise XfrinZoneInfoException("bad zone class: " +
                                         zone_class_str +
                                         " (" + reason + ")")
    return DEFAULT_ZONE_CLASS
def format_zone_str(zone_name, zone_class):
    """Return the string '<name>/<class>' for the given zone.

    Parameters:
      zone_name (isc.dns.Name) name to format
      zone_class (isc.dns.RRClass) class to format
    """
    name_text = zone_name.to_text(True)
    class_text = str(zone_class)
    return name_text + '/' + class_text
def format_addrinfo(addrinfo):
    """Render an addrinfo tuple as a human readable string.

    The result is '<addr>:<port>' for IPv4 and '[<addr>]:<port>' for IPv6.
    For any other address family (unix domain sockets included) it is the
    plain string conversion of the third element of the tuple.

    Parameters:
      addrinfo: a 3-tuple consisting of address family, socket type, and,
                depending on the family, either a 2-tuple with the address
                and port, or a filename

    Raises TypeError if the tuple (or its address part) is too short.
    """
    try:
        family = addrinfo[0]
        if family == socket.AF_INET:
            template = "%s:%s"
        elif family == socket.AF_INET6:
            template = "[%s]:%s"
        else:
            return str(addrinfo[2])
        return template % (addrinfo[2][0], addrinfo[2][1])
    except IndexError:
        raise TypeError("addrinfo argument to format_addrinfo() does not "
                        "appear to be consisting of (family, socktype, (addr, port))")
def get_soa_serial(soa_rdata):
    '''Return the serial field of the given SOA RDATA as a Serial object.

    Efficiency is not a concern here, so the whole RDATA is dumped as text
    and the third whitespace-separated field is converted.  This should be
    sufficient in practice, but may not always work when the MNAME or RNAME
    contains an (escaped) space character in its labels.  Ideally there
    should be a more direct and convenient way to access the SOA fields.
    '''
    fields = soa_rdata.to_text().split()
    serial_text = fields[2]
    return Serial(int(serial_text))
class XfrinState:
    '''
    The states of the incoming *XFR state machine.

    We (will) handle both IXFR and AXFR with a single integrated state
    machine because they cannot be distinguished immediately - an AXFR
    response to an IXFR request can only be detected when the first two (2)
    response RRs have already been received.

    The following diagram summarizes the state transition.  After sending
    the query, xfrin starts the process with the InitialSOA state (all
    IXFR/AXFR response begins with an SOA).  When it reaches IXFREnd
    or AXFREnd, the process successfully completes.

                              (AXFR or
           (recv SOA)          AXFR-style IXFR)  (SOA, add)
    InitialSOA------->FirstData------------->AXFR--------->AXFREnd
        |                |                   |  ^         (post xfr
        |(IXFR &&        |                   |  |          checks, then
        | recv SOA       |                   +--+          commit)
        |  not new)      |              (non SOA, add)
        V                |
    IXFRUptodate         |                       (non SOA, delete)
              (pure IXFR,|                            +-------+
           keep handling)|             (Delete SOA)   V       |
                         + ->IXFRDeleteSOA------>IXFRDelete---+
                                   ^                   |
               (see SOA, not end,  |          (see SOA)|
           commit, keep handling)  |                   |
                                   |                   V
                      +---------IXFRAdd<----------+IXFRAddSOA
        (non SOA, add)|         ^  |     (Add SOA)
                      ----------+  |
                                   |(see SOA w/ end serial, commit changes)
                                   V
                                IXFREnd

    Note that changes are committed for every "difference sequence"
    (i.e. changes for one SOA update).  This means when an IXFR response
    contains multiple difference sequences and something goes wrong
    after several commits, these changes have been published and visible
    to clients even if the IXFR session is subsequently aborted.
    It is not clear if this is valid in terms of the protocol specification.
    Section 4 of RFC 1995 states:

       An IXFR client, should only replace an older version with a newer
       version after all the differences have been successfully processed.

    If this "replacement" is for the changes of one difference sequence
    and "all the differences" mean the changes for that sequence, this
    implementation strictly follows what RFC states.  If this is for
    the entire IXFR response (that may contain multiple sequences),
    we should implement it with one big transaction and one final commit
    at the very end.

    For now, we implement it with multiple smaller commits for two
    reasons.  First, this is what BIND 9 does, and we generally port
    the implementation logic here.  BIND 9 has been supporting IXFR
    for many years, so the fact that it still behaves this way
    probably means it at least doesn't cause a severe operational
    problem in practice.  Second, especially because BIND 10 would
    often use a database backend, a larger transaction could cause
    undesirable effects, e.g. suspending normal lookups for a longer
    period depending on the characteristics of the database.  Even if
    we find something wrong in a later sequence and abort the
    session, we can start another incremental update from what has
    been validated, or we can switch to AXFR to replace the zone
    completely.

    This implementation uses the state design pattern, where each state
    is represented as a subclass of the base XfrinState class.  Each
    concrete subclass of XfrinState is assumed to define two methods:
    handle_rr() and finish_message().  These methods handle specific part
    of XFR protocols and (if necessary) perform the state transition.

    Conceptually, XfrinState and its subclasses are a "friend" of
    XfrinConnection and are assumed to be allowed to access its internal
    information (even though Python does not have a strict access control
    between different classes).

    The XfrinState and its subclasses are designed to be stateless, and
    can be used as singleton objects.  For now, however, we always
    instantiate a new object for every state transition, partly because
    the introduction of singleton will make the code a bit complicated,
    and partly because the overhead of object instantiation wouldn't be
    significant for xfrin.
    '''

    def set_xfrstate(self, conn, new_state):
        '''Set the XfrinConnection to a given new state.

        As a "friend" class, this method intentionally gets access to the
        connection's "private" method (through its mangled name).

        Parameters:
          conn: the XfrinConnection whose state is to be changed
          new_state: the XfrinState object to install
        '''
        conn._XfrinConnection__set_xfrstate(new_state)

    def handle_rr(self, conn, rr=None):
        '''Handle one RR of an XFR response message.

        Depending on the state, the RR is generally added or deleted in the
        corresponding data source, or in some special cases indicates
        a specific transition, such as starting a new IXFR difference
        sequence or completing the session.

        Every subclass has its specific behavior for this method, so
        there is no default definition.  If the base class version
        is called, it's a bug of the caller, and it's notified via
        an XfrinException exception.  The 'rr' parameter is accepted here
        (and ignored) so that the signature matches the subclasses' and
        such a mistaken call is reported through that exception rather
        than a TypeError on the argument count.

        Subclass versions return a boolean value: True if the given RR was
        fully handled and the caller should go to the next RR; False
        if the caller needs to call this method with the (possibly) new
        state for the same RR again.

        Parameters:
          conn: the XfrinConnection being processed
          rr: the RR to handle

        Raises XfrinException unconditionally.
        '''
        raise XfrinException("Internal bug: " +
                             "XfrinState.handle_rr() called directly")

    def finish_message(self, conn):
        '''Perform any final processing after handling all RRs of a response.

        This method then returns a boolean indicating whether to continue
        receiving the message.  Unless it's in the end of the entire XFR
        session, we should continue, so this default method simply returns
        True.
        '''
        return True
class XfrinInitialSOA(XfrinState):
    '''State waiting for the very first RR of a response, which must be
    an SOA.
    '''

    def handle_rr(self, conn, rr):
        '''Record the serial of the initial SOA and choose the next state.

        The serial is stored in the connection as the "end serial".  For an
        IXFR request whose end serial is not newer than the requested one
        the next state is IXFRUptodate; otherwise it is FirstData.

        Always returns True; raises XfrinProtocolError if the RR is not an
        SOA.
        '''
        if rr.get_type() != RRType.SOA():
            raise XfrinProtocolError('First RR in zone transfer must be SOA ('
                                     + rr.get_type().to_text() + ' received)')

        conn._end_serial = get_soa_serial(rr.get_rdata()[0])
        nothing_new = (conn._request_type == RRType.IXFR() and
                       conn._end_serial <= conn._request_serial)
        if not nothing_new:
            self.set_xfrstate(conn, XfrinFirstData())
            return True

        logger.info(XFRIN_IXFR_UPTODATE, conn.zone_str(),
                    conn._request_serial, conn._end_serial)
        self.set_xfrstate(conn, XfrinIXFRUptodate())
        return True
class XfrinFirstData(XfrinState):
    '''State examining the first RR that follows the initial SOA.'''

    def handle_rr(self, conn, rr):
        '''Handle the first RR after initial SOA in an XFR session.

        This state happens exactly once in an XFR session, where
        we decide whether it's incremental update ("real" IXFR) or
        non incremental update (AXFR or AXFR-style IXFR).
        If we initiated IXFR and the transfer begins with two SOAs
        (the serial of the second one being equal to our serial),
        it's incremental; otherwise it's non incremental.

        This method always returns False (unlike many other handle_rr()
        methods) because this first RR must be examined again in the
        determined update context.

        Note that in the non incremental case the RR should normally be
        something other than SOA, but it's still possible it's an SOA with
        a different serial than ours.  The only possible interpretation at
        this point is that it's non incremental update that only consists
        of the SOA RR.  It will result in broken zone (for example, it
        wouldn't even contain an apex NS) and should be rejected at post
        XFR processing, but in terms of the XFR session processing we
        accept it and move forward.

        Note further that, in the half-broken SOA-only transfer case,
        these two SOAs are supposed to be the same as stated in Section 2.2
        of RFC 5936.  We don't check that condition here, either; we'll
        leave whether and how to deal with that situation to the end of
        the processing of non incremental update.  See also a related
        discussion at the IETF dnsext wg:
        http://www.ietf.org/mail-archive/web/dnsext/current/msg07908.html
        '''
        incremental = (conn._request_type == RRType.IXFR() and
                       rr.get_type() == RRType.SOA() and
                       conn._request_serial ==
                       get_soa_serial(rr.get_rdata()[0]))
        if incremental:
            logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_INCREMENTAL_RESP,
                         conn.zone_str())
            next_state = XfrinIXFRDeleteSOA()
        else:
            logger.debug(DBG_XFRIN_TRACE, XFRIN_GOT_NONINCREMENTAL_RESP,
                         conn.zone_str())
            # We are now going to add RRs to the new zone.  We need to
            # create a Diff object.  It will be used throughout the XFR
            # session.
            conn._diff = Diff(conn._datasrc_client, conn._zone_name, True)
            next_state = XfrinAXFR()
        self.set_xfrstate(conn, next_state)
        return False
class XfrinIXFRDeleteSOA(XfrinState):
    '''State at the beginning of one IXFR difference sequence (the changes
    for one SOA update).
    '''

    def handle_rr(self, conn, rr):
        '''Start a new diff with the SOA to delete and move to IXFRDelete.

        Always returns True; raises XfrinException if the RR is not an SOA.
        '''
        if rr.get_type() != RRType.SOA():
            # this shouldn't happen; should this occur it means an internal
            # bug.
            raise XfrinException(rr.get_type().to_text() +
                                 ' RR is given in IXFRDeleteSOA state')

        # A new Diff object is needed for every difference sequence.
        # Journaling is (unconditionally) enabled here; the Diff constructor
        # may internally disable it, however, if the underlying data source
        # doesn't support journaling.
        diff = Diff(conn._datasrc_client, conn._zone_name, False, True)
        conn._diff = diff
        diff.delete_data(rr)
        self.set_xfrstate(conn, XfrinIXFRDelete())
        stats = conn.get_transfer_stats()
        stats.ixfr_deletion_count += 1
        return True
class XfrinIXFRDelete(XfrinState):
    '''State handling the RRs to be deleted in an IXFR difference
    sequence.
    '''

    def handle_rr(self, conn, rr):
        '''Delete a non SOA RR, or switch to IXFRAddSOA on an SOA.

        Returns True when the RR was deleted; False when an SOA was seen,
        in which case the same RR must be handled again in the new state.
        '''
        if rr.get_type() != RRType.SOA():
            conn._diff.delete_data(rr)
            stats = conn.get_transfer_stats()
            stats.ixfr_deletion_count += 1
            return True

        # This is the only place where current_serial is set
        conn._current_serial = get_soa_serial(rr.get_rdata()[0])
        self.set_xfrstate(conn, XfrinIXFRAddSOA())
        return False
class XfrinIXFRAddSOA(XfrinState):
    '''State handling the SOA that begins the additions of an IXFR
    difference sequence.
    '''

    def handle_rr(self, conn, rr):
        '''Add the SOA to the current diff and move to IXFRAdd.

        Always returns True; raises XfrinException if the RR is not an SOA.
        '''
        if rr.get_type() != RRType.SOA():
            # this shouldn't happen; should this occur it means an internal
            # bug.
            raise XfrinException(rr.get_type().to_text() +
                                 ' RR is given in IXFRAddSOA state')

        conn._diff.add_data(rr)
        self.set_xfrstate(conn, XfrinIXFRAdd())
        stats = conn.get_transfer_stats()
        stats.ixfr_addition_count += 1
        return True
class XfrinIXFRAdd(XfrinState):
    '''State handling the RRs to be added in an IXFR difference sequence.'''

    def handle_rr(self, conn, rr):
        '''Add a non SOA RR, or close the difference sequence on an SOA.

        An SOA carrying the end serial commits the diff and finishes the
        IXFR (returns True).  An SOA carrying the current serial commits
        the diff and starts the next sequence (returns False so that the
        same RR is handled again in the IXFRDeleteSOA state).  Any other
        SOA serial raises XfrinProtocolError.  Non SOA RRs are added to the
        diff (returns True).
        '''
        if rr.get_type() != RRType.SOA():
            conn._diff.add_data(rr)
            conn.get_transfer_stats().ixfr_addition_count += 1
            return True

        # This SOA marks the end of a difference sequence
        conn.get_transfer_stats().ixfr_changeset_count += 1
        soa_serial = get_soa_serial(rr.get_rdata()[0])
        is_final = (soa_serial == conn._end_serial)
        if not is_final and soa_serial != conn._current_serial:
            raise XfrinProtocolError('IXFR out of sync: expected ' +
                                     'serial ' +
                                     str(conn._current_serial) +
                                     ', got ' + str(soa_serial))

        conn._diff.commit()
        if is_final:
            self.set_xfrstate(conn, XfrinIXFREnd())
            return True
        self.set_xfrstate(conn, XfrinIXFRDeleteSOA())
        return False
class XfrinIXFREnd(XfrinState):
    '''State reached once all IXFR difference sequences were handled.'''

    def handle_rr(self, conn, rr):
        '''Reject any further RR by raising XfrinProtocolError.'''
        extra_rr = rr.to_text()
        raise XfrinProtocolError('Extra data after the end of IXFR diffs: ' +
                                 extra_rr)

    def finish_message(self, conn):
        '''Final processing after processing an entire IXFR session.

        There will be more actions here, but for now we simply return False,
        indicating there will be no more message to receive.
        '''
        return False
class XfrinIXFRUptodate(XfrinState):
    '''State reached when the initial SOA of an IXFR response is not newer
    than the serial we requested with.
    '''

    def handle_rr(self, conn, rr):
        '''Reject any further RR by raising XfrinProtocolError.'''
        extra_rr = rr.to_text()
        raise XfrinProtocolError('Extra data after single IXFR response ' +
                                 extra_rr)

    def finish_message(self, conn):
        '''Signal that the zone is up to date by raising XfrinZoneUptodate.'''
        raise XfrinZoneUptodate()
class XfrinAXFR(XfrinState):
    '''State handling the RRs of a non incremental transfer.'''

    def handle_rr(self, conn, rr):
        """
        Handle the RR by putting it into the zone.

        An SOA ends the transfer: the state moves to AXFREnd, after logging
        a warning if its serial differs from the one of the initial SOA.
        Always returns True.
        """
        conn._diff.add_data(rr)
        reached_end = (rr.get_type() == RRType.SOA())
        if reached_end:
            # SOA means end.  Don't commit it yet - we need to perform
            # post-transfer checks
            final_serial = get_soa_serial(rr.get_rdata()[0])
            if conn._end_serial != final_serial:
                logger.warn(XFRIN_AXFR_INCONSISTENT_SOA, conn.zone_str(),
                            conn._end_serial, final_serial)
            self.set_xfrstate(conn, XfrinAXFREnd())
        stats = conn.get_transfer_stats()
        stats.axfr_rr_count += 1
        # Yes, we've eaten this RR.
        return True
class XfrinAXFREnd(XfrinState):
    '''State reached once the closing SOA of a non incremental transfer
    was handled.
    '''

    def handle_rr(self, conn, rr):
        '''Reject any further RR by raising XfrinProtocolError.'''
        extra_rr = rr.to_text()
        raise XfrinProtocolError('Extra data after the end of AXFR: ' +
                                 extra_rr)

    def finish_message(self, conn):
        """
        Final processing after processing an entire AXFR session.

        In this process all the AXFR changes are committed to the
        data source.

        There might be more actions here, but for now we simply return False,
        indicating there will be no more message to receive.
        """
        diff = conn._diff
        diff.commit()
        return False
class XfrinTransferStats:
    """
    This class keeps a record of transfer data for logging purposes.

    It records the number of messages, RRs, and bytes transferred, as well
    as the start and end time.  The start time is set upon instantiation of
    this class.  The end time is set the first time finalize(),
    get_running_time(), or get_bytes_per_second() is called.  The end time
    is set only once; subsequent calls to any of these methods do not
    modify it further.

    All *_count instance variables can be directly set as needed by the
    class collecting these results.
    """

    def __init__(self):
        """Reset all counters to zero and record the start time."""
        self.message_count = 0
        self.axfr_rr_count = 0
        self.byte_count = 0
        self.ixfr_changeset_count = 0
        self.ixfr_deletion_count = 0
        self.ixfr_addition_count = 0
        self._start_time = time.time()
        # Stays None until finalize() is called for the first time.
        self._end_time = None

    def finalize(self):
        """Sets the end time to time.time() if not done already."""
        if self._end_time is None:
            self._end_time = time.time()

    def get_running_time(self):
        """Calls finalize(), then returns the difference between creation
           and finalization time (in seconds, as a float)."""
        self.finalize()
        return self._end_time - self._start_time

    def get_bytes_per_second(self):
        """Returns the number of bytes per second, based on the result of
           get_running_time() and the value of byte_count.

           If the running time is not positive, 0.0 is returned when no
           byte was counted and float("inf") otherwise."""
        runtime = self.get_running_time()
        if runtime > 0.0:
            return float(self.byte_count) / runtime
        # This should never happen, but if some clock is so
        # off or reset in the meantime, we do need to return
        # *something* (and not raise an error)
        if self.byte_count == 0:
            return 0.0
        return float("inf")
- class XfrinConnection(asyncore.dispatcher):
- '''Do xfrin in this class. '''
def __init__(self,
             sock_map, zone_name, rrclass, datasrc_client,
             shutdown_event, master_addrinfo, db_file, tsig_key=None,
             idle_timeout=60):
    '''Constructor of the XfrinConnection class.

    sock_map: the socket map handed to asyncore.dispatcher.
    zone_name, rrclass: name and class of the zone to be transferred.
    datasrc_client: the data source client object used for the XFR
                    session.  This will eventually replace db_file
                    completely.
    shutdown_event, master_addrinfo: stored as is for later use.
    db_file: SQLite3 DB file.  Unfortunately we still need this for
             temporary workaround in _get_zone_soa().  This should be
             removed when we eliminate the need for the workaround.
    tsig_key: TSIG key used to sign the query, or None for no TSIG.
    idle_timeout: max idle time for read data from socket.
    '''
    asyncore.dispatcher.__init__(self, map=sock_map)

    # The XFR state.  Conceptually this is purely private, so we emphasize
    # the fact by the double underscore.  Other classes are assumed to
    # get access to this via get_xfrstate(), and only XfrinState classes
    # are assumed to be allowed to modify it via __set_xfrstate().
    self.__state = None

    # Requested transfer type (RRType.AXFR or RRType.IXFR).  The actual
    # transfer type may differ due to IXFR->AXFR fallback:
    self._request_type = None

    # Zone parameters and data source handler.  These must be in place
    # before _get_zone_soa() is called, since it relies on them.
    self._zone_name = zone_name
    self._rrclass = rrclass
    self._db_file = db_file
    self._datasrc_client = datasrc_client
    self._zone_soa = self._get_zone_soa()

    # Communication related parameters
    self._sock_map = sock_map
    self._soa_rr_count = 0
    self._idle_timeout = idle_timeout
    self._shutdown_event = shutdown_event
    self._master_addrinfo = master_addrinfo

    # TSIG related parameters.  tsig_ctx_creator is introduced to allow
    # tests to use a mock class for easier tests (in normal case we always
    # use the default)
    self._tsig_key = tsig_key
    self._tsig_ctx = None
    self._tsig_ctx_creator = lambda key : TSIGContext(key)

    # keep a record of this specific transfer to log on success
    # (time, rr/s, etc)
    self._transfer_stats = XfrinTransferStats()
def init_socket(self):
    '''Initialize the underlying socket.

    This is essentially a part of __init__() and is expected to be
    called immediately after the constructor.  It's separated from
    the constructor because otherwise we might not be able to close
    it if the constructor raises an exception after opening the socket.
    '''
    family, socktype = self._master_addrinfo[0], self._master_addrinfo[1]
    self.create_socket(family, socktype)
    self.setblocking(1)
def _get_zone_soa(self):
    '''Retrieve the current SOA RR of the zone to be transferred.

    It will be used for various purposes in subsequent xfr protocol
    processing.  It is validly possible that the zone is currently
    empty and therefore doesn't have an SOA, so this method doesn't
    consider it an error and returns None in such a case.  It may or
    may not result in failure in the actual processing depending on
    how the SOA is used.

    When the zone has an SOA RR, this method makes sure that it's
    valid, i.e., it has exactly one RDATA; if it is not the case
    this method returns None.

    If the underlying data source doesn't even know the zone, this method
    tries to provide backward compatible behavior where xfrin is
    responsible for creating zone in the corresponding DB table.
    For a longer term we should deprecate this behavior by introducing
    more generic zone management framework, but at the moment we try
    to not surprise existing users.  (Note also that the part of
    providing the compatible behavior uses the old data source API.
    We'll deprecate this API in a near future, too).
    '''
    # get the zone finder.  this must be SUCCESS (not even
    # PARTIALMATCH) because we are specifying the zone origin name.
    status, zone_finder = self._datasrc_client.find_zone(self._zone_name)
    if status != DataSourceClient.SUCCESS:
        # The data source doesn't know the zone.  For now, we provide
        # backward compatibility and create a new one ourselves.
        isc.datasrc.sqlite3_ds.load(self._db_file,
                                    self._zone_name.to_text(),
                                    lambda : [])
        logger.warn(XFRIN_ZONE_CREATED, self.zone_str())
        # try again
        status, zone_finder = \
            self._datasrc_client.find_zone(self._zone_name)
        if status != DataSourceClient.SUCCESS:
            return None

    status, soa_rrset, _ = zone_finder.find(self._zone_name, RRType.SOA())
    if status != ZoneFinder.SUCCESS:
        logger.info(XFRIN_ZONE_NO_SOA, self.zone_str())
        return None

    if soa_rrset.get_rdata_count() == 1:
        return soa_rrset
    logger.warn(XFRIN_ZONE_MULTIPLE_SOA, self.zone_str(),
                soa_rrset.get_rdata_count())
    return None
def __set_xfrstate(self, new_state):
    '''Install new_state as the current XFR state.

    Only XfrinState classes are assumed to call this method.
    '''
    self.__state = new_state
def get_xfrstate(self):
    '''Return the current XFR state object (None if not set yet).'''
    current_state = self.__state
    return current_state
def get_transfer_stats(self):
    """Returns the transfer stats object of this connection, used to
       measure transfer time, and the number of messages/records/bytes
       transferred."""
    stats = self._transfer_stats
    return stats
def zone_str(self):
    '''Return the zone name and class formatted for log messages.'''
    zone_name, zone_class = self._zone_name, self._rrclass
    return format_zone_str(zone_name, zone_class)
def connect_to_master(self):
    '''Connect to master in TCP.

    Returns True on success.  On a socket error the failure is logged
    and False is returned.
    '''
    master_addr = self._master_addrinfo[2]
    try:
        self.connect(master_addr)
    except socket.error as e:
        logger.error(XFRIN_CONNECT_MASTER, self._master_addrinfo[2],
                     str(e))
        return False
    else:
        return True
def _create_query(self, query_type):
    '''Create an XFR-related query message.

    query_type is either SOA, AXFR or IXFR.  An IXFR query needs the
    zone's current SOA record.  If it's not known, it raises an
    XfrinException exception.  Note that this may not necessarily be a
    broken configuration; for the first attempt of transfer the secondary
    may not have any boot-strap zone information, in which case IXFR
    simply won't work.  The xfrin should then fall back to AXFR.

    The chosen query ID is recorded in _query_id, and our current serial
    (None if unknown) in _request_serial, for later use.
    '''
    msg = Message(Message.RENDER)
    self._query_id = random.randint(0, 0xFFFF)
    msg.set_qid(self._query_id)
    msg.set_opcode(Opcode.QUERY())
    msg.set_rcode(Rcode.NOERROR())
    msg.add_question(Question(self._zone_name, self._rrclass, query_type))

    # Remember our serial, if known
    if self._zone_soa is not None:
        self._request_serial = \
            get_soa_serial(self._zone_soa.get_rdata()[0])
    else:
        self._request_serial = None

    if query_type != RRType.IXFR():
        return msg

    # Set the authority section with our SOA for IXFR
    if self._zone_soa is None:
        # (incremental) IXFR doesn't work without known SOA
        raise XfrinException('Failed to create IXFR query due to no ' +
                             'SOA for ' + self.zone_str())
    msg.add_rrset(Message.SECTION_AUTHORITY, self._zone_soa)
    return msg
- def _send_data(self, data):
- size = len(data)
- total_count = 0
- while total_count < size:
- count = self.send(data[total_count:])
- total_count += count
def _send_query(self, query_type):
    '''Build a query of the given type and send it over the connection.

    When a TSIG key is configured, a new TSIG context is created from it
    (and kept in self._tsig_ctx) and the message is rendered with that
    context.  The rendered message is preceded by its length, packed as
    a 2-byte field.
    '''
    query = self._create_query(query_type)
    renderer = MessageRenderer()
    # XXX The python wrapper currently does not accept 'None' as the TSIG
    # context, hence the two separate to_wire() calls.  Once it does, this
    # branch should be replaced by a single call.
    if self._tsig_key is None:
        query.to_wire(renderer)
    else:
        self._tsig_ctx = self._tsig_ctx_creator(self._tsig_key)
        query.to_wire(renderer, self._tsig_ctx)

    wire_length = socket.htons(renderer.get_length())
    self._send_data(struct.pack('H', wire_length))
    self._send_data(renderer.get_data())
def _asyncore_loop(self):
    '''Run a single iteration of asyncore.loop() on our socket map.

    This is kept as a separate, trivial method so that tests can replace
    it and exercise _get_request_response() without communicating with
    a real remote server.
    '''
    timeout = self._idle_timeout
    asyncore.loop(timeout, map=self._sock_map, count=1)
- def _get_request_response(self, size):
- recv_size = 0
- data = b''
- while recv_size < size:
- self._recv_time_out = True
- self._need_recv_size = size - recv_size
- self._asyncore_loop()
- if self._recv_time_out:
- raise XfrinException('receive data from socket time out.')
- recv_size += self._recvd_size
- data += self._recvd_data
- return data
def _check_response_tsig(self, msg, response_data):
    '''Validate the TSIG state of a response.

    With a TSIG context in place the response is verified against it.
    Without one, any TSIG record in the response is rejected.  Either
    failure raises XfrinProtocolError.
    '''
    tsig_record = msg.get_tsig_record()
    if self._tsig_ctx is None:
        if tsig_record is not None:
            # The query was not signed but the response is.  The RFC does
            # not cover this case explicitly, but it does state that a
            # server must not sign a response to an unsigned request.  No
            # sane implementation would do so, and as this is part of a
            # security mechanism we prefer to be strict.
            raise XfrinProtocolError('Unexpected TSIG in response')
        return

    tsig_error = self._tsig_ctx.verify(tsig_record, response_data)
    if tsig_error != TSIGError.NOERROR:
        raise XfrinProtocolError('TSIG verify fail: %s' %
                                 str(tsig_error))
def __parse_soa_response(self, msg, response_data):
    '''Validate a response to our SOA query and return its SOA RR.

    This is a subroutine of _check_soa_serial().  Bogus responses are
    rejected with XfrinProtocolError; otherwise the SOA found in the
    answer section is returned.
    '''
    # TSIG and header checks first.  Unlike for AXFR/IXFR responses we
    # are strict here and also insist on the AA flag.
    self._check_response_tsig(msg, response_data)
    self._check_response_header(msg)
    if not msg.get_header_flag(Message.HEADERFLAG_AA):
        raise XfrinProtocolError('non-authoritative answer to SOA query')

    # The question section must echo exactly our single question.
    question_count = msg.get_rr_count(Message.SECTION_QUESTION)
    if question_count != 1:
        raise XfrinProtocolError('Invalid response to SOA query: ' +
                                 '(' + str(question_count) + ' questions, 1 ' +
                                 'expected)')
    question = msg.get_question()[0]
    if question.get_name() != self._zone_name or \
            question.get_class() != self._rrclass or \
            question.get_type() != RRType.SOA():
        raise XfrinProtocolError('Invalid response to SOA query: '
                                 'question mismatch: ' +
                                 str(question))

    # Pick the (single) SOA out of the answer section.
    soa = None
    for answer_rr in msg.get_section(Message.SECTION_ANSWER):
        if answer_rr.get_type() == RRType.SOA():
            if soa is not None:
                raise XfrinProtocolError('SOA response had multiple SOAs')
            soa = answer_rr
        # A CNAME must not exist at the top of a zone.
        if answer_rr.get_type() == RRType.CNAME():
            raise XfrinProtocolError('SOA query resulted in CNAME')

    if soa is None:
        # No SOA: use the authority section to give a more specific reason.
        for authority_rr in msg.get_section(Message.SECTION_AUTHORITY):
            if authority_rr.get_type() == RRType.NS():
                raise XfrinProtocolError('SOA query resulted in referral')
            if authority_rr.get_type() == RRType.SOA():
                raise XfrinProtocolError('SOA query resulted in NODATA')
        raise XfrinProtocolError('No SOA record found in response to ' +
                                 'SOA query')

    # The SOA must belong to the zone and class we asked about.
    if soa.get_name() != self._zone_name or \
            soa.get_class() != self._rrclass:
        raise XfrinProtocolError("SOA response doesn't match query: " +
                                 str(soa))

    return soa
def _check_soa_serial(self):
    '''Query the master for its SOA and compare it with our serial.

    When our serial is known and the master's serial is not newer, the
    session is aborted by raising XfrinZoneUptodate (after logging the
    case where ours is actually ahead).

    Returns XFRIN_OK otherwise; that value exists for the benefit of
    tests, the regular caller ignores it.
    '''
    self._send_query(RRType.SOA())
    length_field = self._get_request_response(2)
    response_len = socket.htons(struct.unpack('H', length_field)[0])
    response = self._get_request_response(response_len)
    msg = Message(Message.PARSE)
    msg.from_wire(response, Message.PRESERVE_ORDER)

    # Validate the rest of the response and extract the answer's SOA.
    soa = self.__parse_soa_response(msg, response)

    primary_serial = get_soa_serial(soa.get_rdata()[0])
    local_serial = self._request_serial
    if local_serial is not None and local_serial >= primary_serial:
        if local_serial != primary_serial:
            # Our copy is ahead of the master's; worth a log message.
            logger.info(XFRIN_ZONE_SERIAL_AHEAD, primary_serial,
                        self.zone_str(),
                        format_addrinfo(self._master_addrinfo),
                        local_serial)
        raise XfrinZoneUptodate

    return XFRIN_OK
def do_xfrin(self, check_soa, request_type=RRType.AXFR()):
    '''Run one transfer session: send the request and process responses.

    If check_soa is true, the master's SOA serial is checked first.
    Returns XFRIN_OK on success (including the "zone already up to date"
    case) and XFRIN_FAIL when any exception other than XfrinZoneUptodate
    was raised; such exceptions are logged, not propagated.
    '''
    try:
        ret = XFRIN_OK
        self._request_type = request_type
        # RRType.[IA]XFR().to_text() currently yields 'TYPExxx', so the
        # readable name has to be hardcoded.
        if request_type == RRType.IXFR():
            req_str = 'IXFR'
        else:
            req_str = 'AXFR'

        if check_soa:
            self._check_soa_serial()

        logger.info(XFRIN_XFR_TRANSFER_STARTED, req_str, self.zone_str())
        self._send_query(self._request_type)
        self.__state = XfrinInitialSOA()
        self._handle_xfrin_responses()

        # The success report depends on what kind of data arrived.  An
        # AXFR-style IXFR response is reported with the 'AXFR' message.
        stats = self._transfer_stats
        if stats.axfr_rr_count == 0:
            logger.info(XFRIN_IXFR_TRANSFER_SUCCESS,
                        self.zone_str(),
                        stats.message_count,
                        stats.ixfr_changeset_count,
                        stats.ixfr_deletion_count,
                        stats.ixfr_addition_count,
                        stats.byte_count,
                        "%.3f" % stats.get_running_time(),
                        "%.f" % stats.get_bytes_per_second())
        else:
            logger.info(XFRIN_TRANSFER_SUCCESS,
                        req_str,
                        self.zone_str(),
                        stats.message_count,
                        stats.axfr_rr_count,
                        stats.byte_count,
                        "%.3f" % stats.get_running_time(),
                        "%.f" % stats.get_bytes_per_second())
    except XfrinZoneUptodate:
        # Eventually this should probably trigger an attempt with another
        # primary server; for now it simply counts as "success".
        pass
    except XfrinProtocolError as err:
        logger.info(XFRIN_XFR_TRANSFER_PROTOCOL_ERROR, req_str,
                    self.zone_str(),
                    format_addrinfo(self._master_addrinfo), str(err))
        ret = XFRIN_FAIL
    except XfrinException as err:
        logger.error(XFRIN_XFR_TRANSFER_FAILURE, req_str,
                     self.zone_str(),
                     format_addrinfo(self._master_addrinfo), str(err))
        ret = XFRIN_FAIL
    except Exception as err:
        # Catching everything is generally poor practice, but a transfer
        # session can fail with many exception types, including ones from
        # the DNS library and the data source library.  Until there is a
        # common "ISC exception" base class to catch instead, the broadest
        # coverage is needed so that nothing is missed.
        logger.error(XFRIN_XFR_OTHER_FAILURE, req_str,
                     self.zone_str(), str(err))
        ret = XFRIN_FAIL
    finally:
        # Drop the diff as soon as possible so that any transaction still
        # open in it (possible after an xfr-level exception) gets closed.
        self._diff = None

    return ret
def _check_response_header(self, msg):
    '''Apply minimal header validation to a response message.

    Raises XfrinProtocolError if the rcode is not NOERROR, the QR flag is
    not set, or the query ID differs from the one we sent.
    '''
    # How strict response validation should be is not obvious.  BIND 9
    # accepts some responses that would normally be considered bogus, for
    # example one whose opcode differs from that of the request.
    # According to one of its original developers, some of those missing
    # checks are deliberate, to stay interoperable with old
    # implementations that stricter checks would break.
    rcode = msg.get_rcode()
    if rcode != Rcode.NOERROR():
        raise XfrinProtocolError('error response: %s' %
                                 rcode.to_text())

    is_response = msg.get_header_flag(Message.HEADERFLAG_QR)
    if not is_response:
        raise XfrinProtocolError('response is not a response')

    if msg.get_qid() != self._query_id:
        raise XfrinProtocolError('bad query id')
def _check_response_status(self, msg):
    '''Validate an xfr response: header checks plus the question count.

    Raises XfrinProtocolError when the header is not acceptable or the
    message carries more than one question.
    '''
    self._check_response_header(msg)

    question_count = msg.get_rr_count(Message.SECTION_QUESTION)
    if question_count > 1:
        raise XfrinProtocolError('query section count greater than 1')
def _handle_xfrin_responses(self):
    '''Read and process response messages until the state says to stop.

    Every message is parsed, checked (TSIG and header/question), and its
    answer RRs are fed to the current state object.  XfrinException is
    raised if the shutdown event is set after a message was processed.
    '''
    while True:
        length_field = self._get_request_response(2)
        msg_len = socket.htons(struct.unpack('H', length_field)[0])
        # Count the 2-byte length field as well.
        self._transfer_stats.byte_count += msg_len + 2
        wire_data = self._get_request_response(msg_len)
        msg = Message(Message.PARSE)
        msg.from_wire(wire_data, Message.PRESERVE_ORDER)
        self._transfer_stats.message_count += 1

        # TSIG related checks, including an unexpectedly signed response.
        self._check_response_tsig(msg, wire_data)

        # Response status validation.
        self._check_response_status(msg)

        for rr in msg.get_section(Message.SECTION_ANSWER):
            # handle_rr() may switch the state and ask for the same RR to
            # be offered again by returning a false value.
            while not self.__state.handle_rr(self, rr):
                pass

        expect_more = self.__state.finish_message(self)

        if self._shutdown_event.is_set():
            raise XfrinException('xfrin is forced to stop')

        if not expect_more:
            break
def handle_read(self):
    '''Receive pending response data and hand it to the reading loop.

    At most self._need_recv_size bytes are read.  Clearing
    self._recv_time_out tells _get_request_response() that this event
    loop iteration did deliver data.
    '''
    chunk = self.recv(self._need_recv_size)
    self._recvd_data = chunk
    self._recvd_size = len(chunk)
    self._recv_time_out = False
def writable(self):
    '''Always report "not writable" so write events are never requested.'''
    wants_write_events = False
    return wants_write_events
def __process_xfrin(server, zone_name, rrclass, db_file,
                    shutdown_event, master_addrinfo, check_soa, tsig_key,
                    request_type, conn_class):
    '''Run a transfer session, with an AXFR retry after a failed IXFR.

    The result is always published through server.publish_xfrin_news().
    An exception raised during the session is remembered and re-raised
    only after the connection was closed and the result was published.
    '''
    conn = None
    exception = None
    ret = XFRIN_FAIL
    try:
        # Create the data source client for this session.  Right now an
        # sqlite3-based data source is still assumed and both the old and
        # new data source APIs are in use; tests need a mock client.  As a
        # temporary workaround the client is not created when no database
        # file is given (the test case).  This should become much cleaner
        # eventually.
        datasrc_client = None
        if db_file is not None:
            # Temporary hardcoded sqlite3 initialization.  To be updated
            # once the config specification is decided (TODO); this may
            # depend on #1207 or any followup ticket created for it.
            datasrc_config = "{ \"database_file\": \"" + db_file + "\"}"
            datasrc_client = DataSourceClient("sqlite3", datasrc_config)

        # Create a TCP connection for the session and run the transfer.
        # If IXFR was requested and fails, one more attempt is made with
        # AXFR, but only if the server could actually be reached.
        sock_map = {}
        while True:
            conn = conn_class(sock_map, zone_name, rrclass, datasrc_client,
                              shutdown_event, master_addrinfo, db_file,
                              tsig_key)
            conn.init_socket()
            ret = XFRIN_FAIL
            if not conn.connect_to_master():
                break
            ret = conn.do_xfrin(check_soa, request_type)
            if not (ret == XFRIN_FAIL and request_type == RRType.IXFR()):
                break
            # IXFR failed for some reason: the server may not support it,
            # we may not have the zone, we may be out of sync, or anything
            # else.  AXFR may well succeed in many of those cases.
            request_type = RRType.AXFR()
            logger.warn(XFRIN_XFR_TRANSFER_FALLBACK, conn.zone_str())
            conn.close()
            conn = None
    except Exception as ex:
        # Only remember the exception here; it is re-raised after cleanup.
        # It is deliberately not logged at this point, to rule out even
        # the smallest chance of an exception from logging itself.
        exception = ex

    # asyncore.dispatcher needs an explicit close() unless its whole
    # lifetime is spent inside asyncore.loop(), which is not our case.
    # So close here whether or not do_xfrin succeeded, and even after an
    # unexpected exception.
    if conn is not None:
        conn.close()

    # Publish the result so that zonemgr can reset the zone timer and
    # xfrout can notify the zone's slaves on success.
    server.publish_xfrin_news(zone_name, rrclass, ret)

    if exception is not None:
        raise exception
def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file,
                  shutdown_event, master_addrinfo, check_soa, tsig_key,
                  request_type, conn_class=XfrinConnection):
    '''Run a transfer session while the zone is registered as in progress.

    The zone is added to xfrin_recorder for the duration of the session.
    An exception from the session is logged after the zone has been
    removed from the recorder again; it is not propagated.
    '''
    # The main part of the session can, if rarely, raise an exception.
    # To make sure the zone is removed from the recorder in all cases,
    # the work is delegated to a helper whose exceptions are caught here.
    xfrin_recorder.increment(zone_name)
    failure = None
    try:
        __process_xfrin(server, zone_name, rrclass, db_file,
                        shutdown_event, master_addrinfo, check_soa, tsig_key,
                        request_type, conn_class)
    except Exception as ex:
        # Do not log before decrement() has completed.
        failure = ex
    xfrin_recorder.decrement(zone_name)

    if failure is None:
        return

    if request_type == RRType.AXFR():
        typestr = "AXFR"
    else:
        typestr = "IXFR"
    logger.error(XFRIN_XFR_PROCESS_FAILURE, typestr, zone_name.to_text(),
                 str(rrclass), str(failure))
class XfrinRecorder:
    """Lock-protected record of the zones whose transfer is in progress.

    Every access to the internal list is made while holding the lock.
    The lock is taken with a 'with' statement so that it is released even
    if the guarded operation raises (for example from a zone name's
    comparison method).
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._zones = []

    def increment(self, zone_name):
        """Register zone_name as having a transfer in progress."""
        with self._lock:
            self._zones.append(zone_name)

    def decrement(self, zone_name):
        """Remove one registration of zone_name; no-op if not registered."""
        with self._lock:
            if zone_name in self._zones:
                self._zones.remove(zone_name)

    def xfrin_in_progress(self, zone_name):
        """Return True if zone_name is currently registered."""
        with self._lock:
            return zone_name in self._zones

    def count(self):
        """Return the number of registered transfers."""
        with self._lock:
            return len(self._zones)
class ZoneInfo:
    """Validated configuration of one zone from the 'zones' config list."""

    def __init__(self, config_data, module_cc):
        """Creates a zone_info with the config data element as
        specified by the 'zones' list in xfrin.spec. Module_cc is
        needed to get the defaults from the specification.

        Raises XfrinZoneInfoException if any of the values is invalid."""
        self._module_cc = module_cc
        self.set_name(config_data.get('name'))
        self.set_master_addr(config_data.get('master_addr'))
        self.set_master_port(config_data.get('master_port'))
        self.set_zone_class(config_data.get('class'))
        self.set_tsig_key(config_data.get('tsig_key'))
        self.set_use_ixfr(config_data.get('use_ixfr'))

    def set_name(self, name_str):
        """Set the name for this zone given a name string.
        Raises XfrinZoneInfoException if name_str is None or if it
        cannot be parsed."""
        if name_str is None:
            raise XfrinZoneInfoException("Configuration zones list "
                                         "element does not contain "
                                         "'name' attribute")
        self.name = _check_zone_name(name_str)

    def set_master_addr(self, master_addr_str):
        """Set the master address for this zone given an IP address
        string. Raises XfrinZoneInfoException if master_addr_str is
        None or if it cannot be parsed."""
        if master_addr_str is None:
            raise XfrinZoneInfoException("master address missing from config data")
        try:
            self.master_addr = isc.net.parse.addr_parse(master_addr_str)
        except ValueError:
            logger.error(XFRIN_BAD_MASTER_ADDR_FORMAT, master_addr_str)
            # str(): the value comes from configuration and is not
            # guaranteed to be a string.
            errmsg = "bad format for zone's master: " + str(master_addr_str)
            raise XfrinZoneInfoException(errmsg)

    def set_master_port(self, master_port_str):
        """Set the master port given a port number string. If
        master_port_str is None, the default from the specification
        for this module will be used. Raises XfrinZoneInfoException if
        the string contains an invalid port number"""
        if master_port_str is None:
            self.master_port = self._module_cc.get_default_value("zones/master_port")
            return
        try:
            self.master_port = isc.net.parse.port_parse(master_port_str)
        except ValueError:
            logger.error(XFRIN_BAD_MASTER_PORT_FORMAT, master_port_str)
            # The port may be given as an integer; without str() the
            # concatenation would raise TypeError and hide the intended
            # XfrinZoneInfoException.
            errmsg = "bad format for zone's master port: " + str(master_port_str)
            raise XfrinZoneInfoException(errmsg)

    def set_zone_class(self, zone_class_str):
        """Set the zone class given an RR class str (e.g. "IN"). If
        zone_class_str is None, it will default to what is specified
        in the specification file for this module. Raises
        XfrinZoneInfoException if the string cannot be parsed."""
        # TODO: remove _str
        self.class_str = zone_class_str or self._module_cc.get_default_value("zones/class")
        if zone_class_str is None:
            # TODO rrclass->zone_class
            self.rrclass = RRClass(self._module_cc.get_default_value("zones/class"))
            return
        try:
            self.rrclass = RRClass(zone_class_str)
        except InvalidRRClass:
            logger.error(XFRIN_BAD_ZONE_CLASS, zone_class_str)
            errmsg = "invalid zone class: " + str(zone_class_str)
            raise XfrinZoneInfoException(errmsg)

    def set_tsig_key(self, tsig_key_str):
        """Set the tsig_key for this zone, given a TSIG key string
        representation. If tsig_key_str is None, no TSIG key will
        be set. Raises XfrinZoneInfoException if tsig_key_str cannot
        be parsed."""
        if tsig_key_str is None:
            self.tsig_key = None
            return
        try:
            self.tsig_key = TSIGKey(tsig_key_str)
        except InvalidParameter:
            logger.error(XFRIN_BAD_TSIG_KEY_STRING, tsig_key_str)
            errmsg = "bad TSIG key string: " + str(tsig_key_str)
            raise XfrinZoneInfoException(errmsg)

    def set_use_ixfr(self, use_ixfr):
        """Set use_ixfr. If set to True, it will use
        IXFR for incoming transfers. If set to False, it will use AXFR.
        If use_ixfr is None, the default from the specification for
        this module is used.

        Note that the transfer helper in this file retries a failed
        IXFR with AXFR."""
        # TODO: http://bind10.isc.org/ticket/1279
        if use_ixfr is None:
            self.use_ixfr = \
                self._module_cc.get_default_value("zones/use_ixfr")
        else:
            self.use_ixfr = use_ixfr

    def get_master_addr_info(self):
        """Return the master as a (family, socktype, (address, port))
        tuple, with socket.SOCK_STREAM as the socket type."""
        return (self.master_addr.family, socket.SOCK_STREAM,
                (str(self.master_addr), self.master_port))
def _do_auth_loadzone(server, zone_name, zone_class):
    '''Ask b10-auth to reload a zone that has just been transferred.

    On a successful zone transfer, if the zone is served by b10-auth in
    the in-memory data source using sqlite3 as a backend, the "loadzone"
    command for the zone is sent to auth.  Nothing is done when auth's
    "datasources" configuration is still the default.

    Data source entries with an unparsable class and zone entries with an
    unparsable origin are logged and skipped.
    '''
    datasources, is_default =\
        server._module_cc.get_remote_config_value(AUTH_MODULE_NAME, "datasources")
    if is_default:
        return
    for d in datasources:
        if "type" not in d:
            continue
        try:
            if "class" in d:
                dclass = RRClass(d["class"])
            else:
                dclass = RRClass("IN")
        except InvalidRRClass as err:
            logger.info(XFRIN_AUTH_CONFIG_RRCLASS_ERROR, str(err))
            continue

        if d["type"].lower() == "memory" and dclass == zone_class:
            for zone in d["zones"]:
                if "filetype" not in zone:
                    continue
                if "origin" not in zone:
                    continue
                try:
                    name = Name(zone["origin"])
                except (EmptyLabel, TooLongLabel, BadLabelType, BadEscape,
                        TooLongName, IncompleteName) as err:
                    # The exception must be bound here: a name bound by an
                    # earlier 'except ... as err' no longer exists once
                    # that handler has finished.
                    logger.info(XFRIN_AUTH_CONFIG_NAME_PARSER_ERROR, str(err))
                    continue

                if zone["filetype"].lower() == "sqlite3" and name == zone_name:
                    param = {"origin": zone_name.to_text(),
                             "class": zone_class.to_text(),
                             "datasrc": d["type"]}

                    logger.debug(DBG_XFRIN_TRACE, XFRIN_AUTH_LOADZONE,
                                 param["origin"], param["class"], param["datasrc"])

                    msg = create_command("loadzone", param)
                    seq = server._send_cc_session.group_sendmsg(msg, AUTH_MODULE_NAME)
                    answer, env = server._send_cc_session.group_recvmsg(False, seq)
- class Xfrin:
def __init__(self):
    '''Set up default state, the command channel and the recorder.'''
    self._max_transfers_in = 10
    self._zones = {}
    # Set of (zone, class) tuples, both as strings, for the in-memory
    # zones maintained by Xfrin.  It is used to trigger Auth/in-memory
    # to reload zones after they have been transferred in.
    self._memory_zones = set()
    # The command channel is set up before the recorder and the shutdown
    # event are created; keep this order.
    self._cc_setup()
    self.recorder = XfrinRecorder()
    self._shutdown_event = threading.Event()
def _cc_setup(self):
    '''Create the command channel sessions and load the configuration.

    Only used during initialization.  It is a separate method so that
    unit tests can override it and exercise most of this class without
    a command channel.
    '''
    # A dedicated session is used for sending commands to other modules,
    # because the listening session would block the send operation.
    self._send_cc_session = isc.cc.Session()
    module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                           self.config_handler,
                                           self.command_handler)
    self._module_cc = module_cc
    module_cc.start()
    self.config_handler(module_cc.get_full_config())
    module_cc.add_remote_config(AUTH_SPECFILE_LOCATION,
                                self._auth_config_handler)
def _cc_check_command(self):
    '''Call check_command(False) on the module's command channel session.

    A plain wrapper, kept as its own method so that unit tests can
    replace it.
    '''
    session = self._module_cc
    session.check_command(False)
- def _get_zone_info(self, name, rrclass):
- """Returns the ZoneInfo object containing the configured data
- for the given zone name. If the zone name did not have any
- data, returns None"""
- return self._zones.get((name.to_text(), rrclass.to_text()))
- def _add_zone_info(self, zone_info):
- """Add the zone info. Raises a XfrinZoneInfoException if a zone
- with the same name and class is already configured"""
- key = (zone_info.name.to_text(), zone_info.class_str)
- if key in self._zones:
- raise XfrinZoneInfoException("zone " + str(key) +
- " configured multiple times")
- self._zones[key] = zone_info
- def _clear_zone_info(self):
- self._zones = {}
def config_handler(self, new_config):
    '''Apply a new configuration and return the answer message.

    If any zone entry is invalid, the previous zone set and transfer
    limit are restored and an error answer with the reason is returned.
    '''
    # Keep the current settings in case the new data has a problem.
    saved_max_transfers_in = self._max_transfers_in
    saved_zones = self._zones

    self._max_transfers_in = new_config.get("transfers_in") or self._max_transfers_in

    if 'zones' in new_config:
        self._clear_zone_info()
        try:
            for zone_config in new_config.get('zones'):
                self._add_zone_info(ZoneInfo(zone_config, self._module_cc))
        except XfrinZoneInfoException as xce:
            # Roll back to what was configured before.
            self._zones = saved_zones
            self._max_transfers_in = saved_max_transfers_in
            return create_answer(1, str(xce))

    return create_answer(0)
def _auth_config_handler(self, new_config, config_data):
    '''Handle a change in Auth's configuration.

    Refreshes the database file setting first, then the set of
    in-memory zones.
    '''
    self._set_db_file()
    self._set_memory_zones(new_config, config_data)
- def _clear_memory_zones(self):
- """Clears the memory_zones set; called before processing the
- changed list of memory datasource zones that have file type
- sqlite3"""
- self._memory_zones.clear()
def _is_memory_zone(self, zone_name_str, zone_class_str):
    """Tell whether the zone/class pair is in the memory zone set.

    That set holds the zones configured in the in-memory datasource of
    the Auth process with file type 'sqlite3'.

    Note: this method is not thread-safe. We are considering
    changing the threaded model here, but if we do not, take
    care in accessing and updating the memory zone set (or add
    locks)
    """
    # Normalize both values first.  If either conversion fails the pair
    # cannot be in the set anyway, so the answer is simply False.
    try:
        key = (Name(zone_name_str).to_text().lower(),
               RRClass(zone_class_str).to_text())
    except Exception:
        return False
    return key in self._memory_zones
def _set_memory_zones(self, new_config, config_data):
    """Rebuild the set of in-memory zones whose file type is 'sqlite3'.

    Part of _auth_config_handler.  The set is built from the
    'datasources' list of new_config; if anything about that data raises
    an exception, the previous set is kept.

    Note: this method is not thread-safe. We are considering
    changing the threaded model here, but if we do not, take
    care in accessing and updating the memory zone set (or add
    locks)
    """
    collected = set()
    try:
        if "datasources" in new_config:
            for datasource in new_config["datasources"]:
                if "class" in datasource:
                    class_text = datasource["class"]
                else:
                    # Fall back to the default from the specification.
                    class_text = config_data.get_default_value(
                        "datasources/class")
                ds_class = RRClass(class_text)
                if datasource["type"] != "memory":
                    continue
                for zone in datasource["zones"]:
                    if "filetype" in zone and \
                            zone["filetype"] == "sqlite3":
                        origin = Name(zone["origin"])
                        collected.add((origin.to_text().lower(),
                                       ds_class.to_text()))
            # The data was usable, so install the new set.
            # NOTE(review): the source's indentation was lost; this
            # assignment is assumed to sit inside the "datasources"
            # branch - confirm against the original file.
            self._memory_zones = collected
    except Exception:
        # Something is wrong with the data.  If it even reached us, all we
        # can do is assume that the real module has logged and reported
        # an error.  Keep the old set.
        return
def shutdown(self):
    '''Shut down the xfrin process.

    Removes the remote Auth configuration, announces that the module is
    stopping, sets the shutdown event so that running transfer threads
    terminate, and then joins every thread other than the calling one.
    '''
    self._module_cc.remove_remote_config(AUTH_SPECFILE_LOCATION)
    self._module_cc.send_stopping()
    self._shutdown_event.set()
    # current_thread() replaces the deprecated currentThread() alias.
    # Note that this is the *calling* thread, which is the one that must
    # not be joined.
    calling_thread = threading.current_thread()
    for th in threading.enumerate():
        if th is calling_thread:
            continue
        th.join()
def command_handler(self, command, args):
    '''Dispatch a command received over the command channel.

    Handles 'shutdown', 'notify' / REFRESH_FROM_ZONEMGR, and
    'retransfer' / 'refresh'.  Returns an answer message: success by
    default, or an error answer for an unknown command, an unknown zone,
    a notification from an address other than the configured master, or
    any XfrinException raised while handling the command.
    '''
    answer = create_answer(0)
    try:
        if command == 'shutdown':
            self._shutdown_event.set()
        elif command == 'notify' or command == REFRESH_FROM_ZONEMGR:
            # Xfrin receives the refresh/notify command from zone manager.
            # The notify command may carry the notifyfrom address and
            # port.  According to RFC 1996 the zone transfer should start
            # from notifyfrom first, but that is still a TODO.
            # (Using the value now, while only one master address can be
            # configured, would be a security hole.  Once multiple master
            # addresses are supported, we should check that it matches one
            # of them and then use it.)
            (zone_name, rrclass) = self._parse_zone_name_and_class(args)
            zone_str = format_zone_str(zone_name, rrclass)
            zone_info = self._get_zone_info(zone_name, rrclass)
            notify_addr = self._parse_master_and_port(args, zone_name,
                                                      rrclass)
            if zone_info is None:
                # TODO what to do? no info known about zone. defaults?
                errmsg = "Got notification to retransfer unknown zone " + zone_str
                logger.info(XFRIN_RETRANSFER_UNKNOWN_ZONE, zone_str)
                answer = create_answer(1, errmsg)
            else:
                request_type = RRType.AXFR()
                if zone_info.use_ixfr:
                    request_type = RRType.IXFR()
                master_addr = zone_info.get_master_addr_info()
                # Only accept the notification if it names the configured
                # master (same address family and same address/port).
                if notify_addr[0] == master_addr[0] and\
                   notify_addr[2] == master_addr[2]:
                    ret = self.xfrin_start(zone_name,
                                           rrclass,
                                           self._get_db_file(),
                                           master_addr,
                                           zone_info.tsig_key, request_type,
                                           True)
                    answer = create_answer(ret[0], ret[1])
                else:
                    notify_addr_str = format_addrinfo(notify_addr)
                    master_addr_str = format_addrinfo(master_addr)
                    # The leading space in " from" was missing, which
                    # glued the zone string to the following word.
                    errmsg = "Got notification for " + zone_str\
                        + " from unknown address: " + notify_addr_str
                    logger.info(XFRIN_NOTIFY_UNKNOWN_MASTER, zone_str,
                                notify_addr_str, master_addr_str)
                    answer = create_answer(1, errmsg)

        elif command == 'retransfer' or command == 'refresh':
            # Xfrin receives the retransfer/refresh from cmdctl (sent by
            # bindctl).  If the command specifies a master address the
            # transfer is done from it, otherwise from the configured one.
            (zone_name, rrclass) = self._parse_zone_name_and_class(args)
            master_addr = self._parse_master_and_port(args, zone_name,
                                                      rrclass)
            zone_info = self._get_zone_info(zone_name, rrclass)
            tsig_key = None
            request_type = RRType.AXFR()
            if zone_info:
                tsig_key = zone_info.tsig_key
                if zone_info.use_ixfr:
                    request_type = RRType.IXFR()
            db_file = args.get('db_file') or self._get_db_file()
            # 'retransfer' skips the SOA check, 'refresh' performs it.
            ret = self.xfrin_start(zone_name,
                                   rrclass,
                                   db_file,
                                   master_addr,
                                   tsig_key, request_type,
                                   (False if command == 'retransfer' else True))
            answer = create_answer(ret[0], ret[1])

        else:
            answer = create_answer(1, 'unknown command: ' + command)
    except XfrinException as err:
        logger.error(XFRIN_COMMAND_ERROR, command, str(err))
        answer = create_answer(1, str(err))
    return answer
def _parse_zone_name_and_class(self, args):
    '''Extract the zone name and class from command arguments.

    Returns a (name, class) tuple as produced by _check_zone_name() and
    _check_zone_class().  Raises XfrinException when 'zone_name' is
    missing from args.
    '''
    zone_name_str = args.get('zone_name')
    if zone_name_str is None:
        raise XfrinException('zone name should be provided')

    zone_name = _check_zone_name(zone_name_str)
    zone_class = _check_zone_class(args.get('zone_class'))
    return (zone_name, zone_class)
def _parse_master_and_port(self, args, zone_name, zone_class):
    """
    Return tuple (family, socktype, sockaddr) for address and port in given
    args dict.

    A value missing from args is taken from the zone's configuration if
    there is one.  Without configuration a missing master address raises
    XfrinException, while a missing port falls back to
    DEFAULT_MASTER_PORT.  Unparsable values raise XfrinException.

    IPv4 and IPv6 are the only supported addresses now, so sockaddr will be
    (address, port). The socktype is socket.SOCK_STREAM for now.
    """
    # Configured info about this zone, used when the master or the port
    # is not specified in args.
    zone_info = self._get_zone_info(zone_name, zone_class)

    addr_str = args.get('master')
    if addr_str is not None:
        try:
            addr = isc.net.parse.addr_parse(addr_str)
        except ValueError as err:
            raise XfrinException("failed to resolve master address %s: %s" %
                                 (addr_str, str(err)))
    elif zone_info is not None:
        addr = zone_info.master_addr
    else:
        raise XfrinException("Master address not given or "
                             "configured for " + zone_name.to_text())

    port_str = args.get('port')
    if port_str is not None:
        try:
            port = isc.net.parse.port_parse(port_str)
        except ValueError as err:
            raise XfrinException("failed to parse port=%s: %s" %
                                 (port_str, str(err)))
    elif zone_info is not None:
        port = zone_info.master_port
    else:
        port = DEFAULT_MASTER_PORT

    return (addr.family, socket.SOCK_STREAM, (str(addr), port))
- def _get_db_file(self):
- return self._db_file
def _set_db_file(self):
    '''Fetch Auth's "database_file" setting and store it in self._db_file.

    When the setting is still the default and B10_FROM_BUILD is set in
    the environment, the database inside that build tree is used instead.
    '''
    db_file, is_default = self._module_cc.get_remote_config_value(
        AUTH_MODULE_NAME, "database_file")
    if is_default and "B10_FROM_BUILD" in os.environ:
        # Override the local database setting if it is the default and we
        # are running from the source tree.
        # (This should be hidden inside the data source library and/or be
        # done as configuration, and this special case should go away.)
        db_file = os.sep.join([os.environ["B10_FROM_BUILD"],
                               "bind10_zones.sqlite3"])
    self._db_file = db_file
def publish_xfrin_news(self, zone_name, zone_class, xfr_result):
    '''Report the result of a transfer to the other modules.

    On success, auth is asked to reload the zone where applicable and
    the zone_new_data_ready command is sent to xfrout and then to the
    zone manager.  On failure only the zone manager is told, so that it
    can reset the refresh timer for that zone.

    A timeout while waiting for an answer is ignored; a socket error
    while sending is logged.
    '''
    def send_and_wait(msg, module_name):
        # Send msg to module_name and wait for the matching answer.
        seq = self._send_cc_session.group_sendmsg(msg, module_name)
        try:
            answer, env = self._send_cc_session.group_recvmsg(False, seq)
        except isc.cc.session.SessionTimeout:
            pass        # for now we just ignore the failure

    param = {'zone_name': zone_name.to_text(),
             'zone_class': zone_class.to_text()}

    if xfr_result != XFRIN_OK:
        msg = create_command(ZONE_XFRIN_FAILED, param)
        # Catch the exception, in case msgq has been killed.
        try:
            send_and_wait(msg, ZONE_MANAGER_MODULE_NAME)
        except socket.error as err:
            logger.error(XFRIN_MSGQ_SEND_ERROR_ZONE_MANAGER, ZONE_MANAGER_MODULE_NAME)
        return

    _do_auth_loadzone(self, zone_name, zone_class)
    msg = create_command(notify_out.ZONE_NEW_DATA_READY_CMD, param)
    # Catch the exception, in case msgq has been killed.
    try:
        send_and_wait(msg, XFROUT_MODULE_NAME)
        send_and_wait(msg, ZONE_MANAGER_MODULE_NAME)
    except socket.error as err:
        logger.error(XFRIN_MSGQ_SEND_ERROR, XFROUT_MODULE_NAME, ZONE_MANAGER_MODULE_NAME)
def startup(self):
    """Log the start-up message and process commands until shutdown.

    ``self._cc_check_command()`` is called over and over; the loop ends
    as soon as ``self._shutdown_event`` is found to be set.
    """
    logger.debug(DBG_PROCESS, XFRIN_STARTED)
    while True:
        if self._shutdown_event.is_set():
            break
        self._cc_check_command()
def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo,
                tsig_key, request_type, check_soa=True):
    """Start a transfer of the given zone in a separate thread.

    Returns a (code, message) tuple.  The code is 1 and no thread is
    started when the 'pydnspp' module is not loaded, when the number of
    transfers counted by the recorder has reached
    ``self._max_transfers_in``, or when the recorder reports a transfer
    already in progress for zone_name.  Otherwise a thread running
    process_xfrin() is started and the code is 0.
    """
    if "pydnspp" not in sys.modules:
        return (1, "xfrin failed, can't load dns message python library: 'pydnspp'")

    recorder = self.recorder

    # check max_transfer_in, else return quota error
    if recorder.count() >= self._max_transfers_in:
        return (1, 'xfrin quota error')

    if recorder.xfrin_in_progress(zone_name):
        return (1, 'zone xfrin is in progress')

    # Positional arguments handed to process_xfrin(), in its order.
    worker_args = (self, recorder, zone_name, rrclass, db_file,
                   self._shutdown_event, master_addrinfo, check_soa,
                   tsig_key, request_type)
    threading.Thread(target=process_xfrin, args=worker_args).start()
    return (0, 'zone xfrin is started')
# The daemon instance created by main(); stays None until then.
xfrind = None


def signal_handler(signal, frame):
    """Shut down the daemon instance, if one exists, then exit with status 0.

    The two parameters are whatever the signal machinery passes to a
    handler; neither is used here.
    """
    daemon = xfrind
    if daemon:
        daemon.shutdown()
    sys.exit(0)
def set_signal_handler():
    """Install signal_handler() as the handler for SIGTERM and SIGINT."""
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal_handler)
def set_cmd_options(parser):
    """Register the command line options on the given option parser.

    Only -v/--verbose is defined: a boolean flag stored as ``verbose``
    which, per its own help text, is obsolete and has no effect.
    """
    verbose_settings = dict(
        dest="verbose",
        action="store_true",
        help="This option is obsolete and has no effect.",
    )
    parser.add_option("-v", "--verbose", **verbose_settings)
def main(xfrin_class, use_signal=True):
    """Run the Xfrin daemon: parse options, create the daemon, start it.

    @param xfrin_class: the class to instantiate as the daemon object.
    This is normally Xfrin, but can be a subclass of it for customization.
    @param use_signal: True if this process should install its signal
    handlers.  This is normally True, but may be disabled when this
    function is called in a testing context.

    The created instance is stored in the module-level ``xfrind``.
    KeyboardInterrupt, session errors and any other Exception are logged
    rather than propagated; afterwards the instance, if there is one, is
    shut down.
    """
    global xfrind

    try:
        option_parser = OptionParser(version=__version__)
        set_cmd_options(option_parser)
        # Parsed for validation and --version/--help handling only;
        # the results are not used any further.
        options, args = option_parser.parse_args()

        if use_signal:
            set_signal_handler()
        xfrind = xfrin_class()
        xfrind.startup()
    except KeyboardInterrupt:
        logger.info(XFRIN_STOPPED_BY_KEYBOARD)
    except isc.cc.session.SessionError as session_err:
        logger.error(XFRIN_CC_SESSION_ERROR, str(session_err))
    except Exception as unexpected:
        logger.error(XFRIN_UNKNOWN_ERROR, str(unexpected))

    if xfrind:
        xfrind.shutdown()
# Script entry point: run the daemon with the regular Xfrin class.
if __name__ == "__main__":
    main(Xfrin)
|