xfrout.py.in 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import threading
  20. import struct
  21. import signal
  22. from isc.datasrc import sqlite3_ds
  23. from socketserver import *
  24. import os
  25. from isc.config.ccsession import *
  26. from isc.cc import SessionError
  27. import socket
  28. from optparse import OptionParser, OptionValueError
  29. try:
  30. from bind10_xfr import *
  31. from bind10_dns import *
  32. except ImportError as e:
  33. # C++ loadable module may not be installed; even so the xfrout process
  34. # must keep running, so we warn about it and move forward.
  35. sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))
  36. if "B10_FROM_SOURCE" in os.environ:
  37. SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/xfrout"
  38. else:
  39. PREFIX = "@prefix@"
  40. DATAROOTDIR = "@datarootdir@"
  41. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  42. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  43. UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
  44. MAX_TRANSFERS_OUT = 10
  45. verbose_mode = False
  46. class XfroutException(Exception): pass
  47. class XfroutSession(BaseRequestHandler):
  48. def handle(self):
  49. fd = recv_fd(self.request.fileno())
  50. if fd < 0:
  51. raise XfroutException("failed to receive the FD for XFR connection")
  52. data_len = self.request.recv(2)
  53. msg_len = struct.unpack('!H', data_len)[0]
  54. msgdata = self.request.recv(msg_len)
  55. sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
  56. try:
  57. self.dns_xfrout_start(sock, msgdata)
  58. except Exception as e:
  59. if verbose_mode:
  60. self.log_msg(str(e))
  61. sock.close()
  62. def _parse_query_message(self, mdata):
  63. ''' parse query message to [socket,message]'''
  64. #TODO, need to add parseHeader() in case the message header is invalid
  65. try:
  66. msg = message(message_mode.PARSE)
  67. msg.from_wire(input_buffer(mdata))
  68. except Exception as err:
  69. if verbose_mode:
  70. self.log_msg(str(err))
  71. return rcode.FORMERR(), None
  72. return rcode.NOERROR(), msg
  73. def _get_query_zone_name(self, msg):
  74. q_iter = question_iter(msg)
  75. question = q_iter.get_question()
  76. return question.get_name().to_text()
  77. def _send_data(self, sock, data):
  78. size = len(data)
  79. total_count = 0
  80. while total_count < size:
  81. count = sock.send(data[total_count:])
  82. total_count += count
  83. def _send_message(self, sock, msg):
  84. obuf = output_buffer(0)
  85. render = message_render(obuf)
  86. msg.to_wire(render)
  87. header_len = struct.pack('H', socket.htons(obuf.get_length()))
  88. self._send_data(sock, header_len)
  89. self._send_data(sock, obuf.get_data())
  90. def _reply_query_with_error_rcode(self, msg, sock, rcode_):
  91. msg.make_response()
  92. msg.set_rcode(rcode_)
  93. self._send_message(sock, msg)
  94. def _reply_query_with_format_error(self, msg, sock):
  95. '''query message format isn't legal.'''
  96. if not msg:
  97. return # query message is invalid. send nothing back.
  98. msg.make_response()
  99. msg.set_rcode(rcode.FORMERR())
  100. self._send_message(sock, msg)
  101. def _zone_is_empty(self, zone):
  102. if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
  103. return False
  104. return True
  105. def _zone_exist(self, zonename):
  106. # Find zone in datasource, should this works? maybe should ask
  107. # config manager.
  108. soa = sqlite3_ds.get_zone_soa(zonename, self.server.get_db_file())
  109. if soa:
  110. return True
  111. return False
  112. def _check_xfrout_available(self, zone_name):
  113. '''Check if xfr request can be responsed.
  114. TODO, Get zone's configuration from cfgmgr or some other place
  115. eg. check allow_transfer setting,
  116. '''
  117. if not self._zone_exist(zone_name):
  118. return rcode.NOTAUTH()
  119. if self._zone_is_empty(zone_name):
  120. return rcode.SERVFAIL()
  121. #TODO, check allow_transfer
  122. if not self.server.increase_transfers_counter():
  123. return rcode.REFUSED()
  124. return rcode.NOERROR()
  125. def dns_xfrout_start(self, sock, msg_query):
  126. rcode_, msg = self._parse_query_message(msg_query)
  127. #TODO. create query message and parse header
  128. if rcode_ != rcode.NOERROR():
  129. return self._reply_query_with_format_error(msg, sock)
  130. zone_name = self._get_query_zone_name(msg)
  131. rcode_ = self._check_xfrout_available(zone_name)
  132. if rcode_ != rcode.NOERROR():
  133. return self. _reply_query_with_error_rcode(msg, sock, rcode_)
  134. try:
  135. if verbose_mode:
  136. self.log_msg("transfer of '%s/IN': AXFR started" % zone_name)
  137. self._reply_xfrout_query(msg, sock, zone_name)
  138. if verbose_mode:
  139. self.log_msg("transfer of '%s/IN': AXFR end" % zone_name)
  140. except Exception as err:
  141. if verbose_mode:
  142. sys.stderr.write(str(err))
  143. self.server.decrease_transfers_counter()
  144. return
  145. def _clear_message(self, msg):
  146. qid = msg.get_qid()
  147. opcode = msg.get_opcode()
  148. rcode = msg.get_rcode()
  149. msg.clear(message_mode.RENDER)
  150. msg.set_qid(qid)
  151. msg.set_opcode(opcode)
  152. msg.set_rcode(rcode)
  153. msg.set_header_flag(message_flag.AA())
  154. msg.set_header_flag(message_flag.QR())
  155. return msg
  156. def _create_rrset_from_db_record(self, record):
  157. '''Create one rrset from one record of datasource, if the schema of record is changed,
  158. This function should be updated first.
  159. '''
  160. rrtype_ = rr_type(record[5])
  161. rdata_ = create_rdata(rrtype_, rr_class.IN(), " ".join(record[7:]))
  162. rrset_ = rrset(name(record[2]), rr_class.IN(), rrtype_, rr_ttl( int(record[4])))
  163. rrset_.add_rdata(rdata_)
  164. return rrset_
  165. def _send_message_with_last_soa(self, msg, sock, rrset_soa):
  166. '''Add the SOA record to the end of message. If it can't be
  167. added, a new message should be created to send out the last soa .
  168. '''
  169. obuf = output_buffer(0)
  170. render = message_render(obuf)
  171. msg.to_wire(render)
  172. old_message_len = obuf.get_length()
  173. msg.add_rrset(section.ANSWER(), rrset_soa)
  174. msg.to_wire(render)
  175. message_len = obuf.get_length()
  176. if message_len != old_message_len:
  177. self._send_message(sock, msg)
  178. else:
  179. msg = self._clear_message(msg)
  180. msg.add_rrset(section.ANSWER(), rrset_soa)
  181. self._send_message(sock, msg)
  182. def _get_message_len(self, msg):
  183. '''Get message length, every time need do like this? Actually there should be
  184. a better way, I need check with jinmei later.
  185. '''
  186. obuf = output_buffer(0)
  187. render = message_render(obuf)
  188. msg.to_wire(render)
  189. return obuf.get_length()
  190. def _reply_xfrout_query(self, msg, sock, zone_name):
  191. #TODO, there should be a better way to insert rrset.
  192. msg.make_response()
  193. msg.set_header_flag(message_flag.AA())
  194. soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
  195. rrset_soa = self._create_rrset_from_db_record(soa_record)
  196. msg.add_rrset(section.ANSWER(), rrset_soa)
  197. old_message_len = 0
  198. # TODO, Since add_rrset() return nothing when rrset can't be added, so I have to compare
  199. # the message length to know if the rrset has been added sucessfully.
  200. for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
  201. if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
  202. raise XfroutException("shutdown!")
  203. if rr_type(rr_data[5]) == rr_type.SOA(): #ignore soa record
  204. continue
  205. rrset_ = self._create_rrset_from_db_record(rr_data)
  206. msg.add_rrset(section.ANSWER(), rrset_)
  207. message_len = self._get_message_len(msg)
  208. if message_len != old_message_len:
  209. old_message_len = message_len
  210. continue
  211. self._send_message(sock, msg)
  212. msg = self._clear_message(msg)
  213. msg.add_rrset(section.ANSWER(), rrset_) # Add the rrset to the new message
  214. old_message_len = 0
  215. self._send_message_with_last_soa(msg, sock, rrset_soa)
  216. def log_msg(self, msg):
  217. print('[b10-xfrout] ', msg)
  218. class UnixSockServer(ThreadingUnixStreamServer):
  219. '''The unix domain socket server which accept xfr query sent from auth server.'''
  220. def __init__(self, sock_file, handle_class, shutdown_event, config_data):
  221. try:
  222. os.unlink(sock_file)
  223. except:
  224. pass
  225. self._sock_file = sock_file
  226. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  227. self._lock = threading.Lock()
  228. self._transfers_counter = 0
  229. self._shutdown_event = shutdown_event
  230. self.update_config_data(config_data)
  231. def shutdown(self):
  232. ThreadingUnixStreamServer.shutdown(self)
  233. try:
  234. os.unlink(self._sock_file)
  235. except:
  236. pass
  237. def update_config_data(self, new_config):
  238. '''Apply the new config setting of xfrout module. '''
  239. self._lock.acquire()
  240. self._max_transfers_out = new_config.get('transfers_out')
  241. self._db_file = new_config.get('db_file')
  242. self._lock.release()
  243. def get_db_file(self):
  244. self._lock.acquire()
  245. file = self._db_file
  246. self._lock.release()
  247. return file
  248. def increase_transfers_counter(self):
  249. '''Return False, if counter + 1 > max_transfers_out, or else
  250. return True
  251. '''
  252. ret = False
  253. self._lock.acquire()
  254. if self._transfers_counter < self._max_transfers_out:
  255. self._transfers_counter += 1
  256. ret = True
  257. self._lock.release()
  258. return ret
  259. def decrease_transfers_counter(self):
  260. self._lock.acquire()
  261. self._transfers_counter -= 1
  262. self._lock.release()
  263. def listen_on_xfr_query(unix_socket_server):
  264. '''Listen xfr query in one single thread. Polls for shutdown
  265. every 0.1 seconds, is there a better time?
  266. '''
  267. unix_socket_server.serve_forever(poll_interval = 0.1)
  268. class XfroutServer:
  269. def __init__(self):
  270. self._unix_socket_server = None
  271. self._listen_sock_file = UNIX_SOCKET_FILE
  272. self._shutdown_event = threading.Event()
  273. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
  274. self._config_data = self._cc.get_full_config()
  275. self._cc.start()
  276. self._start_xfr_query_listener()
  277. def _start_xfr_query_listener(self):
  278. '''Start a new thread to accept xfr query. '''
  279. self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
  280. self._shutdown_event, self._config_data);
  281. listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
  282. listener.start()
  283. def config_handler(self, new_config):
  284. '''Update config data. TODO. Do error check'''
  285. answer = create_answer(0)
  286. for key in new_config:
  287. if key not in self._config_data:
  288. answer = create_answer(1, "Unknown config data: " + str(key))
  289. continue
  290. self._config_data[key] = new_config[key]
  291. if self._unix_socket_server:
  292. self._unix_socket_server.update_config_data(self._config_data)
  293. return answer
  294. def shutdown(self):
  295. ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
  296. terminated.
  297. '''
  298. global xfrout_server
  299. xfrout_server = None #Avoid shutdown is called twice
  300. self._shutdown_event.set()
  301. if self._unix_socket_server:
  302. self._unix_socket_server.shutdown()
  303. main_thread = threading.currentThread()
  304. for th in threading.enumerate():
  305. if th is main_thread:
  306. continue
  307. th.join()
  308. def command_handler(self, cmd, args):
  309. if cmd == "shutdown":
  310. if verbose_mode:
  311. log_msg("Received shutdown command")
  312. self.shutdown()
  313. answer = create_answer(0)
  314. else:
  315. answer = create_answer(1, "Unknown command:" + str(cmd))
  316. return answer
  317. def run(self):
  318. '''Get and process all commands sent from cfgmgr or other modules. '''
  319. while not self._shutdown_event.is_set():
  320. self._cc.check_command()
  321. xfrout_server = None
  322. def signal_handler(signal, frame):
  323. if xfrout_server:
  324. xfrout_server.shutdown()
  325. sys.exit(0)
  326. def set_signal_handler():
  327. signal.signal(signal.SIGTERM, signal_handler)
  328. signal.signal(signal.SIGINT, signal_handler)
  329. def set_cmd_options(parser):
  330. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  331. help="display more about what is going on")
  332. if '__main__' == __name__:
  333. try:
  334. parser = OptionParser()
  335. set_cmd_options(parser)
  336. (options, args) = parser.parse_args()
  337. verbose_mode = options.verbose
  338. set_signal_handler()
  339. xfrout_server = XfroutServer()
  340. xfrout_server.run()
  341. except KeyboardInterrupt:
  342. print("[b10-xfrout] exit xfrout process")
  343. except SessionError as e:
  344. print('[b10-xfrout] Error creating xfrout, '
  345. 'is the command channel daemon running?' )
  346. except ModuleCCSessionError as e:
  347. print('[b10-xfrout] exit xfrout process:', e)
  348. if xfrout_server:
  349. xfrout_server.shutdown()