xfrout.py.in 16 KB


  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import threading
  20. import struct
  21. import signal
  22. from isc.datasrc import sqlite3_ds
  23. from socketserver import *
  24. import os
  25. from isc.config.ccsession import *
  26. from isc.log.log import *
  27. from isc.cc import SessionError
  28. import socket
  29. from optparse import OptionParser, OptionValueError
  30. try:
  31. from bind10_xfr import *
  32. from bind10_dns import *
  33. except ImportError as e:
  34. # C++ loadable module may not be installed; even so the xfrout process
  35. # must keep running, so we warn about it and move forward.
  36. sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))
  37. if "B10_FROM_SOURCE" in os.environ:
  38. SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrout"
  39. else:
  40. PREFIX = "@prefix@"
  41. DATAROOTDIR = "@datarootdir@"
  42. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  43. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  44. UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
  45. MAX_TRANSFERS_OUT = 10
  46. verbose_mode = False
  47. class XfroutException(Exception): pass
  48. class XfroutSession(BaseRequestHandler):
  49. def handle(self):
  50. fd = recv_fd(self.request.fileno())
  51. if fd < 0:
  52. raise XfroutException("failed to receive the FD for XFR connection")
  53. data_len = self.request.recv(2)
  54. msg_len = struct.unpack('!H', data_len)[0]
  55. msgdata = self.request.recv(msg_len)
  56. sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
  57. try:
  58. self.dns_xfrout_start(sock, msgdata)
  59. except Exception as e:
  60. if verbose_mode:
  61. self.log_msg(str(e))
  62. sock.close()
  63. def _parse_query_message(self, mdata):
  64. ''' parse query message to [socket,message]'''
  65. #TODO, need to add parseHeader() in case the message header is invalid
  66. try:
  67. msg = message(message_mode.PARSE)
  68. msg.from_wire(input_buffer(mdata))
  69. except Exception as err:
  70. if verbose_mode:
  71. self.log_msg(str(err))
  72. return rcode.FORMERR(), None
  73. return rcode.NOERROR(), msg
  74. def _get_query_zone_name(self, msg):
  75. q_iter = question_iter(msg)
  76. question = q_iter.get_question()
  77. return question.get_name().to_text()
  78. def _send_data(self, sock, data):
  79. size = len(data)
  80. total_count = 0
  81. while total_count < size:
  82. count = sock.send(data[total_count:])
  83. total_count += count
  84. def _send_message(self, sock, msg):
  85. obuf = output_buffer(0)
  86. render = message_render(obuf)
  87. msg.to_wire(render)
  88. header_len = struct.pack('H', socket.htons(obuf.get_length()))
  89. self._send_data(sock, header_len)
  90. self._send_data(sock, obuf.get_data())
  91. def _reply_query_with_error_rcode(self, msg, sock, rcode_):
  92. msg.make_response()
  93. msg.set_rcode(rcode_)
  94. self._send_message(sock, msg)
  95. def _reply_query_with_format_error(self, msg, sock):
  96. '''query message format isn't legal.'''
  97. if not msg:
  98. return # query message is invalid. send nothing back.
  99. msg.make_response()
  100. msg.set_rcode(rcode.FORMERR())
  101. self._send_message(sock, msg)
  102. def _zone_is_empty(self, zone):
  103. if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
  104. return False
  105. return True
  106. def _zone_exist(self, zonename):
  107. # Find zone in datasource, should this works? maybe should ask
  108. # config manager.
  109. soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
  110. if soa:
  111. return True
  112. return False
  113. def _check_xfrout_available(self, zone_name):
  114. '''Check if xfr request can be responsed.
  115. TODO, Get zone's configuration from cfgmgr or some other place
  116. eg. check allow_transfer setting,
  117. '''
  118. if not self._zone_exist(zone_name):
  119. return rcode.NOTAUTH()
  120. if self._zone_is_empty(zone_name):
  121. return rcode.SERVFAIL()
  122. #TODO, check allow_transfer
  123. if not self.server.increase_transfers_counter():
  124. return rcode.REFUSED()
  125. return rcode.NOERROR()
  126. def dns_xfrout_start(self, sock, msg_query):
  127. rcode_, msg = self._parse_query_message(msg_query)
  128. #TODO. create query message and parse header
  129. if rcode_ != rcode.NOERROR():
  130. return self._reply_query_with_format_error(msg, sock)
  131. zone_name = self._get_query_zone_name(msg)
  132. rcode_ = self._check_xfrout_available(zone_name)
  133. if rcode_ != rcode.NOERROR():
  134. return self. _reply_query_with_error_rcode(msg, sock, rcode_)
  135. try:
  136. if verbose_mode:
  137. self.log_msg("transfer of '%s/IN': AXFR started" % zone_name)
  138. self._reply_xfrout_query(msg, sock, zone_name)
  139. if verbose_mode:
  140. self.log_msg("transfer of '%s/IN': AXFR end" % zone_name)
  141. except Exception as err:
  142. if verbose_mode:
  143. sys.stderr.write(str(err))
  144. self.server.decrease_transfers_counter()
  145. return
  146. def _clear_message(self, msg):
  147. qid = msg.get_qid()
  148. opcode = msg.get_opcode()
  149. rcode = msg.get_rcode()
  150. msg.clear(message_mode.RENDER)
  151. msg.set_qid(qid)
  152. msg.set_opcode(opcode)
  153. msg.set_rcode(rcode)
  154. msg.set_header_flag(message_flag.AA())
  155. msg.set_header_flag(message_flag.QR())
  156. return msg
  157. def _create_rrset_from_db_record(self, record):
  158. '''Create one rrset from one record of datasource, if the schema of record is changed,
  159. This function should be updated first.
  160. '''
  161. rrtype_ = rr_type(record[5])
  162. rdata_ = create_rdata(rrtype_, rr_class.IN(), " ".join(record[7:]))
  163. rrset_ = rrset(name(record[2]), rr_class.IN(), rrtype_, rr_ttl( int(record[4])))
  164. rrset_.add_rdata(rdata_)
  165. return rrset_
  166. def _send_message_with_last_soa(self, msg, sock, rrset_soa):
  167. '''Add the SOA record to the end of message. If it can't be
  168. added, a new message should be created to send out the last soa .
  169. '''
  170. obuf = output_buffer(0)
  171. render = message_render(obuf)
  172. msg.to_wire(render)
  173. old_message_len = obuf.get_length()
  174. msg.add_rrset(section.ANSWER(), rrset_soa)
  175. msg.to_wire(render)
  176. message_len = obuf.get_length()
  177. if message_len != old_message_len:
  178. self._send_message(sock, msg)
  179. else:
  180. msg = self._clear_message(msg)
  181. msg.add_rrset(section.ANSWER(), rrset_soa)
  182. self._send_message(sock, msg)
  183. def _get_message_len(self, msg):
  184. '''Get message length, every time need do like this? Actually there should be
  185. a better way, I need check with jinmei later.
  186. '''
  187. obuf = output_buffer(0)
  188. render = message_render(obuf)
  189. msg.to_wire(render)
  190. return obuf.get_length()
  191. def _reply_xfrout_query(self, msg, sock, zone_name):
  192. #TODO, there should be a better way to insert rrset.
  193. msg.make_response()
  194. msg.set_header_flag(message_flag.AA())
  195. soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
  196. rrset_soa = self._create_rrset_from_db_record(soa_record)
  197. msg.add_rrset(section.ANSWER(), rrset_soa)
  198. old_message_len = 0
  199. # TODO, Since add_rrset() return nothing when rrset can't be added, so I have to compare
  200. # the message length to know if the rrset has been added sucessfully.
  201. for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
  202. if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
  203. raise XfroutException("shutdown!")
  204. if rr_type(rr_data[5]) == rr_type.SOA(): #ignore soa record
  205. continue
  206. rrset_ = self._create_rrset_from_db_record(rr_data)
  207. msg.add_rrset(section.ANSWER(), rrset_)
  208. message_len = self._get_message_len(msg)
  209. if message_len != old_message_len:
  210. old_message_len = message_len
  211. continue
  212. self._send_message(sock, msg)
  213. msg = self._clear_message(msg)
  214. msg.add_rrset(section.ANSWER(), rrset_) # Add the rrset to the new message
  215. old_message_len = 0
  216. self._send_message_with_last_soa(msg, sock, rrset_soa)
  217. def log_msg(self, msg):
  218. print('[b10-xfrout] ', msg)
  219. class UnixSockServer(ThreadingUnixStreamServer):
  220. '''The unix domain socket server which accept xfr query sent from auth server.'''
  221. def __init__(self, sock_file, handle_class, shutdown_event, config_data, log):
  222. try:
  223. os.unlink(sock_file)
  224. except:
  225. pass
  226. self._sock_file = sock_file
  227. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  228. self._lock = threading.Lock()
  229. self._transfers_counter = 0
  230. self._shutdown_event = shutdown_event
  231. self._log = log
  232. self.update_config_data(config_data)
  233. def shutdown(self):
  234. self._log.log_message('warning', 'Xfrout process will is shutting down')
  235. ThreadingUnixStreamServer.shutdown(self)
  236. try:
  237. os.unlink(self._sock_file)
  238. except:
  239. self._log.log_message('error', 'OS unlink sock file exception while shutting down')
  240. def update_config_data(self, new_config):
  241. '''Apply the new config setting of xfrout module. '''
  242. self._log.log_message('info', 'update config data start')
  243. self._lock.acquire()
  244. self._max_transfers_out = new_config.get('transfers_out')
  245. self._log.log_message('debug', 'max transfer out : %d', self._max_transfers_out)
  246. self._db_file = new_config.get('db_file')
  247. self._log.log_message('debug', 'db file name : %s', self._db_file)
  248. self._lock.release()
  249. self._log.log_message('info', 'update config data complete')
  250. def get_db_file(self):
  251. self._lock.acquire()
  252. file = self._db_file
  253. self._lock.release()
  254. return file
  255. def increase_transfers_counter(self):
  256. '''Return False, if counter + 1 > max_transfers_out, or else
  257. return True
  258. '''
  259. ret = False
  260. self._lock.acquire()
  261. if self._transfers_counter < self._max_transfers_out:
  262. self._transfers_counter += 1
  263. ret = True
  264. self._lock.release()
  265. return ret
  266. def decrease_transfers_counter(self):
  267. self._lock.acquire()
  268. self._transfers_counter -= 1
  269. self._lock.release()
  270. def listen_on_xfr_query(unix_socket_server):
  271. '''Listen xfr query in one single thread. Polls for shutdown
  272. every 0.1 seconds, is there a better time?
  273. '''
  274. unix_socket_server.serve_forever(poll_interval = 0.1)
  275. class XfroutServer:
  276. def __init__(self):
  277. self._unix_socket_server = None
  278. self._log = None
  279. self._listen_sock_file = UNIX_SOCKET_FILE
  280. self._shutdown_event = threading.Event()
  281. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
  282. self._config_data = self._cc.get_full_config()
  283. self._cc.start()
  284. self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
  285. self._config_data.get('severity'), self._config_data.get('versions'),
  286. self._config_data.get('max_bytes'), True)
  287. self._start_xfr_query_listener()
  288. def _start_xfr_query_listener(self):
  289. '''Start a new thread to accept xfr query. '''
  290. self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
  291. self._shutdown_event, self._config_data,
  292. self._log);
  293. listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
  294. listener.start()
  295. def config_handler(self, new_config):
  296. '''Update config data. TODO. Do error check'''
  297. answer = create_answer(0)
  298. for key in new_config:
  299. if key not in self._config_data:
  300. answer = create_answer(1, "Unknown config data: " + str(key))
  301. continue
  302. self._config_data[key] = new_config[key]
  303. if self._log:
  304. self._log.update_config(new_config)
  305. if self._unix_socket_server:
  306. self._unix_socket_server.update_config_data(self._config_data)
  307. return answer
  308. def shutdown(self):
  309. ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
  310. terminated.
  311. '''
  312. global xfrout_server
  313. xfrout_server = None #Avoid shutdown is called twice
  314. self._shutdown_event.set()
  315. if self._unix_socket_server:
  316. self._unix_socket_server.shutdown()
  317. main_thread = threading.currentThread()
  318. for th in threading.enumerate():
  319. if th is main_thread:
  320. continue
  321. th.join()
  322. def command_handler(self, cmd, args):
  323. if cmd == "shutdown":
  324. if verbose_mode:
  325. self._log.log_message("info", "Received shutdown command")
  326. self.shutdown()
  327. answer = create_answer(0)
  328. else:
  329. answer = create_answer(1, "Unknown command:" + str(cmd))
  330. return answer
  331. def run(self):
  332. '''Get and process all commands sent from cfgmgr or other modules. '''
  333. while not self._shutdown_event.is_set():
  334. self._cc.check_command()
  335. xfrout_server = None
  336. def signal_handler(signal, frame):
  337. if xfrout_server:
  338. xfrout_server.shutdown()
  339. sys.exit(0)
  340. def set_signal_handler():
  341. signal.signal(signal.SIGTERM, signal_handler)
  342. signal.signal(signal.SIGINT, signal_handler)
  343. def set_cmd_options(parser):
  344. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  345. help="display more about what is going on")
  346. if '__main__' == __name__:
  347. try:
  348. parser = OptionParser()
  349. set_cmd_options(parser)
  350. (options, args) = parser.parse_args()
  351. verbose_mode = options.verbose
  352. set_signal_handler()
  353. xfrout_server = XfroutServer()
  354. xfrout_server.run()
  355. except KeyboardInterrupt:
  356. sys.stderr.write("[b10-xfrout] exit xfrout process")
  357. except SessionError as e:
  358. sys.stderr.write("[b10-xfrout] Error creating xfrout,"
  359. "is the command channel daemon running?")
  360. except ModuleCCSessionError as e:
  361. sys.stderr.write("info", '[b10-xfrout] exit xfrout process:', e)
  362. if xfrout_server:
  363. xfrout_server.shutdown()