xfrout.py.in 53 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290
  1. #!@PYTHON@
  2. # Copyright (C) 2010-2012 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import threading
  20. import struct
  21. import signal
  22. from isc.datasrc import DataSourceClient, ZoneFinder, ZoneJournalReader
  23. from socketserver import *
  24. import os
  25. from isc.config.ccsession import *
  26. from isc.cc import SessionError, SessionTimeout
  27. from isc.notify import notify_out
  28. import isc.util.process
  29. import socket
  30. import select
  31. import errno
  32. from optparse import OptionParser, OptionValueError
  33. from isc.util import socketserver_mixin
  34. import isc.server_common.tsig_keyring
  35. from isc.log_messages.xfrout_messages import *
  36. isc.log.init("b10-xfrout", buffer=True)
  37. logger = isc.log.Logger("xfrout")
  38. # Pending system-wide debug level definitions, the ones we
  39. # use here are hardcoded for now
  40. DBG_PROCESS = logger.DBGLVL_TRACE_BASIC
  41. DBG_COMMANDS = logger.DBGLVL_TRACE_DETAIL
  42. DBG_XFROUT_TRACE = logger.DBGLVL_TRACE_BASIC
  43. try:
  44. from libutil_io_python import *
  45. from pydnspp import *
  46. except ImportError as e:
  47. # C++ loadable module may not be installed; even so the xfrout process
  48. # must keep running, so we warn about it and move forward.
  49. logger.error(XFROUT_IMPORT, str(e))
  50. from isc.acl.acl import ACCEPT, REJECT, DROP, LoaderError
  51. from isc.acl.dns import REQUEST_LOADER
  52. isc.util.process.rename()
  53. class XfroutConfigError(Exception):
  54. """An exception indicating an error in updating xfrout configuration.
  55. This exception is raised when the xfrout process encouters an error in
  56. handling configuration updates. Not all syntax error can be caught
  57. at the module-CC layer, so xfrout needs to (explicitly or implicitly)
  58. validate the given configuration data itself. When it finds an error
  59. it raises this exception (either directly or by converting an exception
  60. from other modules) as a unified error in configuration.
  61. """
  62. pass
  63. class XfroutSessionError(Exception):
  64. '''An exception raised for some unexpected events during an xfrout session.
  65. '''
  66. pass
  67. def init_paths():
  68. global SPECFILE_PATH
  69. global AUTH_SPECFILE_PATH
  70. global UNIX_SOCKET_FILE
  71. if "B10_FROM_BUILD" in os.environ:
  72. SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
  73. AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
  74. if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
  75. UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
  76. "/auth_xfrout_conn"
  77. else:
  78. UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
  79. else:
  80. PREFIX = "@prefix@"
  81. DATAROOTDIR = "@datarootdir@"
  82. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  83. AUTH_SPECFILE_PATH = SPECFILE_PATH
  84. if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
  85. UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
  86. else:
  87. UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/@PACKAGE_NAME@/auth_xfrout_conn"
  88. init_paths()
  89. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  90. AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
  91. VERBOSE_MODE = False
  92. XFROUT_DNS_HEADER_SIZE = 12 # protocol constant
  93. XFROUT_MAX_MESSAGE_SIZE = 65535 # ditto
  94. # borrowed from xfrin.py @ #1298. We should eventually unify it.
  95. def format_zone_str(zone_name, zone_class):
  96. """Helper function to format a zone name and class as a string of
  97. the form '<name>/<class>'.
  98. Parameters:
  99. zone_name (isc.dns.Name) name to format
  100. zone_class (isc.dns.RRClass) class to format
  101. """
  102. return zone_name.to_text(True) + '/' + str(zone_class)
  103. # borrowed from xfrin.py @ #1298.
  104. def format_addrinfo(addrinfo):
  105. """Helper function to format the addrinfo as a string of the form
  106. <addr>:<port> (for IPv4) or [<addr>]:port (for IPv6). For unix domain
  107. sockets, and unknown address families, it returns a basic string
  108. conversion of the third element of the passed tuple.
  109. Parameters:
  110. addrinfo: a 3-tuple consisting of address family, socket type, and,
  111. depending on the family, either a 2-tuple with the address
  112. and port, or a filename
  113. """
  114. try:
  115. if addrinfo[0] == socket.AF_INET:
  116. return str(addrinfo[2][0]) + ":" + str(addrinfo[2][1])
  117. elif addrinfo[0] == socket.AF_INET6:
  118. return "[" + str(addrinfo[2][0]) + "]:" + str(addrinfo[2][1])
  119. else:
  120. return str(addrinfo[2])
  121. except IndexError:
  122. raise TypeError("addrinfo argument to format_addrinfo() does not "
  123. "appear to be consisting of (family, socktype, (addr, port))")
  124. def get_rrset_len(rrset):
  125. """Returns the wire length of the given RRset"""
  126. bytes = bytearray()
  127. rrset.to_wire(bytes)
  128. return len(bytes)
  129. def get_soa_serial(soa_rdata):
  130. '''Extract the serial field of an SOA RDATA and returns it as an Serial object.
  131. '''
  132. return Serial(int(soa_rdata.to_text().split()[2]))
  133. class XfroutSession():
  134. def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
  135. default_acl, zone_config, client_class=DataSourceClient,
  136. **counters):
  137. self._sock_fd = sock_fd
  138. self._request_data = request_data
  139. self._server = server
  140. self._tsig_key_ring = tsig_key_ring
  141. self._tsig_ctx = None
  142. self._tsig_len = 0
  143. self._remote = remote
  144. self._request_type = None
  145. self._request_typestr = None
  146. self._acl = default_acl
  147. self._zone_config = zone_config
  148. self.ClientClass = client_class # parameterize this for testing
  149. self._soa = None # will be set in _xfrout_setup or in tests
  150. self._jnl_reader = None # will be set to a reader for IXFR
  151. # Extract counter handler from the `counters` argument and add
  152. # it to the class attribute of the name whose prefix is
  153. # '_counter_' '_inc_' or '_dec_'
  154. for (k, v) in counters.items():
  155. if k.find('counter_') == 0 or k.find('inc_') == 0 \
  156. or k.find('dec_') == 0:
  157. setattr(self, "_%s" % k, v)
  158. self._handle()
  159. def create_tsig_ctx(self, tsig_record, tsig_key_ring):
  160. return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(),
  161. tsig_key_ring)
  162. def _handle(self):
  163. ''' Handle a xfrout query, send xfrout response(s).
  164. This is separated from the constructor so that we can override
  165. it from tests.
  166. '''
  167. # Check the xfrout quota. We do both increase/decrease in this
  168. # method so it's clear we always release it once acuired.
  169. quota_ok = self._server.increase_transfers_counter()
  170. ex = None
  171. try:
  172. self.dns_xfrout_start(self._sock_fd, self._request_data, quota_ok)
  173. except Exception as e:
  174. # To avoid resource leak we need catch all possible exceptions
  175. # We log it later to exclude the case where even logger raises
  176. # an exception.
  177. ex = e
  178. # Release any critical resources
  179. if quota_ok:
  180. self._server.decrease_transfers_counter()
  181. self._close_socket()
  182. if ex is not None:
  183. logger.error(XFROUT_HANDLE_QUERY_ERROR, ex)
  184. def _close_socket(self):
  185. '''Simply close the socket via the given FD.
  186. This is a dedicated subroutine of handle() and is sepsarated from it
  187. for the convenience of tests.
  188. '''
  189. os.close(self._sock_fd)
  190. def _check_request_tsig(self, msg, request_data):
  191. ''' If request has a tsig record, perform tsig related checks '''
  192. tsig_record = msg.get_tsig_record()
  193. if tsig_record is not None:
  194. self._tsig_len = tsig_record.get_length()
  195. self._tsig_ctx = self.create_tsig_ctx(tsig_record,
  196. self._tsig_key_ring)
  197. tsig_error = self._tsig_ctx.verify(tsig_record, request_data)
  198. if tsig_error != TSIGError.NOERROR:
  199. return Rcode.NOTAUTH()
  200. return Rcode.NOERROR()
  201. def _parse_query_message(self, mdata):
  202. ''' parse query message to [socket,message]'''
  203. #TODO, need to add parseHeader() in case the message header is invalid
  204. try:
  205. msg = Message(Message.PARSE)
  206. Message.from_wire(msg, mdata)
  207. except Exception as err: # Exception is too broad
  208. logger.error(XFROUT_PARSE_QUERY_ERROR, err)
  209. return Rcode.FORMERR(), None
  210. # TSIG related checks
  211. rcode = self._check_request_tsig(msg, mdata)
  212. if rcode != Rcode.NOERROR():
  213. return rcode, msg
  214. # Make sure the question is valid. This should be ensured by
  215. # the auth server, but since it's far from xfrout itself, we check
  216. # it by ourselves. A viloation would be an internal bug, so we
  217. # raise and stop here rather than returning a FORMERR or SERVFAIL.
  218. if msg.get_rr_count(Message.SECTION_QUESTION) != 1:
  219. raise RuntimeError('Invalid number of question for XFR: ' +
  220. str(msg.get_rr_count(Message.SECTION_QUESTION)))
  221. question = msg.get_question()[0]
  222. # Identify the request type
  223. self._request_type = question.get_type()
  224. if self._request_type == RRType.AXFR():
  225. self._request_typestr = 'AXFR'
  226. elif self._request_type == RRType.IXFR():
  227. self._request_typestr = 'IXFR'
  228. else:
  229. # Likewise, this should be impossible.
  230. raise RuntimeError('Unexpected XFR type: ' +
  231. str(self._request_type))
  232. # ACL checks
  233. zone_name = question.get_name()
  234. zone_class = question.get_class()
  235. acl = self._get_transfer_acl(zone_name, zone_class)
  236. acl_result = acl.execute(
  237. isc.acl.dns.RequestContext(self._remote[2], msg.get_tsig_record()))
  238. if acl_result == DROP:
  239. logger.debug(DBG_XFROUT_TRACE, XFROUT_QUERY_DROPPED,
  240. self._request_type, format_addrinfo(self._remote),
  241. format_zone_str(zone_name, zone_class))
  242. return None, None
  243. elif acl_result == REJECT:
  244. # count rejected Xfr request by each zone name
  245. self._counter_xfrrej(zone_name.to_text())
  246. logger.debug(DBG_XFROUT_TRACE, XFROUT_QUERY_REJECTED,
  247. self._request_type, format_addrinfo(self._remote),
  248. format_zone_str(zone_name, zone_class))
  249. return Rcode.REFUSED(), msg
  250. return rcode, msg
  251. def _get_transfer_acl(self, zone_name, zone_class):
  252. '''Return the ACL that should be applied for a given zone.
  253. The zone is identified by a tuple of name and RR class.
  254. If a per zone configuration for the zone exists and contains
  255. transfer_acl, that ACL will be used; otherwise, the default
  256. ACL will be used.
  257. '''
  258. # Internally zone names are managed in lower cased label characters,
  259. # so we first need to convert the name.
  260. zone_name_lower = Name(zone_name.to_text(), True)
  261. config_key = (zone_class.to_text(), zone_name_lower.to_text())
  262. if config_key in self._zone_config and \
  263. 'transfer_acl' in self._zone_config[config_key]:
  264. return self._zone_config[config_key]['transfer_acl']
  265. return self._acl
  266. def _send_data(self, sock_fd, data):
  267. size = len(data)
  268. total_count = 0
  269. while total_count < size:
  270. count = os.write(sock_fd, data[total_count:])
  271. total_count += count
  272. def _send_message(self, sock_fd, msg, tsig_ctx=None):
  273. render = MessageRenderer()
  274. # As defined in RFC5936 section3.4, perform case-preserving name
  275. # compression for AXFR message.
  276. render.set_compress_mode(MessageRenderer.CASE_SENSITIVE)
  277. render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
  278. # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
  279. # we should remove the if statement and use a universal interface later.
  280. if tsig_ctx is not None:
  281. msg.to_wire(render, tsig_ctx)
  282. else:
  283. msg.to_wire(render)
  284. header_len = struct.pack('H', socket.htons(render.get_length()))
  285. self._send_data(sock_fd, header_len)
  286. self._send_data(sock_fd, render.get_data())
  287. def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
  288. if not msg:
  289. return # query message is invalid. send nothing back.
  290. msg.make_response()
  291. msg.set_rcode(rcode_)
  292. self._send_message(sock_fd, msg, self._tsig_ctx)
  293. def _get_zone_soa(self, zone_name):
  294. '''Retrieve the SOA RR of the given zone.
  295. It returns a pair of RCODE and the SOA (in the form of RRset).
  296. On success RCODE is NOERROR and returned SOA is not None;
  297. on failure RCODE indicates the appropriate code in the context of
  298. xfr processing, and the returned SOA is None.
  299. '''
  300. result, finder = self._datasrc_client.find_zone(zone_name)
  301. if result != DataSourceClient.SUCCESS:
  302. return (Rcode.NOTAUTH(), None)
  303. result, soa_rrset, _ = finder.find(zone_name, RRType.SOA())
  304. if result != ZoneFinder.SUCCESS:
  305. return (Rcode.SERVFAIL(), None)
  306. # Especially for database-based zones, a working zone may be in
  307. # a broken state where it has more than one SOA RR. We proactively
  308. # check the condition and abort the xfr attempt if we identify it.
  309. if soa_rrset.get_rdata_count() != 1:
  310. return (Rcode.SERVFAIL(), None)
  311. return (Rcode.NOERROR(), soa_rrset)
  312. def __axfr_setup(self, zone_name):
  313. '''Setup a zone iterator for AXFR or AXFR-style IXFR.
  314. '''
  315. try:
  316. # Note that we enable 'separate_rrs'. In xfr-out we need to
  317. # preserve as many things as possible (even if it's half broken)
  318. # stored in the zone.
  319. self._iterator = self._datasrc_client.get_iterator(zone_name,
  320. True)
  321. except isc.datasrc.Error:
  322. # If the current name server does not have authority for the
  323. # zone, xfrout can't serve for it, return rcode NOTAUTH.
  324. # Note: this exception can happen for other reasons. We should
  325. # update get_iterator() API so that we can distinguish "no such
  326. # zone" and other cases (#1373). For now we consider all these
  327. # cases as NOTAUTH.
  328. return Rcode.NOTAUTH()
  329. # If we are an authoritative name server for the zone, but fail
  330. # to find the zone's SOA record in datasource, xfrout can't
  331. # provide zone transfer for it.
  332. self._soa = self._iterator.get_soa()
  333. if self._soa is None or self._soa.get_rdata_count() != 1:
  334. return Rcode.SERVFAIL()
  335. return Rcode.NOERROR()
  336. def __ixfr_setup(self, request_msg, zone_name, zone_class):
  337. '''Setup a zone journal reader for IXFR.
  338. If the underlying data source does not know the requested range
  339. of zone differences it automatically falls back to AXFR-style
  340. IXFR by setting up a zone iterator instead of a journal reader.
  341. '''
  342. # Check the authority section. Look for a SOA record with
  343. # the same name and class as the question.
  344. remote_soa = None
  345. for auth_rrset in request_msg.get_section(Message.SECTION_AUTHORITY):
  346. # Ignore data whose owner name is not the zone apex, and
  347. # ignore non-SOA or different class of records.
  348. if auth_rrset.get_name() != zone_name or \
  349. auth_rrset.get_type() != RRType.SOA() or \
  350. auth_rrset.get_class() != zone_class:
  351. continue
  352. if auth_rrset.get_rdata_count() != 1:
  353. logger.info(XFROUT_IXFR_MULTIPLE_SOA,
  354. format_addrinfo(self._remote))
  355. return Rcode.FORMERR()
  356. remote_soa = auth_rrset
  357. if remote_soa is None:
  358. logger.info(XFROUT_IXFR_NO_SOA, format_addrinfo(self._remote))
  359. return Rcode.FORMERR()
  360. # Retrieve the local SOA
  361. rcode, self._soa = self._get_zone_soa(zone_name)
  362. if rcode != Rcode.NOERROR():
  363. return rcode
  364. # RFC1995 says "If an IXFR query with the same or newer version
  365. # number than that of the server is received, it is replied to with
  366. # a single SOA record of the server's current version, just as
  367. # in AXFR". The claim about AXFR is incorrect, but other than that,
  368. # we do as the RFC says.
  369. begin_serial = get_soa_serial(remote_soa.get_rdata()[0])
  370. end_serial = get_soa_serial(self._soa.get_rdata()[0])
  371. if begin_serial >= end_serial:
  372. # clear both iterator and jnl_reader to signal we won't do
  373. # iteration in response generation
  374. self._iterator = None
  375. self._jnl_reader = None
  376. logger.info(XFROUT_IXFR_UPTODATE, format_addrinfo(self._remote),
  377. format_zone_str(zone_name, zone_class),
  378. begin_serial, end_serial)
  379. return Rcode.NOERROR()
  380. # Set up the journal reader or fall back to AXFR-style IXFR
  381. try:
  382. code, self._jnl_reader = self._datasrc_client.get_journal_reader(
  383. zone_name, begin_serial.get_value(), end_serial.get_value())
  384. except isc.datasrc.NotImplemented as ex:
  385. # The underlying data source doesn't support journaling.
  386. # Fall back to AXFR-style IXFR.
  387. logger.info(XFROUT_IXFR_NO_JOURNAL_SUPPORT,
  388. format_addrinfo(self._remote),
  389. format_zone_str(zone_name, zone_class))
  390. return self.__axfr_setup(zone_name)
  391. if code == ZoneJournalReader.NO_SUCH_VERSION:
  392. logger.info(XFROUT_IXFR_NO_VERSION, format_addrinfo(self._remote),
  393. format_zone_str(zone_name, zone_class),
  394. begin_serial, end_serial)
  395. return self.__axfr_setup(zone_name)
  396. if code == ZoneJournalReader.NO_SUCH_ZONE:
  397. # this is quite unexpected as we know zone's SOA exists.
  398. # It might be a bug or the data source is somehow broken,
  399. # but it can still happen if someone has removed the zone
  400. # between these two operations. We treat it as NOTAUTH.
  401. logger.warn(XFROUT_IXFR_NO_ZONE, format_addrinfo(self._remote),
  402. format_zone_str(zone_name, zone_class))
  403. return Rcode.NOTAUTH()
  404. # Use the reader as the iterator to generate the response.
  405. self._iterator = self._jnl_reader
  406. return Rcode.NOERROR()
  407. def _xfrout_setup(self, request_msg, zone_name, zone_class):
  408. '''Setup a context for xfr responses according to the request type.
  409. This method identifies the most appropriate data source for the
  410. request and set up a zone iterator or journal reader depending on
  411. whether the request is AXFR or IXFR. If it identifies any protocol
  412. level error it returns an RCODE other than NOERROR.
  413. '''
  414. # Identify the data source for the requested zone and see if it has
  415. # SOA while initializing objects used for request processing later.
  416. # We should eventually generalize this so that we can choose the
  417. # appropriate data source from (possible) multiple candidates.
  418. # We should eventually take into account the RR class here.
  419. # For now, we hardcode a particular type (SQLite3-based), and only
  420. # consider that one.
  421. datasrc_config = '{ "database_file": "' + \
  422. self._server.get_db_file() + '"}'
  423. self._datasrc_client = self.ClientClass('sqlite3', datasrc_config)
  424. if self._request_type == RRType.AXFR():
  425. return self.__axfr_setup(zone_name)
  426. else:
  427. return self.__ixfr_setup(request_msg, zone_name, zone_class)
  428. def dns_xfrout_start(self, sock_fd, msg_query, quota_ok=True):
  429. rcode_, msg = self._parse_query_message(msg_query)
  430. #TODO. create query message and parse header
  431. if rcode_ is None: # Dropped by ACL
  432. return
  433. elif rcode_ == Rcode.NOTAUTH() or rcode_ == Rcode.REFUSED():
  434. return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
  435. elif rcode_ != Rcode.NOERROR():
  436. return self._reply_query_with_error_rcode(msg, sock_fd,
  437. Rcode.FORMERR())
  438. elif not quota_ok:
  439. logger.warn(XFROUT_QUERY_QUOTA_EXCCEEDED, self._request_typestr,
  440. format_addrinfo(self._remote),
  441. self._server._max_transfers_out)
  442. return self._reply_query_with_error_rcode(msg, sock_fd,
  443. Rcode.REFUSED())
  444. question = msg.get_question()[0]
  445. zone_name = question.get_name()
  446. zone_class = question.get_class()
  447. zone_str = format_zone_str(zone_name, zone_class) # for logging
  448. try:
  449. rcode_ = self._xfrout_setup(msg, zone_name, zone_class)
  450. except Exception as ex:
  451. logger.error(XFROUT_XFR_TRANSFER_CHECK_ERROR, self._request_typestr,
  452. format_addrinfo(self._remote), zone_str, ex)
  453. rcode_ = Rcode.SERVFAIL()
  454. if rcode_ != Rcode.NOERROR():
  455. logger.info(XFROUT_XFR_TRANSFER_FAILED, self._request_typestr,
  456. format_addrinfo(self._remote), zone_str, rcode_)
  457. return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
  458. try:
  459. # increment Xfr starts by RRType
  460. if self._request_type == RRType.AXFR():
  461. self._inc_axfr_running()
  462. else:
  463. self._inc_ixfr_running()
  464. logger.info(XFROUT_XFR_TRANSFER_STARTED, self._request_typestr,
  465. format_addrinfo(self._remote), zone_str)
  466. self._reply_xfrout_query(msg, sock_fd)
  467. except Exception as err:
  468. logger.error(XFROUT_XFR_TRANSFER_ERROR, self._request_typestr,
  469. format_addrinfo(self._remote), zone_str, err)
  470. finally:
  471. # decrement Xfr starts by RRType
  472. if self._request_type == RRType.AXFR():
  473. self._dec_axfr_running()
  474. else:
  475. self._dec_ixfr_running()
  476. # count done Xfr requests by each zone name
  477. self._counter_xfrreqdone(zone_name.to_text())
  478. logger.info(XFROUT_XFR_TRANSFER_DONE, self._request_typestr,
  479. format_addrinfo(self._remote), zone_str)
  480. def _clear_message(self, msg):
  481. qid = msg.get_qid()
  482. opcode = msg.get_opcode()
  483. rcode = msg.get_rcode()
  484. msg.clear(Message.RENDER)
  485. msg.set_qid(qid)
  486. msg.set_opcode(opcode)
  487. msg.set_rcode(rcode)
  488. msg.set_header_flag(Message.HEADERFLAG_AA)
  489. msg.set_header_flag(Message.HEADERFLAG_QR)
  490. return msg
  491. def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa,
  492. message_upper_len):
  493. '''Add the SOA record to the end of message.
  494. If it would exceed the maximum allowable size of a message, a new
  495. message will be created to send out the last SOA.
  496. We assume a message with a single SOA can always fit the buffer
  497. with or without TSIG. In theory this could be wrong if TSIG is
  498. stupidly large, but in practice this assumption should be reasonable.
  499. '''
  500. if message_upper_len + get_rrset_len(rrset_soa) > \
  501. XFROUT_MAX_MESSAGE_SIZE:
  502. self._send_message(sock_fd, msg, self._tsig_ctx)
  503. msg = self._clear_message(msg)
  504. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  505. self._send_message(sock_fd, msg, self._tsig_ctx)
  506. def _reply_xfrout_query(self, msg, sock_fd):
  507. msg.make_response()
  508. msg.set_header_flag(Message.HEADERFLAG_AA)
  509. # Reserved space for the fixed header size, the size of the question
  510. # section, and TSIG size (when included). The size of the question
  511. # section is the sum of the qname length and the size of the
  512. # fixed-length fields (type and class, 2 bytes each).
  513. message_upper_len = XFROUT_DNS_HEADER_SIZE + \
  514. msg.get_question()[0].get_name().get_length() + 4 + \
  515. self._tsig_len
  516. # If the iterator is None, we are responding to IXFR with a single
  517. # SOA RR.
  518. if self._iterator is None:
  519. self._send_message_with_last_soa(msg, sock_fd, self._soa,
  520. message_upper_len)
  521. return
  522. # Add the beginning SOA
  523. msg.add_rrset(Message.SECTION_ANSWER, self._soa)
  524. message_upper_len += get_rrset_len(self._soa)
  525. # Add the rest of the zone/diff contets
  526. for rrset in self._iterator:
  527. # Check if xfrout is shutdown
  528. if self._server._shutdown_event.is_set():
  529. logger.info(XFROUT_STOPPING)
  530. return
  531. # For AXFR (or AXFR-style IXFR), in which case _jnl_reader is None,
  532. # we should skip SOAs from the iterator.
  533. if self._jnl_reader is None and rrset.get_type() == RRType.SOA():
  534. continue
  535. # We calculate the maximum size of the RRset (i.e. the
  536. # size without compression) and use that to see if we
  537. # may have reached the limit
  538. rrset_len = get_rrset_len(rrset)
  539. if message_upper_len + rrset_len <= XFROUT_MAX_MESSAGE_SIZE:
  540. msg.add_rrset(Message.SECTION_ANSWER, rrset)
  541. message_upper_len += rrset_len
  542. continue
  543. # RR would not fit. If there are other RRs in the buffer, send
  544. # them now and leave this RR to the next message.
  545. self._send_message(sock_fd, msg, self._tsig_ctx)
  546. # Create a new message and reserve space for the carried-over
  547. # RR (and TSIG space in case it's to be TSIG signed)
  548. msg = self._clear_message(msg)
  549. message_upper_len = XFROUT_DNS_HEADER_SIZE + rrset_len + \
  550. self._tsig_len
  551. # If this RR overflows the buffer all by itself, fail. In theory
  552. # some RRs might fit in a TCP message when compressed even if they
  553. # do not fit when uncompressed, but surely we don't want to send
  554. # such monstrosities to an unsuspecting slave.
  555. if message_upper_len > XFROUT_MAX_MESSAGE_SIZE:
  556. raise XfroutSessionError('RR too large for zone transfer (' +
  557. str(rrset_len) + ' bytes)')
  558. # Add the RRset to the new message
  559. msg.add_rrset(Message.SECTION_ANSWER, rrset)
  560. # Add and send the trailing SOA
  561. self._send_message_with_last_soa(msg, sock_fd, self._soa,
  562. message_upper_len)
  563. class UnixSockServer(socketserver_mixin.NoPollMixIn,
  564. ThreadingUnixStreamServer):
  565. '''The unix domain socket server which accept xfr query sent from auth server.'''
  566. def __init__(self, sock_file, handle_class, shutdown_event, config_data,
  567. cc, **counters):
  568. self._remove_unused_sock_file(sock_file)
  569. self._sock_file = sock_file
  570. socketserver_mixin.NoPollMixIn.__init__(self)
  571. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  572. self._shutdown_event = shutdown_event
  573. self._write_sock, self._read_sock = socket.socketpair()
  574. self._common_init()
  575. self._cc = cc
  576. self.update_config_data(config_data)
  577. # handlers for statistics use
  578. self._counters = counters
  579. def _common_init(self):
  580. '''Initialization shared with the mock server class used for tests'''
  581. self._lock = threading.Lock()
  582. self._transfers_counter = 0
  583. self._zone_config = {}
  584. self._acl = None # this will be initialized in update_config_data()
  585. def _receive_query_message(self, sock):
  586. ''' receive request message from sock'''
  587. # receive data length
  588. data_len = sock.recv(2)
  589. if not data_len:
  590. return None
  591. msg_len = struct.unpack('!H', data_len)[0]
  592. # receive data
  593. recv_size = 0
  594. msgdata = b''
  595. while recv_size < msg_len:
  596. data = sock.recv(msg_len - recv_size)
  597. if not data:
  598. return None
  599. recv_size += len(data)
  600. msgdata += data
  601. return msgdata
  602. def handle_request(self):
  603. ''' Enable server handle a request until shutdown or auth is closed.'''
  604. try:
  605. request, client_address = self.get_request()
  606. except socket.error:
  607. logger.error(XFROUT_FETCH_REQUEST_ERROR)
  608. return
  609. self._select_loop(request)
  610. def _select_loop(self, request_sock):
  611. '''Main loop for a single session between xfrout and auth.
  612. This is a dedicated subroutine of handle_request(), but is defined
  613. as a separate "protected" method for the convenience of tests.
  614. '''
  615. # Check self._shutdown_event to ensure the real shutdown comes.
  616. # Linux could trigger a spurious readable event on the _read_sock
  617. # due to a bug, so we need perform a double check.
  618. while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
  619. try:
  620. (rlist, wlist, xlist) = select.select([self._read_sock,
  621. request_sock], [], [])
  622. except select.error as e:
  623. if e.args[0] == errno.EINTR:
  624. (rlist, wlist, xlist) = ([], [], [])
  625. continue
  626. else:
  627. logger.error(XFROUT_SOCKET_SELECT_ERROR, e)
  628. break
  629. # self.server._shutdown_event will be set by now, if it is not a
  630. # false alarm
  631. if self._read_sock in rlist:
  632. continue
  633. try:
  634. if not self.process_request(request_sock):
  635. break
  636. except Exception as pre:
  637. logger.error(XFROUT_PROCESS_REQUEST_ERROR, pre)
  638. break
  639. def _handle_request_noblock(self):
  640. """Override the function _handle_request_noblock(), it creates a new
  641. thread to handle requests for each auth"""
  642. td = threading.Thread(target=self.handle_request)
  643. td.setDaemon(True)
  644. td.start()
  645. def process_request(self, request):
  646. """Receive socket fd and query message from auth, then
  647. start a new thread to process the request.
  648. Return: True if everything is okay; otherwise False, in which case
  649. the calling thread will terminate.
  650. """
  651. sock_fd = recv_fd(request.fileno())
  652. if sock_fd < 0:
  653. logger.warn(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
  654. return False
  655. # receive request msg. If it fails we simply terminate the thread;
  656. # it might be possible to recover from this state, but it's more likely
  657. # that auth and xfrout are in inconsistent states. So it will make
  658. # more sense to restart in a new session.
  659. request_data = self._receive_query_message(request)
  660. if request_data is None:
  661. # The specific exception type doesn't matter so we use session
  662. # error.
  663. raise XfroutSessionError('Failed to get complete xfr request')
  664. t = threading.Thread(target=self.finish_request,
  665. args=(sock_fd, request_data))
  666. if self.daemon_threads:
  667. t.daemon = True
  668. t.start()
  669. return True
  670. def _guess_remote(self, sock_fd):
  671. """Guess remote address and port of the socket.
  672. The sock_fd must be a file descriptor of a socket.
  673. This method retuns a 3-tuple consisting of address family,
  674. socket type, and a 2-tuple with the address (string) and port (int).
  675. """
  676. # This uses a trick. If the socket is IPv4 in reality and we pretend
  677. # it to be IPv6, it returns IPv4 address anyway. This doesn't seem
  678. # to care about the SOCK_STREAM parameter at all (which it really is,
  679. # except for testing)
  680. if socket.has_ipv6:
  681. sock_domain = socket.AF_INET6
  682. else:
  683. # To make it work even on hosts without IPv6 support
  684. # (Any idea how to simulate this in test?)
  685. sock_domain = socket.AF_INET
  686. sock = socket.fromfd(sock_fd, sock_domain, socket.SOCK_STREAM)
  687. peer = sock.getpeername()
  688. sock.close()
  689. # Identify the correct socket family. Due to the above "trick",
  690. # we cannot simply use sock.family.
  691. family = socket.AF_INET6
  692. try:
  693. socket.inet_pton(socket.AF_INET6, peer[0])
  694. except socket.error:
  695. family = socket.AF_INET
  696. return (family, socket.SOCK_STREAM, peer)
  697. def finish_request(self, sock_fd, request_data):
  698. '''Finish one request by instantiating RequestHandlerClass.
  699. This is an entry point of a separate thread spawned in
  700. UnixSockServer.process_request().
  701. This method creates a XfroutSession object.
  702. '''
  703. self._lock.acquire()
  704. acl = self._acl
  705. zone_config = self._zone_config
  706. self._lock.release()
  707. self.RequestHandlerClass(sock_fd, request_data, self,
  708. isc.server_common.tsig_keyring.get_keyring(),
  709. self._guess_remote(sock_fd), acl, zone_config,
  710. **self._counters)
  711. def _remove_unused_sock_file(self, sock_file):
  712. '''Try to remove the socket file. If the file is being used
  713. by one running xfrout process, exit from python.
  714. If it's not a socket file or nobody is listening
  715. , it will be removed. If it can't be removed, exit from python. '''
  716. if self._sock_file_in_use(sock_file):
  717. logger.error(XFROUT_UNIX_SOCKET_FILE_IN_USE, sock_file)
  718. sys.exit(0)
  719. else:
  720. if not os.path.exists(sock_file):
  721. return
  722. try:
  723. os.unlink(sock_file)
  724. except OSError as err:
  725. logger.error(XFROUT_REMOVE_OLD_UNIX_SOCKET_FILE_ERROR, sock_file, str(err))
  726. sys.exit(0)
  727. def _sock_file_in_use(self, sock_file):
  728. '''Check whether the socket file 'sock_file' exists and
  729. is being used by one running xfrout process. If it is,
  730. return True, or else return False. '''
  731. try:
  732. sock = socket.socket(socket.AF_UNIX)
  733. sock.connect(sock_file)
  734. except socket.error as err:
  735. sock.close()
  736. return False
  737. else:
  738. sock.close()
  739. return True
  740. def shutdown(self):
  741. self._write_sock.send(b"shutdown") #terminate the xfrout session thread
  742. super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
  743. try:
  744. os.unlink(self._sock_file)
  745. except Exception as e:
  746. logger.error(XFROUT_REMOVE_UNIX_SOCKET_FILE_ERROR, self._sock_file, str(e))
  747. def update_config_data(self, new_config):
  748. '''Apply the new config setting of xfrout module.
  749. '''
  750. self._lock.acquire()
  751. try:
  752. logger.info(XFROUT_NEW_CONFIG)
  753. new_acl = self._acl
  754. if 'transfer_acl' in new_config:
  755. try:
  756. new_acl = REQUEST_LOADER.load(new_config['transfer_acl'])
  757. except LoaderError as e:
  758. raise XfroutConfigError('Failed to parse transfer_acl: ' +
  759. str(e))
  760. new_zone_config = self._zone_config
  761. zconfig_data = new_config.get('zone_config')
  762. if zconfig_data is not None:
  763. new_zone_config = self.__create_zone_config(zconfig_data)
  764. self._acl = new_acl
  765. self._zone_config = new_zone_config
  766. self._max_transfers_out = new_config.get('transfers_out')
  767. except Exception as e:
  768. self._lock.release()
  769. raise e
  770. self._lock.release()
  771. logger.info(XFROUT_NEW_CONFIG_DONE)
  772. def __create_zone_config(self, zone_config_list):
  773. new_config = {}
  774. for zconf in zone_config_list:
  775. # convert the class, origin (name) pair. First build pydnspp
  776. # object to reject invalid input.
  777. zclass_str = zconf.get('class')
  778. if zclass_str is None:
  779. #zclass_str = 'IN' # temporary
  780. zclass_str = self._cc.get_default_value('zone_config/class')
  781. zclass = RRClass(zclass_str)
  782. zorigin = Name(zconf['origin'], True)
  783. config_key = (zclass.to_text(), zorigin.to_text())
  784. # reject duplicate config
  785. if config_key in new_config:
  786. raise XfroutConfigError('Duplicate zone_config for ' +
  787. str(zorigin) + '/' + str(zclass))
  788. # create a new config entry, build any given (and known) config
  789. new_config[config_key] = {}
  790. if 'transfer_acl' in zconf:
  791. try:
  792. new_config[config_key]['transfer_acl'] = \
  793. REQUEST_LOADER.load(zconf['transfer_acl'])
  794. except LoaderError as e:
  795. raise XfroutConfigError('Failed to parse transfer_acl ' +
  796. 'for ' + zorigin.to_text() + '/' +
  797. zclass_str + ': ' + str(e))
  798. return new_config
  799. def get_db_file(self):
  800. file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
  801. # this too should be unnecessary, but currently the
  802. # 'from build' override isn't stored in the config
  803. # (and we don't have indirect python access to datasources yet)
  804. if is_default and "B10_FROM_BUILD" in os.environ:
  805. file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
  806. return file
  807. def increase_transfers_counter(self):
  808. '''Return False, if counter + 1 > max_transfers_out, or else
  809. return True
  810. '''
  811. ret = False
  812. self._lock.acquire()
  813. if self._transfers_counter < self._max_transfers_out:
  814. self._transfers_counter += 1
  815. ret = True
  816. self._lock.release()
  817. return ret
  818. def decrease_transfers_counter(self):
  819. self._lock.acquire()
  820. self._transfers_counter -= 1
  821. self._lock.release()
  822. class XfroutCounter:
  823. """A class for handling all statistics counters of Xfrout. In
  824. this class, the structure of per-zone counters is assumed to be
  825. like this:
  826. zones/example.com./notifyoutv4
  827. zones/example.com./notifyoutv6
  828. zones/example.com./xfrrej
  829. zones/example.com./xfrreqdone
  830. ixfr_running
  831. axfr_running
  832. """
  833. # '_SERVER_' is a special zone name representing an entire
  834. # count. It doesn't mean a specific zone, but it means an
  835. # entire count in the server.
  836. entire_server = '_SERVER_'
  837. # zone names are contained under this dirname in the spec file.
  838. perzone_prefix = 'zones'
  839. def __init__(self, statistics_spec):
  840. self._statistics_spec = statistics_spec
  841. # holding statistics data for Xfrout module
  842. self._statistics_data = {}
  843. self._counters_for_xfroutsession = {}
  844. self._counters_for_notifyout = {}
  845. self._xfrrunning_names = [
  846. n for n in isc.config.spec_name_list\
  847. (self._statistics_spec) \
  848. if n.find('xfr_running') == 1 ]
  849. self._lock = threading.RLock()
  850. self._create_perzone_incrementers()
  851. self._create_xfrrunning_incdecrementers()
  852. def get_statistics(self):
  853. """Calculates an entire server counts, and returns statistics
  854. data format to send out the stats module including each
  855. counter. If there is no counts, then it returns an empty
  856. dictionary. Locks the thread because it is considered to be
  857. invoked by a multi-threading caller."""
  858. # If self._statistics_data contains nothing of zone name, it
  859. # returns an empty dict.
  860. if len(self._statistics_data) == 0: return {}
  861. # for per-zone counter
  862. zones = {}
  863. zones = self._statistics_data[self.perzone_prefix]
  864. # Start calculation for '_SERVER_' counts
  865. attrs = self._get_default_statistics_data()[self.perzone_prefix][self.entire_server]
  866. statistics_data = {self.perzone_prefix: {}}
  867. for attr in attrs:
  868. sum_ = 0
  869. for name in zones:
  870. if name == self.entire_server: continue
  871. if attr in zones[name]:
  872. if name not in statistics_data[self.perzone_prefix]:
  873. statistics_data[self.perzone_prefix][name] = {}
  874. statistics_data[self.perzone_prefix][name].update(
  875. {attr: zones[name][attr]}
  876. )
  877. sum_ += zones[name][attr]
  878. if sum_ > 0:
  879. if self.entire_server not in statistics_data[self.perzone_prefix]:
  880. statistics_data[self.perzone_prefix][self.entire_server] = {}
  881. statistics_data[self.perzone_prefix][self.entire_server]\
  882. .update({attr:sum_})
  883. # for xfrrunning incrementer/decrementer
  884. for name in self._xfrrunning_names:
  885. if name in self._statistics_data:
  886. statistics_data[name] = self._statistics_data[name]
  887. return statistics_data
  888. def _get_default_statistics_data(self):
  889. """Returns default statistics data from the spec file"""
  890. statistics_data = {}
  891. for id_ in isc.config.spec_name_list(self._statistics_spec):
  892. spec = isc.config.find_spec_part(self._statistics_spec, id_)
  893. statistics_data.update({id_: spec['item_default']})
  894. return statistics_data
  895. def _create_perzone_incrementers(self):
  896. """Creates increment method of each per-zone counter based on
  897. the spec file. Incrementer can be accessed by name
  898. "inc_${item_name}".Incrementers are passed to the
  899. XfroutSession and NotifyOut class as counter handlers."""
  900. # add a new element under the named_set item for the zone
  901. zones_spec = isc.config.find_spec_part(
  902. self._statistics_spec, self.perzone_prefix)
  903. item_list = isc.config.spec_name_list(\
  904. zones_spec['named_set_item_spec']['map_item_spec'])
  905. # can be accessed by the name 'inc_xxx'
  906. for item in item_list:
  907. def __perzone_incrementer(zone_name, counter_name=item, step=1):
  908. """A per-zone incrementer for counter_name. Locks the thread
  909. because it is considered to be invoked by a multi-threading
  910. caller."""
  911. with self._lock:
  912. self._add_perzone_counter(zone_name)
  913. self._statistics_data[self.perzone_prefix][zone_name][counter_name] += step
  914. if 'notifyout' in item:
  915. self._counters_for_notifyout['counter_%s' % item] \
  916. = __perzone_incrementer
  917. else:
  918. self._counters_for_xfroutsession['counter_%s' % item] \
  919. = __perzone_incrementer
  920. def _create_xfrrunning_incdecrementers(self):
  921. """Creates increment/decrement method of (a|i)xfr_running
  922. based on the spec file. Incrementer can be accessed by name
  923. "inc_${item_name}". Decrementer can be accessed by name
  924. "dec_${item_name}". Both of them are passed to the
  925. XfroutSession as counter handlers."""
  926. # can be accessed by the name 'inc_xxx' or 'dec_xxx'
  927. for item in self._xfrrunning_names:
  928. def __xfrrunning_incrementer(counter_name=item, step=1):
  929. """A incrementer for axfr or ixfr running. Locks the thread
  930. because it is considered to be invoked by a multi-threading
  931. caller."""
  932. with self._lock:
  933. self._add_xfrrunning_counter(counter_name)
  934. self._statistics_data[counter_name] += step
  935. def __xfrrunning_decrementer(counter_name=item, step=-1):
  936. """A decrementer for axfr or ixfr running. Locks the thread
  937. because it is considered to be invoked by a multi-threading
  938. caller."""
  939. with self._lock:
  940. self._statistics_data[counter_name] += step
  941. self._counters_for_xfroutsession['inc_%s' % item] \
  942. = __xfrrunning_incrementer
  943. self._counters_for_xfroutsession['dec_%s' % item] \
  944. = __xfrrunning_decrementer
  945. def get_counters_for_xfroutsession(self):
  946. """Returns counters, incrementers, and decrementers to be
  947. passed to XfroutSession/UnixSockServer class."""
  948. return self._counters_for_xfroutsession
  949. def get_counters_for_notifyout(self):
  950. """Returns counters handlers to be passed to NotifyOut
  951. class."""
  952. return self._counters_for_notifyout
  953. def _add_perzone_counter(self, zone):
  954. """Adds a named_set-type counter for each zone name."""
  955. try:
  956. self._statistics_data[self.perzone_prefix][zone]
  957. except KeyError:
  958. # add a new element under the named_set item for the zone
  959. map_spec = isc.config.find_spec_part(
  960. self._statistics_spec, '%s/%s' % \
  961. (self.perzone_prefix, zone))['map_item_spec']
  962. id_list = isc.config.spec_name_list(map_spec)
  963. for id_ in id_list:
  964. spec = isc.config.find_spec_part(map_spec, id_)
  965. isc.cc.data.set(self._statistics_data,
  966. '%s/%s/%s' % \
  967. (self.perzone_prefix, zone, id_),
  968. spec['item_default'])
  969. def _add_xfrrunning_counter(self, counter_name):
  970. """Adds a counter for counting (a|i)xfr_running"""
  971. try:
  972. self._statistics_data[counter_name]
  973. except KeyError:
  974. # examines the names of xfer running
  975. for n in self._xfrrunning_names:
  976. spec = isc.config.find_spec_part(self._statistics_spec, n)
  977. isc.cc.data.set(self._statistics_data, n, \
  978. spec['item_default'])
  979. class XfroutServer:
  980. def __init__(self):
  981. self._unix_socket_server = None
  982. self._listen_sock_file = UNIX_SOCKET_FILE
  983. self._shutdown_event = threading.Event()
  984. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
  985. self._config_data = self._cc.get_full_config()
  986. self._counter = XfroutCounter(
  987. self._cc.get_module_spec().get_statistics_spec())
  988. self._cc.start()
  989. self._cc.add_remote_config(AUTH_SPECFILE_LOCATION)
  990. isc.server_common.tsig_keyring.init_keyring(self._cc)
  991. self._start_xfr_query_listener()
  992. self._start_notifier()
  993. def _start_xfr_query_listener(self):
  994. '''Start a new thread to accept xfr query. '''
  995. self._unix_socket_server = UnixSockServer(
  996. self._listen_sock_file,
  997. XfroutSession,
  998. self._shutdown_event,
  999. self._config_data,
  1000. self._cc,
  1001. **self._counter.get_counters_for_xfroutsession()
  1002. )
  1003. listener = threading.Thread(target=self._unix_socket_server.serve_forever)
  1004. listener.start()
  1005. def _start_notifier(self):
  1006. datasrc = self._unix_socket_server.get_db_file()
  1007. self._notifier = notify_out.NotifyOut(
  1008. datasrc,
  1009. **self._counter.get_counters_for_notifyout()
  1010. )
  1011. if 'also_notify' in self._config_data:
  1012. for slave in self._config_data['also_notify']:
  1013. self._notifier.add_slave(slave['address'], slave['port'])
  1014. self._notifier.dispatcher()
  1015. def send_notify(self, zone_name, zone_class):
  1016. return self._notifier.send_notify(zone_name, zone_class)
  1017. def config_handler(self, new_config):
  1018. '''Update config data. TODO. Do error check'''
  1019. answer = create_answer(0)
  1020. for key in new_config:
  1021. if key not in self._config_data:
  1022. answer = create_answer(1, "Unknown config data: " + str(key))
  1023. continue
  1024. self._config_data[key] = new_config[key]
  1025. if self._unix_socket_server:
  1026. try:
  1027. self._unix_socket_server.update_config_data(self._config_data)
  1028. except Exception as e:
  1029. answer = create_answer(1,
  1030. "Failed to handle new configuration: " +
  1031. str(e))
  1032. return answer
  1033. def shutdown(self):
  1034. ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
  1035. terminated.
  1036. '''
  1037. global xfrout_server
  1038. xfrout_server = None #Avoid shutdown is called twice
  1039. self._cc.send_stopping()
  1040. self._shutdown_event.set()
  1041. self._notifier.shutdown()
  1042. if self._unix_socket_server:
  1043. self._unix_socket_server.shutdown()
  1044. self._wait_for_threads()
  1045. def _wait_for_threads(self):
  1046. # Wait for all threads to terminate. this is a call that is only used
  1047. # in shutdown(), but it has its own method, so we can test shutdown
  1048. # without involving thread operations (the test would override this
  1049. # method)
  1050. main_thread = threading.currentThread()
  1051. for th in threading.enumerate():
  1052. if th is main_thread:
  1053. continue
  1054. th.join()
  1055. def command_handler(self, cmd, args):
  1056. if cmd == "shutdown":
  1057. logger.info(XFROUT_RECEIVED_SHUTDOWN_COMMAND)
  1058. self.shutdown()
  1059. answer = create_answer(0)
  1060. elif cmd == "notify":
  1061. zone_name = args.get('zone_name')
  1062. zone_class = args.get('zone_class')
  1063. if not zone_class:
  1064. zone_class = str(RRClass.IN())
  1065. if zone_name:
  1066. logger.info(XFROUT_NOTIFY_COMMAND, zone_name, zone_class)
  1067. if self.send_notify(zone_name, zone_class):
  1068. answer = create_answer(0)
  1069. else:
  1070. zonestr = notify_out.format_zone_str(Name(zone_name),
  1071. zone_class)
  1072. answer = create_answer(1, "Unknown zone: " + zonestr)
  1073. else:
  1074. answer = create_answer(1, "Bad command parameter:" + str(args))
  1075. # return statistics data to the stats daemon
  1076. elif cmd == "getstats":
  1077. # The log level is here set to debug in order to avoid
  1078. # that a log becomes too verbose. Because the b10-stats
  1079. # daemon is periodically asking to the b10-xfrout daemon.
  1080. logger.debug(DBG_XFROUT_TRACE, \
  1081. XFROUT_RECEIVED_GETSTATS_COMMAND)
  1082. answer = create_answer(0, self._counter.get_statistics())
  1083. else:
  1084. answer = create_answer(1, "Unknown command:" + str(cmd))
  1085. return answer
  1086. def run(self):
  1087. '''Get and process all commands sent from cfgmgr or other modules. '''
  1088. logger.debug(DBG_PROCESS, XFROUT_STARTED)
  1089. while not self._shutdown_event.is_set():
  1090. self._cc.check_command(False)
  1091. xfrout_server = None
  1092. def signal_handler(signal, frame):
  1093. if xfrout_server:
  1094. xfrout_server.shutdown()
  1095. sys.exit(0)
  1096. def set_signal_handler():
  1097. signal.signal(signal.SIGTERM, signal_handler)
  1098. signal.signal(signal.SIGINT, signal_handler)
  1099. def set_cmd_options(parser):
  1100. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  1101. help="display more about what is going on")
  1102. if '__main__' == __name__:
  1103. try:
  1104. parser = OptionParser()
  1105. set_cmd_options(parser)
  1106. (options, args) = parser.parse_args()
  1107. VERBOSE_MODE = options.verbose
  1108. set_signal_handler()
  1109. xfrout_server = XfroutServer()
  1110. xfrout_server.run()
  1111. except KeyboardInterrupt:
  1112. logger.info(XFROUT_STOPPED_BY_KEYBOARD)
  1113. except SessionError as e:
  1114. logger.error(XFROUT_CC_SESSION_ERROR, str(e))
  1115. except ModuleCCSessionError as e:
  1116. logger.error(XFROUT_MODULECC_SESSION_ERROR, str(e))
  1117. except XfroutConfigError as e:
  1118. logger.error(XFROUT_CONFIG_ERROR, str(e))
  1119. except SessionTimeout as e:
  1120. logger.error(XFROUT_CC_SESSION_TIMEOUT_ERROR)
  1121. if xfrout_server:
  1122. xfrout_server.shutdown()
  1123. logger.info(XFROUT_EXITING)