xfrout.py.in 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import threading
  20. import struct
  21. import signal
  22. from isc.datasrc import sqlite3_ds
  23. from socketserver import *
  24. import os
  25. from isc.config.ccsession import *
  26. from isc.cc import SessionError, SessionTimeout
  27. from isc.notify import notify_out
  28. import isc.util.process
  29. import socket
  30. import select
  31. import errno
  32. from optparse import OptionParser, OptionValueError
  33. from isc.util import socketserver_mixin
  34. from xfrout_messages import *
  35. isc.log.init("b10-xfrout")
  36. logger = isc.log.Logger("xfrout")
  37. try:
  38. from libutil_io_python import *
  39. from pydnspp import *
  40. except ImportError as e:
  41. # C++ loadable module may not be installed; even so the xfrout process
  42. # must keep running, so we warn about it and move forward.
  43. log.error(XFROUT_IMPORT, str(e))
  44. from isc.acl.acl import ACCEPT, REJECT, DROP
  45. from isc.acl.dns import REQUEST_LOADER
  46. isc.util.process.rename()
  47. def init_paths():
  48. global SPECFILE_PATH
  49. global AUTH_SPECFILE_PATH
  50. global UNIX_SOCKET_FILE
  51. if "B10_FROM_BUILD" in os.environ:
  52. SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
  53. AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
  54. if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
  55. UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
  56. "/auth_xfrout_conn"
  57. else:
  58. UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
  59. else:
  60. PREFIX = "@prefix@"
  61. DATAROOTDIR = "@datarootdir@"
  62. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  63. AUTH_SPECFILE_PATH = SPECFILE_PATH
  64. if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
  65. UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
  66. else:
  67. UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
  68. init_paths()
  69. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  70. AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
  71. MAX_TRANSFERS_OUT = 10
  72. VERBOSE_MODE = False
  73. # tsig sign every N axfr packets.
  74. TSIG_SIGN_EVERY_NTH = 96
  75. XFROUT_MAX_MESSAGE_SIZE = 65535
  76. def get_rrset_len(rrset):
  77. """Returns the wire length of the given RRset"""
  78. bytes = bytearray()
  79. rrset.to_wire(bytes)
  80. return len(bytes)
  81. class XfroutSession():
  82. def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote):
  83. self._sock_fd = sock_fd
  84. self._request_data = request_data
  85. self._server = server
  86. self._tsig_key_ring = tsig_key_ring
  87. self._tsig_ctx = None
  88. self._tsig_len = 0
  89. self._remote = remote
  90. self.handle()
  91. def create_tsig_ctx(self, tsig_record, tsig_key_ring):
  92. return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(),
  93. tsig_key_ring)
  94. def handle(self):
  95. ''' Handle a xfrout query, send xfrout response '''
  96. try:
  97. self.dns_xfrout_start(self._sock_fd, self._request_data)
  98. #TODO, avoid catching all exceptions
  99. except Exception as e:
  100. logger.error(XFROUT_HANDLE_QUERY_ERROR, str(e))
  101. pass
  102. os.close(self._sock_fd)
  103. def _check_request_tsig(self, msg, request_data):
  104. ''' If request has a tsig record, perform tsig related checks '''
  105. tsig_record = msg.get_tsig_record()
  106. if tsig_record is not None:
  107. self._tsig_len = tsig_record.get_length()
  108. self._tsig_ctx = self.create_tsig_ctx(tsig_record, self._tsig_key_ring)
  109. tsig_error = self._tsig_ctx.verify(tsig_record, request_data)
  110. if tsig_error != TSIGError.NOERROR:
  111. return Rcode.NOTAUTH()
  112. return Rcode.NOERROR()
  113. def _parse_query_message(self, mdata):
  114. ''' parse query message to [socket,message]'''
  115. #TODO, need to add parseHeader() in case the message header is invalid
  116. try:
  117. msg = Message(Message.PARSE)
  118. Message.from_wire(msg, mdata)
  119. # TSIG related checks
  120. rcode = self._check_request_tsig(msg, mdata)
  121. # TODO The ACL check comes here
  122. except Exception as err:
  123. logger.error(XFROUT_PARSE_QUERY_ERROR, str(err))
  124. return Rcode.FORMERR(), None
  125. return rcode, msg
  126. def _get_query_zone_name(self, msg):
  127. question = msg.get_question()[0]
  128. return question.get_name().to_text()
  129. def _get_query_zone_class(self, msg):
  130. question = msg.get_question()[0]
  131. return question.get_class().to_text()
  132. def _send_data(self, sock_fd, data):
  133. size = len(data)
  134. total_count = 0
  135. while total_count < size:
  136. count = os.write(sock_fd, data[total_count:])
  137. total_count += count
  138. def _send_message(self, sock_fd, msg, tsig_ctx=None):
  139. render = MessageRenderer()
  140. # As defined in RFC5936 section3.4, perform case-preserving name
  141. # compression for AXFR message.
  142. render.set_compress_mode(MessageRenderer.CASE_SENSITIVE)
  143. render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
  144. # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
  145. # we should remove the if statement and use a universal interface later.
  146. if tsig_ctx is not None:
  147. msg.to_wire(render, tsig_ctx)
  148. else:
  149. msg.to_wire(render)
  150. header_len = struct.pack('H', socket.htons(render.get_length()))
  151. self._send_data(sock_fd, header_len)
  152. self._send_data(sock_fd, render.get_data())
  153. def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
  154. msg.make_response()
  155. msg.set_rcode(rcode_)
  156. self._send_message(sock_fd, msg, self._tsig_ctx)
  157. def _reply_query_with_format_error(self, msg, sock_fd):
  158. '''query message format isn't legal.'''
  159. if not msg:
  160. return # query message is invalid. send nothing back.
  161. msg.make_response()
  162. msg.set_rcode(Rcode.FORMERR())
  163. self._send_message(sock_fd, msg, self._tsig_ctx)
  164. def _zone_has_soa(self, zone):
  165. '''Judge if the zone has an SOA record.'''
  166. # In some sense, the SOA defines a zone.
  167. # If the current name server has authority for the
  168. # specific zone, we need to judge if the zone has an SOA record;
  169. # if not, we consider the zone has incomplete data, so xfrout can't
  170. # serve for it.
  171. if sqlite3_ds.get_zone_soa(zone, self._server.get_db_file()):
  172. return True
  173. return False
  174. def _zone_exist(self, zonename):
  175. '''Judge if the zone is configured by config manager.'''
  176. # Currently, if we find the zone in datasource successfully, we
  177. # consider the zone is configured, and the current name server has
  178. # authority for the specific zone.
  179. # TODO: should get zone's configuration from cfgmgr or other place
  180. # in future.
  181. return sqlite3_ds.zone_exist(zonename, self._server.get_db_file())
  182. def _check_xfrout_available(self, zone_name):
  183. '''Check if xfr request can be responsed.
  184. TODO, Get zone's configuration from cfgmgr or some other place
  185. eg. check allow_transfer setting,
  186. '''
  187. # If the current name server does not have authority for the
  188. # zone, xfrout can't serve for it, return rcode NOTAUTH.
  189. if not self._zone_exist(zone_name):
  190. return Rcode.NOTAUTH()
  191. # If we are an authoritative name server for the zone, but fail
  192. # to find the zone's SOA record in datasource, xfrout can't
  193. # provide zone transfer for it.
  194. if not self._zone_has_soa(zone_name):
  195. return Rcode.SERVFAIL()
  196. #TODO, check allow_transfer
  197. if not self._server.increase_transfers_counter():
  198. return Rcode.REFUSED()
  199. return Rcode.NOERROR()
  200. def dns_xfrout_start(self, sock_fd, msg_query):
  201. rcode_, msg = self._parse_query_message(msg_query)
  202. #TODO. create query message and parse header
  203. if rcode_ == Rcode.NOTAUTH():
  204. return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
  205. elif rcode_ != Rcode.NOERROR():
  206. return self._reply_query_with_format_error(msg, sock_fd)
  207. zone_name = self._get_query_zone_name(msg)
  208. zone_class_str = self._get_query_zone_class(msg)
  209. # TODO: should we not also include class in the check?
  210. rcode_ = self._check_xfrout_available(zone_name)
  211. if rcode_ != Rcode.NOERROR():
  212. logger.info(XFROUT_AXFR_TRANSFER_FAILED, zone_name,
  213. zone_class_str, rcode_.to_text())
  214. return self. _reply_query_with_error_rcode(msg, sock_fd, rcode_)
  215. try:
  216. logger.info(XFROUT_AXFR_TRANSFER_STARTED, zone_name, zone_class_str)
  217. self._reply_xfrout_query(msg, sock_fd, zone_name)
  218. except Exception as err:
  219. logger.error(XFROUT_AXFR_TRANSFER_ERROR, zone_name,
  220. zone_class_str, str(err))
  221. pass
  222. logger.info(XFROUT_AXFR_TRANSFER_DONE, zone_name, zone_class_str)
  223. self._server.decrease_transfers_counter()
  224. return
  225. def _clear_message(self, msg):
  226. qid = msg.get_qid()
  227. opcode = msg.get_opcode()
  228. rcode = msg.get_rcode()
  229. msg.clear(Message.RENDER)
  230. msg.set_qid(qid)
  231. msg.set_opcode(opcode)
  232. msg.set_rcode(rcode)
  233. msg.set_header_flag(Message.HEADERFLAG_AA)
  234. msg.set_header_flag(Message.HEADERFLAG_QR)
  235. return msg
  236. def _create_rrset_from_db_record(self, record):
  237. '''Create one rrset from one record of datasource, if the schema of record is changed,
  238. This function should be updated first.
  239. '''
  240. rrtype_ = RRType(record[5])
  241. rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
  242. rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
  243. rrset_.add_rdata(rdata_)
  244. return rrset_
  245. def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, message_upper_len,
  246. count_since_last_tsig_sign):
  247. '''Add the SOA record to the end of message. If it can't be
  248. added, a new message should be created to send out the last soa .
  249. '''
  250. rrset_len = get_rrset_len(rrset_soa)
  251. if (count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH and
  252. message_upper_len + rrset_len >= XFROUT_MAX_MESSAGE_SIZE):
  253. # If tsig context exist, sign the packet with serial number TSIG_SIGN_EVERY_NTH
  254. self._send_message(sock_fd, msg, self._tsig_ctx)
  255. msg = self._clear_message(msg)
  256. elif (count_since_last_tsig_sign != TSIG_SIGN_EVERY_NTH and
  257. message_upper_len + rrset_len + self._tsig_len >= XFROUT_MAX_MESSAGE_SIZE):
  258. self._send_message(sock_fd, msg)
  259. msg = self._clear_message(msg)
  260. # If tsig context exist, sign the last packet
  261. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  262. self._send_message(sock_fd, msg, self._tsig_ctx)
  263. def _reply_xfrout_query(self, msg, sock_fd, zone_name):
  264. #TODO, there should be a better way to insert rrset.
  265. count_since_last_tsig_sign = TSIG_SIGN_EVERY_NTH
  266. msg.make_response()
  267. msg.set_header_flag(Message.HEADERFLAG_AA)
  268. soa_record = sqlite3_ds.get_zone_soa(zone_name, self._server.get_db_file())
  269. rrset_soa = self._create_rrset_from_db_record(soa_record)
  270. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  271. message_upper_len = get_rrset_len(rrset_soa) + self._tsig_len
  272. for rr_data in sqlite3_ds.get_zone_datas(zone_name, self._server.get_db_file()):
  273. if self._server._shutdown_event.is_set(): # Check if xfrout is shutdown
  274. logger.info(XFROUT_STOPPING)
  275. return
  276. # TODO: RRType.SOA() ?
  277. if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
  278. continue
  279. rrset_ = self._create_rrset_from_db_record(rr_data)
  280. # We calculate the maximum size of the RRset (i.e. the
  281. # size without compression) and use that to see if we
  282. # may have reached the limit
  283. rrset_len = get_rrset_len(rrset_)
  284. if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
  285. msg.add_rrset(Message.SECTION_ANSWER, rrset_)
  286. message_upper_len += rrset_len
  287. continue
  288. # If tsig context exist, sign every N packets
  289. if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
  290. count_since_last_tsig_sign = 0
  291. self._send_message(sock_fd, msg, self._tsig_ctx)
  292. else:
  293. self._send_message(sock_fd, msg)
  294. count_since_last_tsig_sign += 1
  295. msg = self._clear_message(msg)
  296. msg.add_rrset(Message.SECTION_ANSWER, rrset_) # Add the rrset to the new message
  297. # Reserve tsig space for signed packet
  298. if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
  299. message_upper_len = rrset_len + self._tsig_len
  300. else:
  301. message_upper_len = rrset_len
  302. self._send_message_with_last_soa(msg, sock_fd, rrset_soa, message_upper_len,
  303. count_since_last_tsig_sign)
  304. class UnixSockServer(socketserver_mixin.NoPollMixIn, ThreadingUnixStreamServer):
  305. '''The unix domain socket server which accept xfr query sent from auth server.'''
  306. def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc):
  307. self._remove_unused_sock_file(sock_file)
  308. self._sock_file = sock_file
  309. socketserver_mixin.NoPollMixIn.__init__(self)
  310. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  311. self._shutdown_event = shutdown_event
  312. self._write_sock, self._read_sock = socket.socketpair()
  313. self._common_init()
  314. #self._log = log
  315. self.update_config_data(config_data)
  316. self._cc = cc
  317. def _common_init(self):
  318. self._lock = threading.Lock()
  319. self._transfers_counter = 0
  320. self._acl = REQUEST_LOADER.load("[]")
  321. def _receive_query_message(self, sock):
  322. ''' receive request message from sock'''
  323. # receive data length
  324. data_len = sock.recv(2)
  325. if not data_len:
  326. return None
  327. msg_len = struct.unpack('!H', data_len)[0]
  328. # receive data
  329. recv_size = 0
  330. msgdata = b''
  331. while recv_size < msg_len:
  332. data = sock.recv(msg_len - recv_size)
  333. if not data:
  334. return None
  335. recv_size += len(data)
  336. msgdata += data
  337. return msgdata
  338. def handle_request(self):
  339. ''' Enable server handle a request until shutdown or auth is closed.'''
  340. try:
  341. request, client_address = self.get_request()
  342. except socket.error:
  343. logger.error(XFROUT_FETCH_REQUEST_ERROR)
  344. return
  345. # Check self._shutdown_event to ensure the real shutdown comes.
  346. # Linux could trigger a spurious readable event on the _read_sock
  347. # due to a bug, so we need perform a double check.
  348. while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
  349. try:
  350. (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
  351. except select.error as e:
  352. if e.args[0] == errno.EINTR:
  353. (rlist, wlist, xlist) = ([], [], [])
  354. continue
  355. else:
  356. logger.error(XFROUT_SOCKET_SELECT_ERROR, str(e))
  357. break
  358. # self.server._shutdown_event will be set by now, if it is not a false
  359. # alarm
  360. if self._read_sock in rlist:
  361. continue
  362. try:
  363. self.process_request(request)
  364. except Exception as pre:
  365. log.error(XFROUT_PROCESS_REQUEST_ERROR, str(pre))
  366. break
  367. def _handle_request_noblock(self):
  368. """Override the function _handle_request_noblock(), it creates a new
  369. thread to handle requests for each auth"""
  370. td = threading.Thread(target=self.handle_request)
  371. td.setDaemon(True)
  372. td.start()
  373. def process_request(self, request):
  374. """Receive socket fd and query message from auth, then
  375. start a new thread to process the request."""
  376. sock_fd = recv_fd(request.fileno())
  377. if sock_fd < 0:
  378. # This may happen when one xfrout process try to connect to
  379. # xfrout unix socket server, to check whether there is another
  380. # xfrout running.
  381. if sock_fd == FD_COMM_ERROR:
  382. logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
  383. return
  384. # receive request msg
  385. request_data = self._receive_query_message(request)
  386. if not request_data:
  387. return
  388. t = threading.Thread(target = self.finish_request,
  389. args = (sock_fd, request_data))
  390. if self.daemon_threads:
  391. t.daemon = True
  392. t.start()
  393. def _guess_remote(self, sock_fd):
  394. """
  395. Guess remote address and port of the socket. The socket must be a
  396. socket
  397. """
  398. # This uses a trick. If the socket is IPv4 in reality and we pretend
  399. # it to to be IPv6, it returns IPv4 address anyway. This doesn't seem
  400. # to care about the SOCK_STREAM parameter at all (which it really is,
  401. # except for testing)
  402. if socket.has_ipv6:
  403. sock = socket.fromfd(sock_fd, socket.AF_INET6, socket.SOCK_STREAM)
  404. else:
  405. # To make it work even on hosts without IPv6 support
  406. # (Any idea how to simulate this in test?)
  407. sock = socket.fromfd(sock_fd, socket.AF_INET, socket.SOCK_STREAM)
  408. return sock.getpeername()
  409. def finish_request(self, sock_fd, request_data):
  410. '''Finish one request by instantiating RequestHandlerClass.'''
  411. self.RequestHandlerClass(sock_fd, request_data, self,
  412. self.tsig_key_ring,
  413. self._guess_remote(sock_fd))
  414. def _remove_unused_sock_file(self, sock_file):
  415. '''Try to remove the socket file. If the file is being used
  416. by one running xfrout process, exit from python.
  417. If it's not a socket file or nobody is listening
  418. , it will be removed. If it can't be removed, exit from python. '''
  419. if self._sock_file_in_use(sock_file):
  420. logger.error(XFROUT_UNIX_SOCKET_FILE_IN_USE, sock_file)
  421. sys.exit(0)
  422. else:
  423. if not os.path.exists(sock_file):
  424. return
  425. try:
  426. os.unlink(sock_file)
  427. except OSError as err:
  428. logger.error(XFROUT_REMOVE_OLD_UNIX_SOCKET_FILE_ERROR, sock_file, str(err))
  429. sys.exit(0)
  430. def _sock_file_in_use(self, sock_file):
  431. '''Check whether the socket file 'sock_file' exists and
  432. is being used by one running xfrout process. If it is,
  433. return True, or else return False. '''
  434. try:
  435. sock = socket.socket(socket.AF_UNIX)
  436. sock.connect(sock_file)
  437. except socket.error as err:
  438. return False
  439. else:
  440. return True
  441. def shutdown(self):
  442. self._write_sock.send(b"shutdown") #terminate the xfrout session thread
  443. super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
  444. try:
  445. os.unlink(self._sock_file)
  446. except Exception as e:
  447. logger.error(XFROUT_REMOVE_UNIX_SOCKET_FILE_ERROR, self._sock_file, str(e))
  448. pass
  449. def update_config_data(self, new_config):
  450. '''Apply the new config setting of xfrout module. '''
  451. if 'ACL' in new_config:
  452. self._acl = REQUEST_LOADER.load(new_config['ACL'])
  453. logger.info(XFROUT_NEW_CONFIG)
  454. self._lock.acquire()
  455. self._max_transfers_out = new_config.get('transfers_out')
  456. self.set_tsig_key_ring(new_config.get('tsig_key_ring'))
  457. self._lock.release()
  458. logger.info(XFROUT_NEW_CONFIG_DONE)
  459. def set_tsig_key_ring(self, key_list):
  460. """Set the tsig_key_ring , given a TSIG key string list representation. """
  461. # XXX add values to configure zones/tsig options
  462. self.tsig_key_ring = TSIGKeyRing()
  463. # If key string list is empty, create a empty tsig_key_ring
  464. if not key_list:
  465. return
  466. for key_item in key_list:
  467. try:
  468. self.tsig_key_ring.add(TSIGKey(key_item))
  469. except InvalidParameter as ipe:
  470. logger.error(XFROUT_BAD_TSIG_KEY_STRING, str(key_item))
  471. def get_db_file(self):
  472. file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
  473. # this too should be unnecessary, but currently the
  474. # 'from build' override isn't stored in the config
  475. # (and we don't have indirect python access to datasources yet)
  476. if is_default and "B10_FROM_BUILD" in os.environ:
  477. file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
  478. return file
  479. def increase_transfers_counter(self):
  480. '''Return False, if counter + 1 > max_transfers_out, or else
  481. return True
  482. '''
  483. ret = False
  484. self._lock.acquire()
  485. if self._transfers_counter < self._max_transfers_out:
  486. self._transfers_counter += 1
  487. ret = True
  488. self._lock.release()
  489. return ret
  490. def decrease_transfers_counter(self):
  491. self._lock.acquire()
  492. self._transfers_counter -= 1
  493. self._lock.release()
  494. class XfroutServer:
  495. def __init__(self):
  496. self._unix_socket_server = None
  497. self._listen_sock_file = UNIX_SOCKET_FILE
  498. self._shutdown_event = threading.Event()
  499. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler, None, True)
  500. self._config_data = self._cc.get_full_config()
  501. self._cc.start()
  502. self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
  503. #self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
  504. # self._config_data.get('log_severity'), self._config_data.get('log_versions'),
  505. # self._config_data.get('log_max_bytes'), True)
  506. self._start_xfr_query_listener()
  507. self._start_notifier()
  508. def _start_xfr_query_listener(self):
  509. '''Start a new thread to accept xfr query. '''
  510. self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
  511. self._shutdown_event, self._config_data,
  512. self._cc)
  513. listener = threading.Thread(target=self._unix_socket_server.serve_forever)
  514. listener.start()
  515. def _start_notifier(self):
  516. datasrc = self._unix_socket_server.get_db_file()
  517. self._notifier = notify_out.NotifyOut(datasrc)
  518. self._notifier.dispatcher()
  519. def send_notify(self, zone_name, zone_class):
  520. self._notifier.send_notify(zone_name, zone_class)
  521. def config_handler(self, new_config):
  522. '''Update config data. TODO. Do error check'''
  523. answer = create_answer(0)
  524. for key in new_config:
  525. if key not in self._config_data:
  526. answer = create_answer(1, "Unknown config data: " + str(key))
  527. continue
  528. self._config_data[key] = new_config[key]
  529. if self._unix_socket_server:
  530. try:
  531. self._unix_socket_server.update_config_data(self._config_data)
  532. except Exception as e:
  533. answer = create_answer(1, "Bad configuration: " + str(e))
  534. return answer
  535. def shutdown(self):
  536. ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
  537. terminated.
  538. '''
  539. global xfrout_server
  540. xfrout_server = None #Avoid shutdown is called twice
  541. self._shutdown_event.set()
  542. self._notifier.shutdown()
  543. if self._unix_socket_server:
  544. self._unix_socket_server.shutdown()
  545. # Wait for all threads to terminate
  546. main_thread = threading.currentThread()
  547. for th in threading.enumerate():
  548. if th is main_thread:
  549. continue
  550. th.join()
  551. def command_handler(self, cmd, args):
  552. if cmd == "shutdown":
  553. logger.info(XFROUT_RECEIVED_SHUTDOWN_COMMAND)
  554. self.shutdown()
  555. answer = create_answer(0)
  556. elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
  557. zone_name = args.get('zone_name')
  558. zone_class = args.get('zone_class')
  559. if zone_name and zone_class:
  560. logger.info(XFROUT_NOTIFY_COMMAND, zone_name, zone_class)
  561. self.send_notify(zone_name, zone_class)
  562. answer = create_answer(0)
  563. else:
  564. answer = create_answer(1, "Bad command parameter:" + str(args))
  565. else:
  566. answer = create_answer(1, "Unknown command:" + str(cmd))
  567. return answer
  568. def run(self):
  569. '''Get and process all commands sent from cfgmgr or other modules. '''
  570. while not self._shutdown_event.is_set():
  571. self._cc.check_command(False)
  572. xfrout_server = None
  573. def signal_handler(signal, frame):
  574. if xfrout_server:
  575. xfrout_server.shutdown()
  576. sys.exit(0)
  577. def set_signal_handler():
  578. signal.signal(signal.SIGTERM, signal_handler)
  579. signal.signal(signal.SIGINT, signal_handler)
  580. def set_cmd_options(parser):
  581. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  582. help="display more about what is going on")
  583. if '__main__' == __name__:
  584. try:
  585. parser = OptionParser()
  586. set_cmd_options(parser)
  587. (options, args) = parser.parse_args()
  588. VERBOSE_MODE = options.verbose
  589. set_signal_handler()
  590. xfrout_server = XfroutServer()
  591. xfrout_server.run()
  592. except KeyboardInterrupt:
  593. logger.INFO(XFROUT_STOPPED_BY_KEYBOARD)
  594. except SessionError as e:
  595. logger.error(XFROUT_CC_SESSION_ERROR, str(e))
  596. except SessionTimeout as e:
  597. logger.error(XFROUT_CC_SESSION_TIMEOUT_ERROR)
  598. if xfrout_server:
  599. xfrout_server.shutdown()