xfrout.py.in 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import os
  20. import threading
  21. import struct
  22. import signal
  23. from isc.auth import sqlite3_ds
  24. from socketserver import *
  25. import os
  26. from isc.config.ccsession import *
  27. import socket
  28. from bind10_xfr import *
  29. from bind10_dns import *
  30. from optparse import OptionParser, OptionValueError
  31. try:
  32. from bind10_xfr import *
  33. from bind10_dns import *
  34. except:
  35. pass
  36. if "B10_FROM_SOURCE" in os.environ:
  37. SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrout"
  38. else:
  39. PREFIX = "@prefix@"
  40. DATAROOTDIR = "@datarootdir@"
  41. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  42. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  43. MAX_TRANSFERS_OUT = 10
  44. verbose_mode = True
  45. class XfroutException(Exception): pass
  46. class XfroutSession(BaseRequestHandler):
  47. def handle(self):
  48. fd = recv_fd(self.request.fileno())
  49. data_len = self.request.recv(2)
  50. msg_len = struct.unpack('H', data_len)[0]
  51. msgdata = self.request.recv(msg_len)
  52. sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
  53. try:
  54. self.dns_xfrout_start(sock, msgdata)
  55. except Exception as e:
  56. self.log_msg(str(e))
  57. sock.close()
  58. def _parse_query_message(self, mdata):
  59. ''' parse query message to [socket,message]'''
  60. #TODO, need to add parseHeader() in case the message header is invalid
  61. try:
  62. msg = message(message_mode.PARSE)
  63. msg.from_wire(input_buffer(mdata))
  64. except Exception as err:
  65. self.log_msg(str(err))
  66. return rcode.FORMERR(), None
  67. return rcode.NOERROR(), msg
  68. def _get_query_zone_name(self, msg):
  69. q_iter = question_iter(msg)
  70. question = q_iter.get_question()
  71. return question.get_name().to_text()
  72. def _send_data(self, sock, data):
  73. size = len(data)
  74. total_count = 0
  75. while total_count < size:
  76. count = sock.send(data[total_count:])
  77. total_count += count
  78. def _send_message(self, sock, msg):
  79. obuf = output_buffer(0)
  80. render = message_render(obuf)
  81. msg.to_wire(render)
  82. header_len = struct.pack('H', socket.htons(obuf.get_length()))
  83. self._send_data(sock, header_len)
  84. self._send_data(sock, obuf.get_data())
  85. def _reply_query_with_error_rcode(self, msg, sock, rcode_):
  86. msg.make_response()
  87. msg.set_rcode(rcode_)
  88. self._send_message(sock, msg)
  89. def _reply_query_with_format_error(self, sock, msg):
  90. '''query message format isn't legal.'''
  91. if not msg:
  92. return # query message is invalid. send nothing back.
  93. msg.make_response()
  94. msg.set_rcode(rcode.FORMERR())
  95. self._send_message(sock, msg)
  96. def _zone_is_empty(self, zone):
  97. if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
  98. return False
  99. return True
  100. def _zone_exist(self, zonename):
  101. # Find zone in datasource, should this works? maybe should ask
  102. # config manager.
  103. soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
  104. if soa:
  105. return True
  106. return False
  107. def _check_xfrout_available(self, zone_name):
  108. '''Check if xfr request can be responsed.
  109. TODO, Get zone's configuration from cfgmgr or some other place
  110. eg. check allow_transfer setting,
  111. '''
  112. if not self._zone_exist(zone_name):
  113. return rcode.NOTAUTH()
  114. if self._zone_is_empty(zone_name):
  115. return rcode.SERVFAIL()
  116. #TODO, check allow_transfer
  117. if not self.server.increase_transfers_counter():
  118. return rcode.REFUSED()
  119. return rcode.NOERROR()
  120. def dns_xfrout_start(self, sock, msg_query):
  121. rcode_, msg = self._parse_query_message(msg_query)
  122. #TODO. create query message and parse header
  123. if rcode_ != rcode.NOERROR():
  124. return self._reply_query_with_format_error(msg, sock, msg_query)
  125. zone_name = self._get_query_zone_name(msg)
  126. rcode_ = self._check_xfrout_available(zone_name)
  127. if rcode_ != rcode.NOERROR():
  128. return self. _reply_query_with_error_rcode(msg, sock, rcode_)
  129. try:
  130. if verbose_mode:
  131. self.log_msg("transfer of '%s/IN': AXFR started" % zone_name)
  132. self._reply_xfrout_query(msg, sock, zone_name)
  133. if verbose_mode:
  134. self.log_msg("transfer of '%s/IN': AXFR end" % zone_name)
  135. except Exception as err:
  136. if verbose_mode:
  137. sys.stderr.write(str(err))
  138. self.server.decrease_transfers_counter()
  139. return
  140. def _clear_message(self, msg):
  141. qid = msg.get_qid()
  142. opcode = msg.get_opcode()
  143. rcode = msg.get_rcode()
  144. msg.clear(message_mode.RENDER)
  145. msg.set_qid(qid)
  146. msg.set_opcode(opcode)
  147. msg.set_rcode(rcode)
  148. msg.set_header_flag(message_flag.AA())
  149. msg.set_header_flag(message_flag.QR())
  150. return msg
  151. def _create_rrset_from_db_record(self, record):
  152. '''Create one rrset from one record of datasource, if the schema of record is changed,
  153. This function should be updated first.
  154. '''
  155. rrtype_ = rr_type(record[5])
  156. rdata_ = create_rdata(rrtype_, rr_class.IN(), " ".join(record[7:]))
  157. rrset_ = rrset(name(record[2]), rr_class.IN(), rrtype_, rr_ttl( int(record[4])))
  158. rrset_.add_rdata(rdata_)
  159. return rrset_
  160. def _send_message_with_last_soa(self, msg, sock, rrset_soa):
  161. '''Add the SOA record to the end of message. If it can't be
  162. added, a new message should be created to send out the last soa .
  163. '''
  164. obuf = output_buffer(0)
  165. render = message_render(obuf)
  166. msg.to_wire(render)
  167. old_message_len = obuf.get_length()
  168. msg.add_rrset(section.ANSWER(), rrset_soa)
  169. msg.to_wire(render)
  170. message_len = obuf.get_length()
  171. if message_len != old_message_len:
  172. self._send_message(sock, msg)
  173. else:
  174. msg = self._clear_message(msg)
  175. msg.add_rrset(section.ANSWER(), rrset_soa)
  176. self._send_message(sock, msg)
  177. def _get_message_len(self, msg):
  178. '''Get message length, every time need do like this? Actually there should be
  179. a better way, I need check with jinmei later.
  180. '''
  181. obuf = output_buffer(0)
  182. render = message_render(obuf)
  183. msg.to_wire(render)
  184. return obuf.get_length()
  185. def _reply_xfrout_query(self, msg, sock, zone_name):
  186. #TODO, there should be a better way to insert rrset.
  187. msg.make_response()
  188. msg.set_header_flag(message_flag.AA())
  189. soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
  190. rrset_soa = self._create_rrset_from_db_record(soa_record)
  191. msg.add_rrset(section.ANSWER(), rrset_soa)
  192. old_message_len = 0
  193. # TODO, Since add_rrset() return nothing when rrset can't be added, so I have to compare
  194. # the message length to know if the rrset has been added sucessfully.
  195. for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
  196. if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
  197. raise XfroutException("shutdown!")
  198. if rr_type(rr_data[5]) == rr_type.SOA(): #ignore soa record
  199. continue
  200. rrset_ = self._create_rrset_from_db_record(rr_data)
  201. msg.add_rrset(section.ANSWER(), rrset_)
  202. message_len = self._get_message_len(msg)
  203. if message_len != old_message_len:
  204. old_message_len = message_len
  205. continue
  206. self._send_message(sock, msg)
  207. msg = self._clear_message(msg)
  208. msg.add_rrset(section.ANSWER(), rrset_) # Add the rrset to the new message
  209. old_message_len = 0
  210. self._send_message_with_last_soa(msg, sock, rrset_soa)
  211. def log_msg(self, msg):
  212. print('[b10-xfrout] ', msg)
  213. class UnixSockServer(ThreadingUnixStreamServer):
  214. '''The unix domain socket server which accept xfr query sent from auth server.'''
  215. def __init__(self, sock_file, handle_class, shutdown_event, config_data):
  216. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  217. self._lock = threading.Lock()
  218. self._transfers_counter = 0
  219. self._shutdown_event = shutdown_event
  220. self.update_config_data(config_data)
  221. def update_config_data(self, new_config):
  222. '''Apply the new config setting of xfrout module. '''
  223. self._lock.acquire()
  224. self._max_transfers_out = new_config.get('transfers_out')
  225. self._db_file = new_config.get('db_file')
  226. self._lock.release()
  227. def get_db_file(self):
  228. self._lock.acquire()
  229. file = self._db_file
  230. self._lock.release()
  231. return file
  232. def increase_transfers_counter(self):
  233. '''Return False, if counter + 1 > max_transfers_out, or else
  234. return True
  235. '''
  236. ret = False
  237. self._lock.acquire()
  238. if self._transfers_counter < self._max_transfers_out:
  239. self._transfers_counter += 1
  240. ret = True
  241. self._lock.release()
  242. return ret
  243. def decrease_transfers_counter(self):
  244. self._lock.acquire()
  245. self._transfers_counter -= 1
  246. self._lock.release()
  247. def listen_on_xfr_query(unix_socket_server):
  248. '''Listen xfr query in one single thread. '''
  249. unix_socket_server.serve_forever()
  250. class XfroutServer:
  251. def __init__(self):
  252. self._init_config_data()
  253. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
  254. self._cc.start()
  255. self._lock = threading.Lock()
  256. self._shutdown_event = threading.Event()
  257. self._listen_sock_file = '/tmp/auth_xfrout_conn' # TODO, should this be configurable in cfgmgr
  258. self._start_xfr_query_listener()
  259. def _start_xfr_query_listener(self):
  260. '''Start a new thread to accept xfr query. '''
  261. try:
  262. os.unlink(self._listen_sock_file)
  263. except:
  264. pass
  265. self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
  266. self._shutdown_event, self._config_data);
  267. listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
  268. listener.start()
  269. def _init_config_data(self):
  270. '''Init the config item here. In case there is some error in config data got from cfgmgr.'''
  271. self._config_data = {'transfers_out': 10, 'db_file':'/tmp/zone.sqlite3'}
  272. def config_handler(self, new_config):
  273. '''Update config data. TODO. Do error check'''
  274. answer = create_answer(0)
  275. for key in new_config:
  276. if key not in self._config_data:
  277. answer = create_answer(1, "Unknown config data: " + str(key))
  278. continue
  279. self._config_data[key] = new_config[key]
  280. return answer
  281. def shutdown(self):
  282. ''' shutdown the xfrin process. the thread which is doing xfrin should be
  283. terminated.
  284. '''
  285. self._shutdown_event.set()
  286. self._unix_socket_server.shutdown()
  287. main_thread = threading.currentThread()
  288. for th in threading.enumerate():
  289. if th is main_thread:
  290. continue
  291. th.join()
  292. def command_handler(self, cmd, args):
  293. if cmd == "shutdown":
  294. if verbose_mode:
  295. log_msg("Received shutdown command")
  296. self.shutdown()
  297. answer = create_answer(0)
  298. else:
  299. answer = create_answer(1, "Unknown command:" + str(cmd))
  300. return answer
  301. def run(self):
  302. '''Get and process all commands sent from cfgmgr or other modules. '''
  303. while not self._shutdown_event.is_set():
  304. self._cc.check_command()
  305. xfrout_server = None
  306. def signal_handler(signal, frame):
  307. if xfrout_server:
  308. xfrout_server.shutdown()
  309. sys.exit(0)
  310. def set_signal_handler():
  311. signal.signal(signal.SIGTERM, signal_handler)
  312. signal.signal(signal.SIGINT, signal_handler)
  313. def set_cmd_options(parser):
  314. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  315. help="display more about what is going on")
  316. if '__main__' == __name__:
  317. try:
  318. parser = OptionParser()
  319. set_cmd_options(parser)
  320. (options, args) = parser.parse_args()
  321. verbose_mode = options.verbose
  322. set_signal_handler()
  323. xfrout_server = XfroutServer()
  324. xfrout_server.run()
  325. except KeyboardInterrupt:
  326. print("[b10-xfrout] exit xfrout process")
  327. except Exception as e:
  328. print('[b10-xfrout] ', e)
  329. if xfrout_server:
  330. xfrout_server.shutdown()