xfrout.py.in 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import threading
  20. import struct
  21. import signal
  22. from isc.datasrc import sqlite3_ds
  23. from socketserver import *
  24. import os
  25. from isc.config.ccsession import *
  26. from isc.cc import SessionError, SessionTimeout
  27. from isc.notify import notify_out
  28. import isc.util.process
  29. import socket
  30. import select
  31. import errno
  32. from optparse import OptionParser, OptionValueError
  33. from isc.util import socketserver_mixin
  34. from xfrout_messages import *
  35. isc.log.init("b10-xfrout")
  36. logger = isc.log.Logger("xfrout")
  37. try:
  38. from libutil_io_python import *
  39. from pydnspp import *
  40. except ImportError as e:
  41. # C++ loadable module may not be installed; even so the xfrout process
  42. # must keep running, so we warn about it and move forward.
  43. log.error(XFROUT_IMPORT, str(e))
  44. from isc.acl.acl import ACCEPT, REJECT, DROP
  45. from isc.acl.dns import REQUEST_LOADER
  46. isc.util.process.rename()
  47. def init_paths():
  48. global SPECFILE_PATH
  49. global AUTH_SPECFILE_PATH
  50. global UNIX_SOCKET_FILE
  51. if "B10_FROM_BUILD" in os.environ:
  52. SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
  53. AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
  54. if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
  55. UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
  56. "/auth_xfrout_conn"
  57. else:
  58. UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
  59. else:
  60. PREFIX = "@prefix@"
  61. DATAROOTDIR = "@datarootdir@"
  62. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  63. AUTH_SPECFILE_PATH = SPECFILE_PATH
  64. if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
  65. UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
  66. else:
  67. UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
  68. init_paths()
  69. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  70. AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
  71. MAX_TRANSFERS_OUT = 10
  72. VERBOSE_MODE = False
  73. # tsig sign every N axfr packets.
  74. TSIG_SIGN_EVERY_NTH = 96
  75. XFROUT_MAX_MESSAGE_SIZE = 65535
  76. def get_rrset_len(rrset):
  77. """Returns the wire length of the given RRset"""
  78. bytes = bytearray()
  79. rrset.to_wire(bytes)
  80. return len(bytes)
  81. class XfroutSession():
  82. def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
  83. acl):
  84. self._sock_fd = sock_fd
  85. self._request_data = request_data
  86. self._server = server
  87. self._tsig_key_ring = tsig_key_ring
  88. self._tsig_ctx = None
  89. self._tsig_len = 0
  90. self._remote = remote
  91. self._acl = acl
  92. self.handle()
  93. def create_tsig_ctx(self, tsig_record, tsig_key_ring):
  94. return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(),
  95. tsig_key_ring)
  96. def handle(self):
  97. ''' Handle a xfrout query, send xfrout response '''
  98. try:
  99. self.dns_xfrout_start(self._sock_fd, self._request_data)
  100. #TODO, avoid catching all exceptions
  101. except Exception as e:
  102. logger.error(XFROUT_HANDLE_QUERY_ERROR, e)
  103. pass
  104. os.close(self._sock_fd)
  105. def _check_request_tsig(self, msg, request_data):
  106. ''' If request has a tsig record, perform tsig related checks '''
  107. tsig_record = msg.get_tsig_record()
  108. if tsig_record is not None:
  109. self._tsig_len = tsig_record.get_length()
  110. self._tsig_ctx = self.create_tsig_ctx(tsig_record, self._tsig_key_ring)
  111. tsig_error = self._tsig_ctx.verify(tsig_record, request_data)
  112. if tsig_error != TSIGError.NOERROR:
  113. return Rcode.NOTAUTH()
  114. return Rcode.NOERROR()
  115. def _parse_query_message(self, mdata):
  116. ''' parse query message to [socket,message]'''
  117. #TODO, need to add parseHeader() in case the message header is invalid
  118. try:
  119. msg = Message(Message.PARSE)
  120. Message.from_wire(msg, mdata)
  121. # TSIG related checks
  122. rcode = self._check_request_tsig(msg, mdata)
  123. if rcode == Rcode.NOERROR():
  124. # ACL checks
  125. acl_result = self._acl.execute(
  126. isc.acl.dns.RequestContext(self._remote))
  127. if acl_result == DROP:
  128. logger.info(XFROUT_QUERY_DROPPED,
  129. self._get_query_zone_name(msg),
  130. self._get_query_zone_class(msg),
  131. self._remote[0], self._remote[1])
  132. return None, None
  133. elif acl_result == REJECT:
  134. logger.info(XFROUT_QUERY_REJECTED,
  135. self._get_query_zone_name(msg),
  136. self._get_query_zone_class(msg),
  137. self._remote[0], self._remote[1])
  138. return Rcode.REFUSED(), msg
  139. except Exception as err:
  140. logger.error(XFROUT_PARSE_QUERY_ERROR, err)
  141. return Rcode.FORMERR(), None
  142. return rcode, msg
  143. def _get_query_zone_name(self, msg):
  144. question = msg.get_question()[0]
  145. return question.get_name().to_text()
  146. def _get_query_zone_class(self, msg):
  147. question = msg.get_question()[0]
  148. return question.get_class().to_text()
  149. def _send_data(self, sock_fd, data):
  150. size = len(data)
  151. total_count = 0
  152. while total_count < size:
  153. count = os.write(sock_fd, data[total_count:])
  154. total_count += count
  155. def _send_message(self, sock_fd, msg, tsig_ctx=None):
  156. render = MessageRenderer()
  157. # As defined in RFC5936 section3.4, perform case-preserving name
  158. # compression for AXFR message.
  159. render.set_compress_mode(MessageRenderer.CASE_SENSITIVE)
  160. render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
  161. # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
  162. # we should remove the if statement and use a universal interface later.
  163. if tsig_ctx is not None:
  164. msg.to_wire(render, tsig_ctx)
  165. else:
  166. msg.to_wire(render)
  167. header_len = struct.pack('H', socket.htons(render.get_length()))
  168. self._send_data(sock_fd, header_len)
  169. self._send_data(sock_fd, render.get_data())
  170. def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
  171. if not msg:
  172. return # query message is invalid. send nothing back.
  173. msg.make_response()
  174. msg.set_rcode(rcode_)
  175. self._send_message(sock_fd, msg, self._tsig_ctx)
  176. def _zone_has_soa(self, zone):
  177. '''Judge if the zone has an SOA record.'''
  178. # In some sense, the SOA defines a zone.
  179. # If the current name server has authority for the
  180. # specific zone, we need to judge if the zone has an SOA record;
  181. # if not, we consider the zone has incomplete data, so xfrout can't
  182. # serve for it.
  183. if sqlite3_ds.get_zone_soa(zone, self._server.get_db_file()):
  184. return True
  185. return False
  186. def _zone_exist(self, zonename):
  187. '''Judge if the zone is configured by config manager.'''
  188. # Currently, if we find the zone in datasource successfully, we
  189. # consider the zone is configured, and the current name server has
  190. # authority for the specific zone.
  191. # TODO: should get zone's configuration from cfgmgr or other place
  192. # in future.
  193. return sqlite3_ds.zone_exist(zonename, self._server.get_db_file())
  194. def _check_xfrout_available(self, zone_name):
  195. '''Check if xfr request can be responsed.
  196. TODO, Get zone's configuration from cfgmgr or some other place
  197. eg. check allow_transfer setting,
  198. '''
  199. # If the current name server does not have authority for the
  200. # zone, xfrout can't serve for it, return rcode NOTAUTH.
  201. if not self._zone_exist(zone_name):
  202. return Rcode.NOTAUTH()
  203. # If we are an authoritative name server for the zone, but fail
  204. # to find the zone's SOA record in datasource, xfrout can't
  205. # provide zone transfer for it.
  206. if not self._zone_has_soa(zone_name):
  207. return Rcode.SERVFAIL()
  208. #TODO, check allow_transfer
  209. if not self._server.increase_transfers_counter():
  210. return Rcode.REFUSED()
  211. return Rcode.NOERROR()
  212. def dns_xfrout_start(self, sock_fd, msg_query):
  213. rcode_, msg = self._parse_query_message(msg_query)
  214. #TODO. create query message and parse header
  215. if rcode_ is None: # Dropped by ACL
  216. return
  217. elif rcode_ == Rcode.NOTAUTH() or rcode_ == Rcode.REFUSED():
  218. return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
  219. elif rcode_ != Rcode.NOERROR():
  220. return self._reply_query_with_error_rcode(msg, sock_fd,
  221. Rcode.FORMERR())
  222. zone_name = self._get_query_zone_name(msg)
  223. zone_class_str = self._get_query_zone_class(msg)
  224. # TODO: should we not also include class in the check?
  225. rcode_ = self._check_xfrout_available(zone_name)
  226. if rcode_ != Rcode.NOERROR():
  227. logger.info(XFROUT_AXFR_TRANSFER_FAILED, zone_name,
  228. zone_class_str, rcode_.to_text())
  229. return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
  230. try:
  231. logger.info(XFROUT_AXFR_TRANSFER_STARTED, zone_name, zone_class_str)
  232. self._reply_xfrout_query(msg, sock_fd, zone_name)
  233. except Exception as err:
  234. logger.error(XFROUT_AXFR_TRANSFER_ERROR, zone_name,
  235. zone_class_str, str(err))
  236. pass
  237. logger.info(XFROUT_AXFR_TRANSFER_DONE, zone_name, zone_class_str)
  238. self._server.decrease_transfers_counter()
  239. return
  240. def _clear_message(self, msg):
  241. qid = msg.get_qid()
  242. opcode = msg.get_opcode()
  243. rcode = msg.get_rcode()
  244. msg.clear(Message.RENDER)
  245. msg.set_qid(qid)
  246. msg.set_opcode(opcode)
  247. msg.set_rcode(rcode)
  248. msg.set_header_flag(Message.HEADERFLAG_AA)
  249. msg.set_header_flag(Message.HEADERFLAG_QR)
  250. return msg
  251. def _create_rrset_from_db_record(self, record):
  252. '''Create one rrset from one record of datasource, if the schema of record is changed,
  253. This function should be updated first.
  254. '''
  255. rrtype_ = RRType(record[5])
  256. rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
  257. rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
  258. rrset_.add_rdata(rdata_)
  259. return rrset_
  260. def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, message_upper_len,
  261. count_since_last_tsig_sign):
  262. '''Add the SOA record to the end of message. If it can't be
  263. added, a new message should be created to send out the last soa .
  264. '''
  265. rrset_len = get_rrset_len(rrset_soa)
  266. if (count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH and
  267. message_upper_len + rrset_len >= XFROUT_MAX_MESSAGE_SIZE):
  268. # If tsig context exist, sign the packet with serial number TSIG_SIGN_EVERY_NTH
  269. self._send_message(sock_fd, msg, self._tsig_ctx)
  270. msg = self._clear_message(msg)
  271. elif (count_since_last_tsig_sign != TSIG_SIGN_EVERY_NTH and
  272. message_upper_len + rrset_len + self._tsig_len >= XFROUT_MAX_MESSAGE_SIZE):
  273. self._send_message(sock_fd, msg)
  274. msg = self._clear_message(msg)
  275. # If tsig context exist, sign the last packet
  276. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  277. self._send_message(sock_fd, msg, self._tsig_ctx)
  278. def _reply_xfrout_query(self, msg, sock_fd, zone_name):
  279. #TODO, there should be a better way to insert rrset.
  280. count_since_last_tsig_sign = TSIG_SIGN_EVERY_NTH
  281. msg.make_response()
  282. msg.set_header_flag(Message.HEADERFLAG_AA)
  283. soa_record = sqlite3_ds.get_zone_soa(zone_name, self._server.get_db_file())
  284. rrset_soa = self._create_rrset_from_db_record(soa_record)
  285. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  286. message_upper_len = get_rrset_len(rrset_soa) + self._tsig_len
  287. for rr_data in sqlite3_ds.get_zone_datas(zone_name, self._server.get_db_file()):
  288. if self._server._shutdown_event.is_set(): # Check if xfrout is shutdown
  289. logger.info(XFROUT_STOPPING)
  290. return
  291. # TODO: RRType.SOA() ?
  292. if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
  293. continue
  294. rrset_ = self._create_rrset_from_db_record(rr_data)
  295. # We calculate the maximum size of the RRset (i.e. the
  296. # size without compression) and use that to see if we
  297. # may have reached the limit
  298. rrset_len = get_rrset_len(rrset_)
  299. if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
  300. msg.add_rrset(Message.SECTION_ANSWER, rrset_)
  301. message_upper_len += rrset_len
  302. continue
  303. # If tsig context exist, sign every N packets
  304. if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
  305. count_since_last_tsig_sign = 0
  306. self._send_message(sock_fd, msg, self._tsig_ctx)
  307. else:
  308. self._send_message(sock_fd, msg)
  309. count_since_last_tsig_sign += 1
  310. msg = self._clear_message(msg)
  311. msg.add_rrset(Message.SECTION_ANSWER, rrset_) # Add the rrset to the new message
  312. # Reserve tsig space for signed packet
  313. if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
  314. message_upper_len = rrset_len + self._tsig_len
  315. else:
  316. message_upper_len = rrset_len
  317. self._send_message_with_last_soa(msg, sock_fd, rrset_soa, message_upper_len,
  318. count_since_last_tsig_sign)
  319. class UnixSockServer(socketserver_mixin.NoPollMixIn, ThreadingUnixStreamServer):
  320. '''The unix domain socket server which accept xfr query sent from auth server.'''
  321. def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc):
  322. self._remove_unused_sock_file(sock_file)
  323. self._sock_file = sock_file
  324. socketserver_mixin.NoPollMixIn.__init__(self)
  325. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  326. self._shutdown_event = shutdown_event
  327. self._write_sock, self._read_sock = socket.socketpair()
  328. self._common_init()
  329. self.update_config_data(config_data)
  330. self._cc = cc
  331. def _common_init(self):
  332. self._lock = threading.Lock()
  333. self._transfers_counter = 0
  334. self._acl = REQUEST_LOADER.load('[{"action": "ACCEPT"}]')
  335. def _receive_query_message(self, sock):
  336. ''' receive request message from sock'''
  337. # receive data length
  338. data_len = sock.recv(2)
  339. if not data_len:
  340. return None
  341. msg_len = struct.unpack('!H', data_len)[0]
  342. # receive data
  343. recv_size = 0
  344. msgdata = b''
  345. while recv_size < msg_len:
  346. data = sock.recv(msg_len - recv_size)
  347. if not data:
  348. return None
  349. recv_size += len(data)
  350. msgdata += data
  351. return msgdata
  352. def handle_request(self):
  353. ''' Enable server handle a request until shutdown or auth is closed.'''
  354. try:
  355. request, client_address = self.get_request()
  356. except socket.error:
  357. logger.error(XFROUT_FETCH_REQUEST_ERROR)
  358. return
  359. # Check self._shutdown_event to ensure the real shutdown comes.
  360. # Linux could trigger a spurious readable event on the _read_sock
  361. # due to a bug, so we need perform a double check.
  362. while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
  363. try:
  364. (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
  365. except select.error as e:
  366. if e.args[0] == errno.EINTR:
  367. (rlist, wlist, xlist) = ([], [], [])
  368. continue
  369. else:
  370. logger.error(XFROUT_SOCKET_SELECT_ERROR, str(e))
  371. break
  372. # self.server._shutdown_event will be set by now, if it is not a false
  373. # alarm
  374. if self._read_sock in rlist:
  375. continue
  376. try:
  377. self.process_request(request)
  378. except Exception as pre:
  379. log.error(XFROUT_PROCESS_REQUEST_ERROR, str(pre))
  380. break
  381. def _handle_request_noblock(self):
  382. """Override the function _handle_request_noblock(), it creates a new
  383. thread to handle requests for each auth"""
  384. td = threading.Thread(target=self.handle_request)
  385. td.setDaemon(True)
  386. td.start()
  387. def process_request(self, request):
  388. """Receive socket fd and query message from auth, then
  389. start a new thread to process the request."""
  390. sock_fd = recv_fd(request.fileno())
  391. if sock_fd < 0:
  392. # This may happen when one xfrout process try to connect to
  393. # xfrout unix socket server, to check whether there is another
  394. # xfrout running.
  395. if sock_fd == FD_COMM_ERROR:
  396. logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
  397. return
  398. # receive request msg
  399. request_data = self._receive_query_message(request)
  400. if not request_data:
  401. return
  402. t = threading.Thread(target = self.finish_request,
  403. args = (sock_fd, request_data))
  404. if self.daemon_threads:
  405. t.daemon = True
  406. t.start()
  407. def _guess_remote(self, sock_fd):
  408. """
  409. Guess remote address and port of the socket. The sock_fd must be a
  410. socket
  411. """
  412. # This uses a trick. If the socket is IPv4 in reality and we pretend
  413. # it to be IPv6, it returns IPv4 address anyway. This doesn't seem
  414. # to care about the SOCK_STREAM parameter at all (which it really is,
  415. # except for testing)
  416. if socket.has_ipv6:
  417. sock = socket.fromfd(sock_fd, socket.AF_INET6, socket.SOCK_STREAM)
  418. else:
  419. # To make it work even on hosts without IPv6 support
  420. # (Any idea how to simulate this in test?)
  421. sock = socket.fromfd(sock_fd, socket.AF_INET, socket.SOCK_STREAM)
  422. return sock.getpeername()
  423. def finish_request(self, sock_fd, request_data):
  424. '''Finish one request by instantiating RequestHandlerClass.'''
  425. self.RequestHandlerClass(sock_fd, request_data, self,
  426. self.tsig_key_ring,
  427. self._guess_remote(sock_fd), self._acl)
  428. def _remove_unused_sock_file(self, sock_file):
  429. '''Try to remove the socket file. If the file is being used
  430. by one running xfrout process, exit from python.
  431. If it's not a socket file or nobody is listening
  432. , it will be removed. If it can't be removed, exit from python. '''
  433. if self._sock_file_in_use(sock_file):
  434. logger.error(XFROUT_UNIX_SOCKET_FILE_IN_USE, sock_file)
  435. sys.exit(0)
  436. else:
  437. if not os.path.exists(sock_file):
  438. return
  439. try:
  440. os.unlink(sock_file)
  441. except OSError as err:
  442. logger.error(XFROUT_REMOVE_OLD_UNIX_SOCKET_FILE_ERROR, sock_file, str(err))
  443. sys.exit(0)
  444. def _sock_file_in_use(self, sock_file):
  445. '''Check whether the socket file 'sock_file' exists and
  446. is being used by one running xfrout process. If it is,
  447. return True, or else return False. '''
  448. try:
  449. sock = socket.socket(socket.AF_UNIX)
  450. sock.connect(sock_file)
  451. except socket.error as err:
  452. return False
  453. else:
  454. return True
  455. def shutdown(self):
  456. self._write_sock.send(b"shutdown") #terminate the xfrout session thread
  457. super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
  458. try:
  459. os.unlink(self._sock_file)
  460. except Exception as e:
  461. logger.error(XFROUT_REMOVE_UNIX_SOCKET_FILE_ERROR, self._sock_file, str(e))
  462. pass
  463. def update_config_data(self, new_config):
  464. '''Apply the new config setting of xfrout module. '''
  465. logger.info(XFROUT_NEW_CONFIG)
  466. if 'query_acl' in new_config:
  467. self._acl = REQUEST_LOADER.load(new_config['query_acl'])
  468. self._lock.acquire()
  469. self._max_transfers_out = new_config.get('transfers_out')
  470. self.set_tsig_key_ring(new_config.get('tsig_key_ring'))
  471. self._lock.release()
  472. logger.info(XFROUT_NEW_CONFIG_DONE)
  473. def set_tsig_key_ring(self, key_list):
  474. """Set the tsig_key_ring , given a TSIG key string list representation. """
  475. # XXX add values to configure zones/tsig options
  476. self.tsig_key_ring = TSIGKeyRing()
  477. # If key string list is empty, create a empty tsig_key_ring
  478. if not key_list:
  479. return
  480. for key_item in key_list:
  481. try:
  482. self.tsig_key_ring.add(TSIGKey(key_item))
  483. except InvalidParameter as ipe:
  484. logger.error(XFROUT_BAD_TSIG_KEY_STRING, str(key_item))
  485. def get_db_file(self):
  486. file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
  487. # this too should be unnecessary, but currently the
  488. # 'from build' override isn't stored in the config
  489. # (and we don't have indirect python access to datasources yet)
  490. if is_default and "B10_FROM_BUILD" in os.environ:
  491. file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
  492. return file
  493. def increase_transfers_counter(self):
  494. '''Return False, if counter + 1 > max_transfers_out, or else
  495. return True
  496. '''
  497. ret = False
  498. self._lock.acquire()
  499. if self._transfers_counter < self._max_transfers_out:
  500. self._transfers_counter += 1
  501. ret = True
  502. self._lock.release()
  503. return ret
  504. def decrease_transfers_counter(self):
  505. self._lock.acquire()
  506. self._transfers_counter -= 1
  507. self._lock.release()
  508. class XfroutServer:
  509. def __init__(self):
  510. self._unix_socket_server = None
  511. self._listen_sock_file = UNIX_SOCKET_FILE
  512. self._shutdown_event = threading.Event()
  513. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler, None, True)
  514. self._config_data = self._cc.get_full_config()
  515. self._cc.start()
  516. self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
  517. self._start_xfr_query_listener()
  518. self._start_notifier()
  519. def _start_xfr_query_listener(self):
  520. '''Start a new thread to accept xfr query. '''
  521. self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
  522. self._shutdown_event, self._config_data,
  523. self._cc)
  524. listener = threading.Thread(target=self._unix_socket_server.serve_forever)
  525. listener.start()
  526. def _start_notifier(self):
  527. datasrc = self._unix_socket_server.get_db_file()
  528. self._notifier = notify_out.NotifyOut(datasrc)
  529. self._notifier.dispatcher()
  530. def send_notify(self, zone_name, zone_class):
  531. self._notifier.send_notify(zone_name, zone_class)
  532. def config_handler(self, new_config):
  533. '''Update config data. TODO. Do error check'''
  534. answer = create_answer(0)
  535. for key in new_config:
  536. if key not in self._config_data:
  537. answer = create_answer(1, "Unknown config data: " + str(key))
  538. continue
  539. self._config_data[key] = new_config[key]
  540. if self._unix_socket_server:
  541. try:
  542. self._unix_socket_server.update_config_data(self._config_data)
  543. except Exception as e:
  544. answer = create_answer(1,
  545. "Failed to handle new configuration: " +
  546. str(e))
  547. return answer
  548. def shutdown(self):
  549. ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
  550. terminated.
  551. '''
  552. global xfrout_server
  553. xfrout_server = None #Avoid shutdown is called twice
  554. self._shutdown_event.set()
  555. self._notifier.shutdown()
  556. if self._unix_socket_server:
  557. self._unix_socket_server.shutdown()
  558. # Wait for all threads to terminate
  559. main_thread = threading.currentThread()
  560. for th in threading.enumerate():
  561. if th is main_thread:
  562. continue
  563. th.join()
  564. def command_handler(self, cmd, args):
  565. if cmd == "shutdown":
  566. logger.info(XFROUT_RECEIVED_SHUTDOWN_COMMAND)
  567. self.shutdown()
  568. answer = create_answer(0)
  569. elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
  570. zone_name = args.get('zone_name')
  571. zone_class = args.get('zone_class')
  572. if zone_name and zone_class:
  573. logger.info(XFROUT_NOTIFY_COMMAND, zone_name, zone_class)
  574. self.send_notify(zone_name, zone_class)
  575. answer = create_answer(0)
  576. else:
  577. answer = create_answer(1, "Bad command parameter:" + str(args))
  578. else:
  579. answer = create_answer(1, "Unknown command:" + str(cmd))
  580. return answer
  581. def run(self):
  582. '''Get and process all commands sent from cfgmgr or other modules. '''
  583. while not self._shutdown_event.is_set():
  584. self._cc.check_command(False)
  585. xfrout_server = None
  586. def signal_handler(signal, frame):
  587. if xfrout_server:
  588. xfrout_server.shutdown()
  589. sys.exit(0)
  590. def set_signal_handler():
  591. signal.signal(signal.SIGTERM, signal_handler)
  592. signal.signal(signal.SIGINT, signal_handler)
  593. def set_cmd_options(parser):
  594. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  595. help="display more about what is going on")
  596. if '__main__' == __name__:
  597. try:
  598. parser = OptionParser()
  599. set_cmd_options(parser)
  600. (options, args) = parser.parse_args()
  601. VERBOSE_MODE = options.verbose
  602. set_signal_handler()
  603. xfrout_server = XfroutServer()
  604. xfrout_server.run()
  605. except KeyboardInterrupt:
  606. logger.INFO(XFROUT_STOPPED_BY_KEYBOARD)
  607. except SessionError as e:
  608. logger.error(XFROUT_CC_SESSION_ERROR, str(e))
  609. except SessionTimeout as e:
  610. logger.error(XFROUT_CC_SESSION_TIMEOUT_ERROR)
  611. if xfrout_server:
  612. xfrout_server.shutdown()