xfrout.py.in 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import threading
  20. import struct
  21. import signal
  22. from isc.datasrc import sqlite3_ds
  23. from socketserver import *
  24. import os
  25. from isc.config.ccsession import *
  26. from isc.cc import SessionError, SessionTimeout
  27. from isc.notify import notify_out
  28. import isc.util.process
  29. import socket
  30. import select
  31. import errno
  32. from optparse import OptionParser, OptionValueError
  33. from isc.util import socketserver_mixin
  34. from isc.log_messages.xfrout_messages import *
  35. isc.log.init("b10-xfrout")
  36. logger = isc.log.Logger("xfrout")
  37. try:
  38. from libutil_io_python import *
  39. from pydnspp import *
  40. except ImportError as e:
  41. # C++ loadable module may not be installed; even so the xfrout process
  42. # must keep running, so we warn about it and move forward.
  43. log.error(XFROUT_IMPORT, str(e))
  44. from isc.acl.acl import ACCEPT, REJECT, DROP, LoaderError
  45. from isc.acl.dns import REQUEST_LOADER
  46. isc.util.process.rename()
  47. class XfroutConfigError(Exception):
  48. """An exception indicating an error in updating xfrout configuration.
  49. This exception is raised when the xfrout process encouters an error in
  50. handling configuration updates. Not all syntax error can be caught
  51. at the module-CC layer, so xfrout needs to (explicitly or implicitly)
  52. validate the given configuration data itself. When it finds an error
  53. it raises this exception (either directly or by converting an exception
  54. from other modules) as a unified error in configuration.
  55. """
  56. pass
  57. def init_paths():
  58. global SPECFILE_PATH
  59. global AUTH_SPECFILE_PATH
  60. global UNIX_SOCKET_FILE
  61. if "B10_FROM_BUILD" in os.environ:
  62. SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
  63. AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
  64. if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
  65. UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
  66. "/auth_xfrout_conn"
  67. else:
  68. UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
  69. else:
  70. PREFIX = "@prefix@"
  71. DATAROOTDIR = "@datarootdir@"
  72. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  73. AUTH_SPECFILE_PATH = SPECFILE_PATH
  74. if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
  75. UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
  76. else:
  77. UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/@PACKAGE_NAME@/auth_xfrout_conn"
  78. init_paths()
  79. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  80. AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
  81. VERBOSE_MODE = False
  82. # tsig sign every N axfr packets.
  83. TSIG_SIGN_EVERY_NTH = 96
  84. XFROUT_MAX_MESSAGE_SIZE = 65535
  85. def get_rrset_len(rrset):
  86. """Returns the wire length of the given RRset"""
  87. bytes = bytearray()
  88. rrset.to_wire(bytes)
  89. return len(bytes)
  90. class XfroutSession():
  91. def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
  92. default_acl, zone_config):
  93. self._sock_fd = sock_fd
  94. self._request_data = request_data
  95. self._server = server
  96. self._tsig_key_ring = tsig_key_ring
  97. self._tsig_ctx = None
  98. self._tsig_len = 0
  99. self._remote = remote
  100. self._acl = default_acl
  101. self._zone_config = zone_config
  102. self.handle()
  103. def create_tsig_ctx(self, tsig_record, tsig_key_ring):
  104. return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(),
  105. tsig_key_ring)
  106. def handle(self):
  107. ''' Handle a xfrout query, send xfrout response '''
  108. try:
  109. self.dns_xfrout_start(self._sock_fd, self._request_data)
  110. #TODO, avoid catching all exceptions
  111. except Exception as e:
  112. logger.error(XFROUT_HANDLE_QUERY_ERROR, e)
  113. pass
  114. os.close(self._sock_fd)
  115. def _check_request_tsig(self, msg, request_data):
  116. ''' If request has a tsig record, perform tsig related checks '''
  117. tsig_record = msg.get_tsig_record()
  118. if tsig_record is not None:
  119. self._tsig_len = tsig_record.get_length()
  120. self._tsig_ctx = self.create_tsig_ctx(tsig_record, self._tsig_key_ring)
  121. tsig_error = self._tsig_ctx.verify(tsig_record, request_data)
  122. if tsig_error != TSIGError.NOERROR:
  123. return Rcode.NOTAUTH()
  124. return Rcode.NOERROR()
  125. def _parse_query_message(self, mdata):
  126. ''' parse query message to [socket,message]'''
  127. #TODO, need to add parseHeader() in case the message header is invalid
  128. try:
  129. msg = Message(Message.PARSE)
  130. Message.from_wire(msg, mdata)
  131. except Exception as err: # Exception is too broad
  132. logger.error(XFROUT_PARSE_QUERY_ERROR, err)
  133. return Rcode.FORMERR(), None
  134. # TSIG related checks
  135. rcode = self._check_request_tsig(msg, mdata)
  136. if rcode == Rcode.NOERROR():
  137. # ACL checks
  138. zone_name = msg.get_question()[0].get_name()
  139. zone_class = msg.get_question()[0].get_class()
  140. acl = self._get_transfer_acl(zone_name, zone_class)
  141. acl_result = acl.execute(
  142. isc.acl.dns.RequestContext(self._remote,
  143. msg.get_tsig_record()))
  144. if acl_result == DROP:
  145. logger.info(XFROUT_QUERY_DROPPED, zone_name, zone_class,
  146. self._remote[0], self._remote[1])
  147. return None, None
  148. elif acl_result == REJECT:
  149. logger.info(XFROUT_QUERY_REJECTED, zone_name, zone_class,
  150. self._remote[0], self._remote[1])
  151. return Rcode.REFUSED(), msg
  152. return rcode, msg
  153. def _get_transfer_acl(self, zone_name, zone_class):
  154. '''Return the ACL that should be applied for a given zone.
  155. The zone is identified by a tuple of name and RR class.
  156. If a per zone configuration for the zone exists and contains
  157. transfer_acl, that ACL will be used; otherwise, the default
  158. ACL will be used.
  159. '''
  160. # Internally zone names are managed in lower cased label characters,
  161. # so we first need to convert the name.
  162. zone_name_lower = Name(zone_name.to_text(), True)
  163. config_key = (zone_class.to_text(), zone_name_lower.to_text())
  164. if config_key in self._zone_config and \
  165. 'transfer_acl' in self._zone_config[config_key]:
  166. return self._zone_config[config_key]['transfer_acl']
  167. return self._acl
  168. def _get_query_zone_name(self, msg):
  169. question = msg.get_question()[0]
  170. return question.get_name().to_text()
  171. def _get_query_zone_class(self, msg):
  172. question = msg.get_question()[0]
  173. return question.get_class().to_text()
  174. def _send_data(self, sock_fd, data):
  175. size = len(data)
  176. total_count = 0
  177. while total_count < size:
  178. count = os.write(sock_fd, data[total_count:])
  179. total_count += count
  180. def _send_message(self, sock_fd, msg, tsig_ctx=None):
  181. render = MessageRenderer()
  182. # As defined in RFC5936 section3.4, perform case-preserving name
  183. # compression for AXFR message.
  184. render.set_compress_mode(MessageRenderer.CASE_SENSITIVE)
  185. render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
  186. # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
  187. # we should remove the if statement and use a universal interface later.
  188. if tsig_ctx is not None:
  189. msg.to_wire(render, tsig_ctx)
  190. else:
  191. msg.to_wire(render)
  192. header_len = struct.pack('H', socket.htons(render.get_length()))
  193. self._send_data(sock_fd, header_len)
  194. self._send_data(sock_fd, render.get_data())
  195. def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
  196. if not msg:
  197. return # query message is invalid. send nothing back.
  198. msg.make_response()
  199. msg.set_rcode(rcode_)
  200. self._send_message(sock_fd, msg, self._tsig_ctx)
  201. def _zone_has_soa(self, zone):
  202. '''Judge if the zone has an SOA record.'''
  203. # In some sense, the SOA defines a zone.
  204. # If the current name server has authority for the
  205. # specific zone, we need to judge if the zone has an SOA record;
  206. # if not, we consider the zone has incomplete data, so xfrout can't
  207. # serve for it.
  208. if sqlite3_ds.get_zone_soa(zone, self._server.get_db_file()):
  209. return True
  210. return False
  211. def _zone_exist(self, zonename):
  212. '''Judge if the zone is configured by config manager.'''
  213. # Currently, if we find the zone in datasource successfully, we
  214. # consider the zone is configured, and the current name server has
  215. # authority for the specific zone.
  216. # TODO: should get zone's configuration from cfgmgr or other place
  217. # in future.
  218. return sqlite3_ds.zone_exist(zonename, self._server.get_db_file())
  219. def _check_xfrout_available(self, zone_name):
  220. '''Check if xfr request can be responsed.
  221. TODO, Get zone's configuration from cfgmgr or some other place
  222. eg. check allow_transfer setting,
  223. '''
  224. # If the current name server does not have authority for the
  225. # zone, xfrout can't serve for it, return rcode NOTAUTH.
  226. if not self._zone_exist(zone_name):
  227. return Rcode.NOTAUTH()
  228. # If we are an authoritative name server for the zone, but fail
  229. # to find the zone's SOA record in datasource, xfrout can't
  230. # provide zone transfer for it.
  231. if not self._zone_has_soa(zone_name):
  232. return Rcode.SERVFAIL()
  233. #TODO, check allow_transfer
  234. if not self._server.increase_transfers_counter():
  235. return Rcode.REFUSED()
  236. return Rcode.NOERROR()
  237. def dns_xfrout_start(self, sock_fd, msg_query):
  238. rcode_, msg = self._parse_query_message(msg_query)
  239. #TODO. create query message and parse header
  240. if rcode_ is None: # Dropped by ACL
  241. return
  242. elif rcode_ == Rcode.NOTAUTH() or rcode_ == Rcode.REFUSED():
  243. return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
  244. elif rcode_ != Rcode.NOERROR():
  245. return self._reply_query_with_error_rcode(msg, sock_fd,
  246. Rcode.FORMERR())
  247. zone_name = self._get_query_zone_name(msg)
  248. zone_class_str = self._get_query_zone_class(msg)
  249. # TODO: should we not also include class in the check?
  250. rcode_ = self._check_xfrout_available(zone_name)
  251. if rcode_ != Rcode.NOERROR():
  252. logger.info(XFROUT_AXFR_TRANSFER_FAILED, zone_name,
  253. zone_class_str, rcode_.to_text())
  254. return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
  255. try:
  256. logger.info(XFROUT_AXFR_TRANSFER_STARTED, zone_name, zone_class_str)
  257. self._reply_xfrout_query(msg, sock_fd, zone_name)
  258. except Exception as err:
  259. logger.error(XFROUT_AXFR_TRANSFER_ERROR, zone_name,
  260. zone_class_str, str(err))
  261. pass
  262. logger.info(XFROUT_AXFR_TRANSFER_DONE, zone_name, zone_class_str)
  263. self._server.decrease_transfers_counter()
  264. return
  265. def _clear_message(self, msg):
  266. qid = msg.get_qid()
  267. opcode = msg.get_opcode()
  268. rcode = msg.get_rcode()
  269. msg.clear(Message.RENDER)
  270. msg.set_qid(qid)
  271. msg.set_opcode(opcode)
  272. msg.set_rcode(rcode)
  273. msg.set_header_flag(Message.HEADERFLAG_AA)
  274. msg.set_header_flag(Message.HEADERFLAG_QR)
  275. return msg
  276. def _create_rrset_from_db_record(self, record):
  277. '''Create one rrset from one record of datasource, if the schema of record is changed,
  278. This function should be updated first.
  279. '''
  280. rrtype_ = RRType(record[5])
  281. rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
  282. rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
  283. rrset_.add_rdata(rdata_)
  284. return rrset_
  285. def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, message_upper_len,
  286. count_since_last_tsig_sign):
  287. '''Add the SOA record to the end of message. If it can't be
  288. added, a new message should be created to send out the last soa .
  289. '''
  290. rrset_len = get_rrset_len(rrset_soa)
  291. if (count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH and
  292. message_upper_len + rrset_len >= XFROUT_MAX_MESSAGE_SIZE):
  293. # If tsig context exist, sign the packet with serial number TSIG_SIGN_EVERY_NTH
  294. self._send_message(sock_fd, msg, self._tsig_ctx)
  295. msg = self._clear_message(msg)
  296. elif (count_since_last_tsig_sign != TSIG_SIGN_EVERY_NTH and
  297. message_upper_len + rrset_len + self._tsig_len >= XFROUT_MAX_MESSAGE_SIZE):
  298. self._send_message(sock_fd, msg)
  299. msg = self._clear_message(msg)
  300. # If tsig context exist, sign the last packet
  301. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  302. self._send_message(sock_fd, msg, self._tsig_ctx)
  303. def _reply_xfrout_query(self, msg, sock_fd, zone_name):
  304. #TODO, there should be a better way to insert rrset.
  305. count_since_last_tsig_sign = TSIG_SIGN_EVERY_NTH
  306. msg.make_response()
  307. msg.set_header_flag(Message.HEADERFLAG_AA)
  308. soa_record = sqlite3_ds.get_zone_soa(zone_name, self._server.get_db_file())
  309. rrset_soa = self._create_rrset_from_db_record(soa_record)
  310. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  311. message_upper_len = get_rrset_len(rrset_soa) + self._tsig_len
  312. for rr_data in sqlite3_ds.get_zone_datas(zone_name, self._server.get_db_file()):
  313. if self._server._shutdown_event.is_set(): # Check if xfrout is shutdown
  314. logger.info(XFROUT_STOPPING)
  315. return
  316. # TODO: RRType.SOA() ?
  317. if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
  318. continue
  319. rrset_ = self._create_rrset_from_db_record(rr_data)
  320. # We calculate the maximum size of the RRset (i.e. the
  321. # size without compression) and use that to see if we
  322. # may have reached the limit
  323. rrset_len = get_rrset_len(rrset_)
  324. if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
  325. msg.add_rrset(Message.SECTION_ANSWER, rrset_)
  326. message_upper_len += rrset_len
  327. continue
  328. # If tsig context exist, sign every N packets
  329. if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
  330. count_since_last_tsig_sign = 0
  331. self._send_message(sock_fd, msg, self._tsig_ctx)
  332. else:
  333. self._send_message(sock_fd, msg)
  334. count_since_last_tsig_sign += 1
  335. msg = self._clear_message(msg)
  336. msg.add_rrset(Message.SECTION_ANSWER, rrset_) # Add the rrset to the new message
  337. # Reserve tsig space for signed packet
  338. if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
  339. message_upper_len = rrset_len + self._tsig_len
  340. else:
  341. message_upper_len = rrset_len
  342. self._send_message_with_last_soa(msg, sock_fd, rrset_soa, message_upper_len,
  343. count_since_last_tsig_sign)
  344. class UnixSockServer(socketserver_mixin.NoPollMixIn,
  345. ThreadingUnixStreamServer):
  346. '''The unix domain socket server which accept xfr query sent from auth server.'''
  347. def __init__(self, sock_file, handle_class, shutdown_event, config_data,
  348. cc):
  349. self._remove_unused_sock_file(sock_file)
  350. self._sock_file = sock_file
  351. socketserver_mixin.NoPollMixIn.__init__(self)
  352. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  353. self._shutdown_event = shutdown_event
  354. self._write_sock, self._read_sock = socket.socketpair()
  355. self._common_init()
  356. self._cc = cc
  357. self.update_config_data(config_data)
  358. def _common_init(self):
  359. '''Initialization shared with the mock server class used for tests'''
  360. self._lock = threading.Lock()
  361. self._transfers_counter = 0
  362. self._zone_config = {}
  363. self._acl = None # this will be initialized in update_config_data()
  364. def _receive_query_message(self, sock):
  365. ''' receive request message from sock'''
  366. # receive data length
  367. data_len = sock.recv(2)
  368. if not data_len:
  369. return None
  370. msg_len = struct.unpack('!H', data_len)[0]
  371. # receive data
  372. recv_size = 0
  373. msgdata = b''
  374. while recv_size < msg_len:
  375. data = sock.recv(msg_len - recv_size)
  376. if not data:
  377. return None
  378. recv_size += len(data)
  379. msgdata += data
  380. return msgdata
  381. def handle_request(self):
  382. ''' Enable server handle a request until shutdown or auth is closed.'''
  383. try:
  384. request, client_address = self.get_request()
  385. except socket.error:
  386. logger.error(XFROUT_FETCH_REQUEST_ERROR)
  387. return
  388. # Check self._shutdown_event to ensure the real shutdown comes.
  389. # Linux could trigger a spurious readable event on the _read_sock
  390. # due to a bug, so we need perform a double check.
  391. while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
  392. try:
  393. (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
  394. except select.error as e:
  395. if e.args[0] == errno.EINTR:
  396. (rlist, wlist, xlist) = ([], [], [])
  397. continue
  398. else:
  399. logger.error(XFROUT_SOCKET_SELECT_ERROR, str(e))
  400. break
  401. # self.server._shutdown_event will be set by now, if it is not a false
  402. # alarm
  403. if self._read_sock in rlist:
  404. continue
  405. try:
  406. self.process_request(request)
  407. except Exception as pre:
  408. log.error(XFROUT_PROCESS_REQUEST_ERROR, str(pre))
  409. break
  410. def _handle_request_noblock(self):
  411. """Override the function _handle_request_noblock(), it creates a new
  412. thread to handle requests for each auth"""
  413. td = threading.Thread(target=self.handle_request)
  414. td.setDaemon(True)
  415. td.start()
  416. def process_request(self, request):
  417. """Receive socket fd and query message from auth, then
  418. start a new thread to process the request."""
  419. sock_fd = recv_fd(request.fileno())
  420. if sock_fd < 0:
  421. # This may happen when one xfrout process try to connect to
  422. # xfrout unix socket server, to check whether there is another
  423. # xfrout running.
  424. if sock_fd == FD_COMM_ERROR:
  425. logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
  426. return
  427. # receive request msg
  428. request_data = self._receive_query_message(request)
  429. if not request_data:
  430. return
  431. t = threading.Thread(target=self.finish_request,
  432. args = (sock_fd, request_data))
  433. if self.daemon_threads:
  434. t.daemon = True
  435. t.start()
  436. def _guess_remote(self, sock_fd):
  437. """
  438. Guess remote address and port of the socket. The sock_fd must be a
  439. socket
  440. """
  441. # This uses a trick. If the socket is IPv4 in reality and we pretend
  442. # it to be IPv6, it returns IPv4 address anyway. This doesn't seem
  443. # to care about the SOCK_STREAM parameter at all (which it really is,
  444. # except for testing)
  445. if socket.has_ipv6:
  446. sock = socket.fromfd(sock_fd, socket.AF_INET6, socket.SOCK_STREAM)
  447. else:
  448. # To make it work even on hosts without IPv6 support
  449. # (Any idea how to simulate this in test?)
  450. sock = socket.fromfd(sock_fd, socket.AF_INET, socket.SOCK_STREAM)
  451. return sock.getpeername()
  452. def finish_request(self, sock_fd, request_data):
  453. '''Finish one request by instantiating RequestHandlerClass.
  454. This method creates a XfroutSession object.
  455. '''
  456. self._lock.acquire()
  457. acl = self._acl
  458. zone_config = self._zone_config
  459. self._lock.release()
  460. self.RequestHandlerClass(sock_fd, request_data, self,
  461. self.tsig_key_ring,
  462. self._guess_remote(sock_fd), acl, zone_config)
  463. def _remove_unused_sock_file(self, sock_file):
  464. '''Try to remove the socket file. If the file is being used
  465. by one running xfrout process, exit from python.
  466. If it's not a socket file or nobody is listening
  467. , it will be removed. If it can't be removed, exit from python. '''
  468. if self._sock_file_in_use(sock_file):
  469. logger.error(XFROUT_UNIX_SOCKET_FILE_IN_USE, sock_file)
  470. sys.exit(0)
  471. else:
  472. if not os.path.exists(sock_file):
  473. return
  474. try:
  475. os.unlink(sock_file)
  476. except OSError as err:
  477. logger.error(XFROUT_REMOVE_OLD_UNIX_SOCKET_FILE_ERROR, sock_file, str(err))
  478. sys.exit(0)
  479. def _sock_file_in_use(self, sock_file):
  480. '''Check whether the socket file 'sock_file' exists and
  481. is being used by one running xfrout process. If it is,
  482. return True, or else return False. '''
  483. try:
  484. sock = socket.socket(socket.AF_UNIX)
  485. sock.connect(sock_file)
  486. except socket.error as err:
  487. return False
  488. else:
  489. return True
  490. def shutdown(self):
  491. self._write_sock.send(b"shutdown") #terminate the xfrout session thread
  492. super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
  493. try:
  494. os.unlink(self._sock_file)
  495. except Exception as e:
  496. logger.error(XFROUT_REMOVE_UNIX_SOCKET_FILE_ERROR, self._sock_file, str(e))
  497. pass
  498. def update_config_data(self, new_config):
  499. '''Apply the new config setting of xfrout module.
  500. '''
  501. self._lock.acquire()
  502. try:
  503. logger.info(XFROUT_NEW_CONFIG)
  504. new_acl = self._acl
  505. if 'transfer_acl' in new_config:
  506. try:
  507. new_acl = REQUEST_LOADER.load(new_config['transfer_acl'])
  508. except LoaderError as e:
  509. raise XfroutConfigError('Failed to parse transfer_acl: ' +
  510. str(e))
  511. new_zone_config = self._zone_config
  512. zconfig_data = new_config.get('zone_config')
  513. if zconfig_data is not None:
  514. new_zone_config = self.__create_zone_config(zconfig_data)
  515. self._acl = new_acl
  516. self._zone_config = new_zone_config
  517. self._max_transfers_out = new_config.get('transfers_out')
  518. self.set_tsig_key_ring(new_config.get('tsig_key_ring'))
  519. except Exception as e:
  520. self._lock.release()
  521. raise e
  522. self._lock.release()
  523. logger.info(XFROUT_NEW_CONFIG_DONE)
  524. def __create_zone_config(self, zone_config_list):
  525. new_config = {}
  526. for zconf in zone_config_list:
  527. # convert the class, origin (name) pair. First build pydnspp
  528. # object to reject invalid input.
  529. zclass_str = zconf.get('class')
  530. if zclass_str is None:
  531. #zclass_str = 'IN' # temporary
  532. zclass_str = self._cc.get_default_value('zone_config/class')
  533. zclass = RRClass(zclass_str)
  534. zorigin = Name(zconf['origin'], True)
  535. config_key = (zclass.to_text(), zorigin.to_text())
  536. # reject duplicate config
  537. if config_key in new_config:
  538. raise XfroutConfigError('Duplicate zone_config for ' +
  539. str(zorigin) + '/' + str(zclass))
  540. # create a new config entry, build any given (and known) config
  541. new_config[config_key] = {}
  542. if 'transfer_acl' in zconf:
  543. try:
  544. new_config[config_key]['transfer_acl'] = \
  545. REQUEST_LOADER.load(zconf['transfer_acl'])
  546. except LoaderError as e:
  547. raise XfroutConfigError('Failed to parse transfer_acl ' +
  548. 'for ' + zorigin.to_text() + '/' +
  549. zclass_str + ': ' + str(e))
  550. return new_config
  551. def set_tsig_key_ring(self, key_list):
  552. """Set the tsig_key_ring , given a TSIG key string list representation. """
  553. # XXX add values to configure zones/tsig options
  554. self.tsig_key_ring = TSIGKeyRing()
  555. # If key string list is empty, create a empty tsig_key_ring
  556. if not key_list:
  557. return
  558. for key_item in key_list:
  559. try:
  560. self.tsig_key_ring.add(TSIGKey(key_item))
  561. except InvalidParameter as ipe:
  562. logger.error(XFROUT_BAD_TSIG_KEY_STRING, str(key_item))
  563. def get_db_file(self):
  564. file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
  565. # this too should be unnecessary, but currently the
  566. # 'from build' override isn't stored in the config
  567. # (and we don't have indirect python access to datasources yet)
  568. if is_default and "B10_FROM_BUILD" in os.environ:
  569. file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
  570. return file
  571. def increase_transfers_counter(self):
  572. '''Return False, if counter + 1 > max_transfers_out, or else
  573. return True
  574. '''
  575. ret = False
  576. self._lock.acquire()
  577. if self._transfers_counter < self._max_transfers_out:
  578. self._transfers_counter += 1
  579. ret = True
  580. self._lock.release()
  581. return ret
  582. def decrease_transfers_counter(self):
  583. self._lock.acquire()
  584. self._transfers_counter -= 1
  585. self._lock.release()
  586. class XfroutServer:
  587. def __init__(self):
  588. self._unix_socket_server = None
  589. self._listen_sock_file = UNIX_SOCKET_FILE
  590. self._shutdown_event = threading.Event()
  591. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
  592. self._config_data = self._cc.get_full_config()
  593. self._cc.start()
  594. self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
  595. self._start_xfr_query_listener()
  596. self._start_notifier()
  597. def _start_xfr_query_listener(self):
  598. '''Start a new thread to accept xfr query. '''
  599. self._unix_socket_server = UnixSockServer(self._listen_sock_file,
  600. XfroutSession,
  601. self._shutdown_event,
  602. self._config_data,
  603. self._cc)
  604. listener = threading.Thread(target=self._unix_socket_server.serve_forever)
  605. listener.start()
  606. def _start_notifier(self):
  607. datasrc = self._unix_socket_server.get_db_file()
  608. self._notifier = notify_out.NotifyOut(datasrc)
  609. self._notifier.dispatcher()
  610. def send_notify(self, zone_name, zone_class):
  611. self._notifier.send_notify(zone_name, zone_class)
  612. def config_handler(self, new_config):
  613. '''Update config data. TODO. Do error check'''
  614. answer = create_answer(0)
  615. for key in new_config:
  616. if key not in self._config_data:
  617. answer = create_answer(1, "Unknown config data: " + str(key))
  618. continue
  619. self._config_data[key] = new_config[key]
  620. if self._unix_socket_server:
  621. try:
  622. self._unix_socket_server.update_config_data(self._config_data)
  623. except Exception as e:
  624. answer = create_answer(1,
  625. "Failed to handle new configuration: " +
  626. str(e))
  627. return answer
  628. def shutdown(self):
  629. ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
  630. terminated.
  631. '''
  632. global xfrout_server
  633. xfrout_server = None #Avoid shutdown is called twice
  634. self._shutdown_event.set()
  635. self._notifier.shutdown()
  636. if self._unix_socket_server:
  637. self._unix_socket_server.shutdown()
  638. # Wait for all threads to terminate
  639. main_thread = threading.currentThread()
  640. for th in threading.enumerate():
  641. if th is main_thread:
  642. continue
  643. th.join()
  644. def command_handler(self, cmd, args):
  645. if cmd == "shutdown":
  646. logger.info(XFROUT_RECEIVED_SHUTDOWN_COMMAND)
  647. self.shutdown()
  648. answer = create_answer(0)
  649. elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
  650. zone_name = args.get('zone_name')
  651. zone_class = args.get('zone_class')
  652. if zone_name and zone_class:
  653. logger.info(XFROUT_NOTIFY_COMMAND, zone_name, zone_class)
  654. self.send_notify(zone_name, zone_class)
  655. answer = create_answer(0)
  656. else:
  657. answer = create_answer(1, "Bad command parameter:" + str(args))
  658. else:
  659. answer = create_answer(1, "Unknown command:" + str(cmd))
  660. return answer
  661. def run(self):
  662. '''Get and process all commands sent from cfgmgr or other modules. '''
  663. while not self._shutdown_event.is_set():
  664. self._cc.check_command(False)
  665. xfrout_server = None
  666. def signal_handler(signal, frame):
  667. if xfrout_server:
  668. xfrout_server.shutdown()
  669. sys.exit(0)
  670. def set_signal_handler():
  671. signal.signal(signal.SIGTERM, signal_handler)
  672. signal.signal(signal.SIGINT, signal_handler)
  673. def set_cmd_options(parser):
  674. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  675. help="display more about what is going on")
  676. if '__main__' == __name__:
  677. try:
  678. parser = OptionParser()
  679. set_cmd_options(parser)
  680. (options, args) = parser.parse_args()
  681. VERBOSE_MODE = options.verbose
  682. set_signal_handler()
  683. xfrout_server = XfroutServer()
  684. xfrout_server.run()
  685. except KeyboardInterrupt:
  686. logger.INFO(XFROUT_STOPPED_BY_KEYBOARD)
  687. except SessionError as e:
  688. logger.error(XFROUT_CC_SESSION_ERROR, str(e))
  689. except ModuleCCSessionError as e:
  690. logger.error(XFROUT_MODULECC_SESSION_ERROR, str(e))
  691. except XfroutConfigError as e:
  692. logger.error(XFROUT_CONFIG_ERROR, str(e))
  693. except SessionTimeout as e:
  694. logger.error(XFROUT_CC_SESSION_TIMEOUT_ERROR)
  695. if xfrout_server:
  696. xfrout_server.shutdown()