xfrout.py.in 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import threading
  20. import struct
  21. import signal
  22. from isc.datasrc import sqlite3_ds
  23. from socketserver import *
  24. import os
  25. from isc.config.ccsession import *
  26. from isc.log.log import *
  27. from isc.cc import SessionError, SessionTimeout
  28. from isc.notify import notify_out
  29. import socket
  30. import select
  31. import errno
  32. from optparse import OptionParser, OptionValueError
  33. try:
  34. from libxfr_python import *
  35. from pydnspp import *
  36. except ImportError as e:
  37. # C++ loadable module may not be installed; even so the xfrout process
  38. # must keep running, so we warn about it and move forward.
  39. sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))
  40. if "B10_FROM_BUILD" in os.environ:
  41. SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
  42. AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
  43. UNIX_SOCKET_FILE= os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
  44. else:
  45. PREFIX = "@prefix@"
  46. DATAROOTDIR = "@datarootdir@"
  47. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  48. AUTH_SPECFILE_PATH = SPECFILE_PATH
  49. UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
  50. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  51. AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
  52. MAX_TRANSFERS_OUT = 10
  53. VERBOSE_MODE = False
  54. XFROUT_MAX_MESSAGE_SIZE = 65535
  55. def get_rrset_len(rrset):
  56. """Returns the wire length of the given RRset"""
  57. bytes = bytearray()
  58. rrset.to_wire(bytes)
  59. return len(bytes)
  60. class XfroutSession(BaseRequestHandler):
  61. def __init__(self, request, client_address, server, log):
  62. # The initializer for the superclass may call functions
  63. # that need _log to be set, so we set it first
  64. self._log = log
  65. BaseRequestHandler.__init__(self, request, client_address, server)
  66. def handle(self):
  67. fd = recv_fd(self.request.fileno())
  68. if fd < 0:
  69. # This may happen when one xfrout process try to connect to
  70. # xfrout unix socket server, to check whether there is another
  71. # xfrout running.
  72. self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
  73. return
  74. data_len = self.request.recv(2)
  75. msg_len = struct.unpack('!H', data_len)[0]
  76. msgdata = self.request.recv(msg_len)
  77. sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
  78. try:
  79. self.dns_xfrout_start(sock, msgdata)
  80. #TODO, avoid catching all exceptions
  81. except Exception as e:
  82. self._log.log_message("error", str(e))
  83. try:
  84. sock.shutdown(socket.SHUT_RDWR)
  85. except socket.error:
  86. # Avoid socket error caused by shutting down
  87. # one non-connected socket.
  88. pass
  89. sock.close()
  90. os.close(fd)
  91. pass
  92. def _parse_query_message(self, mdata):
  93. ''' parse query message to [socket,message]'''
  94. #TODO, need to add parseHeader() in case the message header is invalid
  95. try:
  96. msg = Message(Message.PARSE)
  97. Message.from_wire(msg, mdata)
  98. except Exception as err:
  99. self._log.log_message("error", str(err))
  100. return Rcode.FORMERR(), None
  101. return Rcode.NOERROR(), msg
  102. def _get_query_zone_name(self, msg):
  103. question = msg.get_question()[0]
  104. return question.get_name().to_text()
  105. def _send_data(self, sock, data):
  106. size = len(data)
  107. total_count = 0
  108. while total_count < size:
  109. count = sock.send(data[total_count:])
  110. total_count += count
  111. def _send_message(self, sock, msg):
  112. render = MessageRenderer()
  113. render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
  114. msg.to_wire(render)
  115. header_len = struct.pack('H', socket.htons(render.get_length()))
  116. self._send_data(sock, header_len)
  117. self._send_data(sock, render.get_data())
  118. def _reply_query_with_error_rcode(self, msg, sock, rcode_):
  119. msg.make_response()
  120. msg.set_rcode(rcode_)
  121. self._send_message(sock, msg)
  122. def _reply_query_with_format_error(self, msg, sock):
  123. '''query message format isn't legal.'''
  124. if not msg:
  125. return # query message is invalid. send nothing back.
  126. msg.make_response()
  127. msg.set_rcode(Rcode.FORMERR())
  128. self._send_message(sock, msg)
  129. def _zone_is_empty(self, zone):
  130. if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
  131. return False
  132. return True
  133. def _zone_exist(self, zonename):
  134. # Find zone in datasource, should this works? maybe should ask
  135. # config manager.
  136. soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
  137. if soa:
  138. return True
  139. return False
  140. def _check_xfrout_available(self, zone_name):
  141. '''Check if xfr request can be responsed.
  142. TODO, Get zone's configuration from cfgmgr or some other place
  143. eg. check allow_transfer setting,
  144. '''
  145. if not self._zone_exist(zone_name):
  146. return Rcode.NOTAUTH()
  147. if self._zone_is_empty(zone_name):
  148. return Rcode.SERVFAIL()
  149. #TODO, check allow_transfer
  150. if not self.server.increase_transfers_counter():
  151. return Rcode.REFUSED()
  152. return Rcode.NOERROR()
  153. def dns_xfrout_start(self, sock, msg_query):
  154. rcode_, msg = self._parse_query_message(msg_query)
  155. #TODO. create query message and parse header
  156. if rcode_ != Rcode.NOERROR():
  157. return self._reply_query_with_format_error(msg, sock)
  158. zone_name = self._get_query_zone_name(msg)
  159. rcode_ = self._check_xfrout_available(zone_name)
  160. if rcode_ != Rcode.NOERROR():
  161. self._log.log_message("info", "transfer of '%s/IN' failed: %s",
  162. zone_name, rcode_.to_text())
  163. return self. _reply_query_with_error_rcode(msg, sock, rcode_)
  164. try:
  165. self._log.log_message("info", "transfer of '%s/IN': AXFR started" % zone_name)
  166. self._reply_xfrout_query(msg, sock, zone_name)
  167. self._log.log_message("info", "transfer of '%s/IN': AXFR end" % zone_name)
  168. except Exception as err:
  169. self._log.log_message("error", str(err))
  170. self.server.decrease_transfers_counter()
  171. return
  172. def _clear_message(self, msg):
  173. qid = msg.get_qid()
  174. opcode = msg.get_opcode()
  175. rcode = msg.get_rcode()
  176. msg.clear(Message.RENDER)
  177. msg.set_qid(qid)
  178. msg.set_opcode(opcode)
  179. msg.set_rcode(rcode)
  180. msg.set_header_flag(MessageFlag.AA())
  181. msg.set_header_flag(MessageFlag.QR())
  182. return msg
  183. def _create_rrset_from_db_record(self, record):
  184. '''Create one rrset from one record of datasource, if the schema of record is changed,
  185. This function should be updated first.
  186. '''
  187. rrtype_ = RRType(record[5])
  188. rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
  189. rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
  190. rrset_.add_rdata(rdata_)
  191. return rrset_
  192. def _send_message_with_last_soa(self, msg, sock, rrset_soa, message_upper_len):
  193. '''Add the SOA record to the end of message. If it can't be
  194. added, a new message should be created to send out the last soa .
  195. '''
  196. rrset_len = get_rrset_len(rrset_soa)
  197. if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
  198. msg.add_rrset(Section.ANSWER(), rrset_soa)
  199. else:
  200. self._send_message(sock, msg)
  201. msg = self._clear_message(msg)
  202. msg.add_rrset(Section.ANSWER(), rrset_soa)
  203. self._send_message(sock, msg)
  204. def _reply_xfrout_query(self, msg, sock, zone_name):
  205. #TODO, there should be a better way to insert rrset.
  206. msg.make_response()
  207. msg.set_header_flag(MessageFlag.AA())
  208. soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
  209. rrset_soa = self._create_rrset_from_db_record(soa_record)
  210. msg.add_rrset(Section.ANSWER(), rrset_soa)
  211. message_upper_len = get_rrset_len(rrset_soa)
  212. for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
  213. if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
  214. self._log.log_message("info", "xfrout process is being shutdown")
  215. return
  216. # TODO: RRType.SOA() ?
  217. if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
  218. continue
  219. rrset_ = self._create_rrset_from_db_record(rr_data)
  220. # We calculate the maximum size of the RRset (i.e. the
  221. # size without compression) and use that to see if we
  222. # may have reached the limit
  223. rrset_len = get_rrset_len(rrset_)
  224. if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
  225. msg.add_rrset(Section.ANSWER(), rrset_)
  226. message_upper_len += rrset_len
  227. continue
  228. self._send_message(sock, msg)
  229. msg = self._clear_message(msg)
  230. msg.add_rrset(Section.ANSWER(), rrset_) # Add the rrset to the new message
  231. message_upper_len = rrset_len
  232. self._send_message_with_last_soa(msg, sock, rrset_soa, message_upper_len)
  233. class UnixSockServer(ThreadingUnixStreamServer):
  234. '''The unix domain socket server which accept xfr query sent from auth server.'''
  235. def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc, log):
  236. self._remove_unused_sock_file(sock_file)
  237. self._sock_file = sock_file
  238. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  239. self._lock = threading.Lock()
  240. self._transfers_counter = 0
  241. self._shutdown_event = shutdown_event
  242. self._log = log
  243. self.update_config_data(config_data)
  244. self._cc = cc
  245. def finish_request(self, request, client_address):
  246. '''Finish one request by instantiating RequestHandlerClass.'''
  247. self.RequestHandlerClass(request, client_address, self, self._log)
  248. def _remove_unused_sock_file(self, sock_file):
  249. '''Try to remove the socket file. If the file is being used
  250. by one running xfrout process, exit from python.
  251. If it's not a socket file or nobody is listening
  252. , it will be removed. If it can't be removed, exit from python. '''
  253. if self._sock_file_in_use(sock_file):
  254. sys.stderr.write("[b10-xfrout] Fail to start xfrout process, unix socket"
  255. " file '%s' is being used by another xfrout process\n" % sock_file)
  256. sys.exit(0)
  257. else:
  258. if not os.path.exists(sock_file):
  259. return
  260. try:
  261. os.unlink(sock_file)
  262. except OSError as err:
  263. sys.stderr.write('[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
  264. sys.exit(0)
  265. def _sock_file_in_use(self, sock_file):
  266. '''Check whether the socket file 'sock_file' exists and
  267. is being used by one running xfrout process. If it is,
  268. return True, or else return False. '''
  269. try:
  270. sock = socket.socket(socket.AF_UNIX)
  271. sock.connect(sock_file)
  272. except socket.error as err:
  273. return False
  274. else:
  275. return True
  276. def shutdown(self):
  277. ThreadingUnixStreamServer.shutdown(self)
  278. try:
  279. os.unlink(self._sock_file)
  280. except Exception as e:
  281. self._log.log_message("error", str(e))
  282. def update_config_data(self, new_config):
  283. '''Apply the new config setting of xfrout module. '''
  284. self._log.log_message('info', 'update config data start.')
  285. self._lock.acquire()
  286. self._max_transfers_out = new_config.get('transfers_out')
  287. self._log.log_message('info', 'max transfer out : %d', self._max_transfers_out)
  288. self._lock.release()
  289. self._log.log_message('info', 'update config data complete.')
  290. def get_db_file(self):
  291. file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
  292. # this too should be unnecessary, but currently the
  293. # 'from build' override isn't stored in the config
  294. # (and we don't have indirect python access to datasources yet)
  295. if is_default and "B10_FROM_BUILD" in os.environ:
  296. file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
  297. return file
  298. def increase_transfers_counter(self):
  299. '''Return False, if counter + 1 > max_transfers_out, or else
  300. return True
  301. '''
  302. ret = False
  303. self._lock.acquire()
  304. if self._transfers_counter < self._max_transfers_out:
  305. self._transfers_counter += 1
  306. ret = True
  307. self._lock.release()
  308. return ret
  309. def decrease_transfers_counter(self):
  310. self._lock.acquire()
  311. self._transfers_counter -= 1
  312. self._lock.release()
  313. def listen_on_xfr_query(unix_socket_server):
  314. '''Listen xfr query in one single thread. Polls for shutdown
  315. every 0.1 seconds, is there a better time?
  316. '''
  317. while True:
  318. try:
  319. unix_socket_server.serve_forever(poll_interval = 0.1)
  320. except select.error as err:
  321. # serve_forever() calls select.select(), which can be
  322. # interrupted.
  323. # If it is interrupted, it raises select.error with the
  324. # errno set to EINTR. We ignore this case, and let the
  325. # normal program flow continue by trying serve_forever()
  326. # again.
  327. if err.args[0] != errno.EINTR: raise
  328. else:
  329. # serve_forever() loop has been stoped normally.
  330. break
  331. class XfroutServer:
  332. def __init__(self):
  333. self._unix_socket_server = None
  334. self._log = None
  335. self._listen_sock_file = UNIX_SOCKET_FILE
  336. self._shutdown_event = threading.Event()
  337. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
  338. self._config_data = self._cc.get_full_config()
  339. self._cc.start()
  340. self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
  341. self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
  342. self._config_data.get('log_severity'), self._config_data.get('log_versions'),
  343. self._config_data.get('log_max_bytes'), True)
  344. self._start_xfr_query_listener()
  345. self._start_notifier()
  346. def _start_xfr_query_listener(self):
  347. '''Start a new thread to accept xfr query. '''
  348. self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
  349. self._shutdown_event, self._config_data,
  350. self._cc, self._log);
  351. listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
  352. listener.start()
  353. def _start_notifier(self):
  354. datasrc = self._unix_socket_server.get_db_file()
  355. self._notifier = notify_out.NotifyOut(datasrc, self._log)
  356. self._notifier.dispatcher()
  357. def send_notify(self, zone_name, zone_class):
  358. self._notifier.send_notify(zone_name, zone_class)
  359. def config_handler(self, new_config):
  360. '''Update config data. TODO. Do error check'''
  361. answer = create_answer(0)
  362. for key in new_config:
  363. if key not in self._config_data:
  364. answer = create_answer(1, "Unknown config data: " + str(key))
  365. continue
  366. self._config_data[key] = new_config[key]
  367. if self._log:
  368. self._log.update_config(new_config)
  369. if self._unix_socket_server:
  370. self._unix_socket_server.update_config_data(self._config_data)
  371. return answer
  372. def shutdown(self):
  373. ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
  374. terminated.
  375. '''
  376. global xfrout_server
  377. xfrout_server = None #Avoid shutdown is called twice
  378. self._shutdown_event.set()
  379. self._notifier.shutdown()
  380. if self._unix_socket_server:
  381. self._unix_socket_server.shutdown()
  382. def command_handler(self, cmd, args):
  383. if cmd == "shutdown":
  384. self._log.log_message("info", "Received shutdown command.")
  385. self.shutdown()
  386. answer = create_answer(0)
  387. elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
  388. zone_name = args.get('zone_name')
  389. zone_class = args.get('zone_class')
  390. if zone_name and zone_class:
  391. self._log.log_message("info", "zone '%s/%s': receive notify others command" \
  392. % (zone_name, zone_class))
  393. self.send_notify(zone_name, zone_class)
  394. answer = create_answer(0)
  395. else:
  396. answer = create_answer(1, "Bad command parameter:" + str(args))
  397. else:
  398. answer = create_answer(1, "Unknown command:" + str(cmd))
  399. return answer
  400. def run(self):
  401. '''Get and process all commands sent from cfgmgr or other modules. '''
  402. while not self._shutdown_event.is_set():
  403. self._cc.check_command()
  404. xfrout_server = None
  405. def signal_handler(signal, frame):
  406. if xfrout_server:
  407. xfrout_server.shutdown()
  408. sys.exit(0)
  409. def set_signal_handler():
  410. signal.signal(signal.SIGTERM, signal_handler)
  411. signal.signal(signal.SIGINT, signal_handler)
  412. def set_cmd_options(parser):
  413. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  414. help="display more about what is going on")
  415. if '__main__' == __name__:
  416. try:
  417. parser = OptionParser()
  418. set_cmd_options(parser)
  419. (options, args) = parser.parse_args()
  420. VERBOSE_MODE = options.verbose
  421. set_signal_handler()
  422. xfrout_server = XfroutServer()
  423. xfrout_server.run()
  424. except KeyboardInterrupt:
  425. sys.stderr.write("[b10-xfrout] exit xfrout process\n")
  426. except SessionError as e:
  427. sys.stderr.write("[b10-xfrout] Error creating xfrout, "
  428. "is the command channel daemon running?\n")
  429. except SessionTimeout as e:
  430. sys.stderr.write("[b10-xfrout] Error creating xfrout, "
  431. "is the configuration manager running?\n")
  432. except ModuleCCSessionError as e:
  433. sys.stderr.write("[b10-xfrout] exit xfrout process:%s\n" % str(e))
  434. if xfrout_server:
  435. xfrout_server.shutdown()