xfrout.py.in 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. import isc
  18. import isc.cc
  19. import threading
  20. import struct
  21. import signal
  22. from isc.datasrc import sqlite3_ds
  23. from socketserver import *
  24. import os
  25. from isc.config.ccsession import *
  26. #from isc.log.log import *
  27. from isc.cc import SessionError, SessionTimeout
  28. from isc.notify import notify_out
  29. import isc.util.process
  30. import socket
  31. import select
  32. import errno
  33. from optparse import OptionParser, OptionValueError
  34. from isc.util import socketserver_mixin
  35. try:
  36. from libutil_io_python import *
  37. from pydnspp import *
  38. except ImportError as e:
  39. # C++ loadable module may not be installed; even so the xfrout process
  40. # must keep running, so we warn about it and move forward.
  41. sys.stderr.write('[b10-xfrout] failed to import DNS or isc.util.io module: %s\n' % str(e))
  42. isc.util.process.rename()
  43. def init_paths():
  44. global SPECFILE_PATH
  45. global AUTH_SPECFILE_PATH
  46. global UNIX_SOCKET_FILE
  47. if "B10_FROM_BUILD" in os.environ:
  48. SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
  49. AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
  50. if "B10_FROM_SOURCE_LOCALSTATEDIR" in os.environ:
  51. UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE_LOCALSTATEDIR"] + \
  52. "/auth_xfrout_conn"
  53. else:
  54. UNIX_SOCKET_FILE = os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
  55. else:
  56. PREFIX = "@prefix@"
  57. DATAROOTDIR = "@datarootdir@"
  58. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  59. AUTH_SPECFILE_PATH = SPECFILE_PATH
  60. if "BIND10_XFROUT_SOCKET_FILE" in os.environ:
  61. UNIX_SOCKET_FILE = os.environ["BIND10_XFROUT_SOCKET_FILE"]
  62. else:
  63. UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
  64. init_paths()
  65. SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
  66. AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
  67. MAX_TRANSFERS_OUT = 10
  68. VERBOSE_MODE = False
  69. # tsig sign every N axfr packets.
  70. TSIG_SIGN_EVERY_NTH = 96
  71. XFROUT_MAX_MESSAGE_SIZE = 65535
  72. def get_rrset_len(rrset):
  73. """Returns the wire length of the given RRset"""
  74. bytes = bytearray()
  75. rrset.to_wire(bytes)
  76. return len(bytes)
  77. class XfroutSession():
  78. def __init__(self, sock_fd, request_data, server, tsig_key_ring):
  79. # The initializer for the superclass may call functions
  80. # that need _log to be set, so we set it first
  81. self._sock_fd = sock_fd
  82. self._request_data = request_data
  83. self._server = server
  84. #self._log = log
  85. self._tsig_key_ring = tsig_key_ring
  86. self._tsig_ctx = None
  87. self._tsig_len = 0
  88. self.handle()
  89. def create_tsig_ctx(self, tsig_record, tsig_key_ring):
  90. return TSIGContext(tsig_record.get_name(), tsig_record.get_rdata().get_algorithm(),
  91. tsig_key_ring)
  92. def handle(self):
  93. ''' Handle a xfrout query, send xfrout response '''
  94. try:
  95. self.dns_xfrout_start(self._sock_fd, self._request_data)
  96. #TODO, avoid catching all exceptions
  97. except Exception as e:
  98. #self._log.log_message("error", str(e))
  99. pass
  100. os.close(self._sock_fd)
  101. def _check_request_tsig(self, msg, request_data):
  102. ''' If request has a tsig record, perform tsig related checks '''
  103. tsig_record = msg.get_tsig_record()
  104. if tsig_record is not None:
  105. self._tsig_len = tsig_record.get_length()
  106. self._tsig_ctx = self.create_tsig_ctx(tsig_record, self._tsig_key_ring)
  107. tsig_error = self._tsig_ctx.verify(tsig_record, request_data)
  108. if tsig_error != TSIGError.NOERROR:
  109. return Rcode.NOTAUTH()
  110. return Rcode.NOERROR()
  111. def _parse_query_message(self, mdata):
  112. ''' parse query message to [socket,message]'''
  113. #TODO, need to add parseHeader() in case the message header is invalid
  114. try:
  115. msg = Message(Message.PARSE)
  116. Message.from_wire(msg, mdata)
  117. # TSIG related checks
  118. rcode = self._check_request_tsig(msg, mdata)
  119. except Exception as err:
  120. #self._log.log_message("error", str(err))
  121. return Rcode.FORMERR(), None
  122. return rcode, msg
  123. def _get_query_zone_name(self, msg):
  124. question = msg.get_question()[0]
  125. return question.get_name().to_text()
  126. def _send_data(self, sock_fd, data):
  127. size = len(data)
  128. total_count = 0
  129. while total_count < size:
  130. count = os.write(sock_fd, data[total_count:])
  131. total_count += count
  132. def _send_message(self, sock_fd, msg, tsig_ctx=None):
  133. render = MessageRenderer()
  134. # As defined in RFC5936 section3.4, perform case-preserving name
  135. # compression for AXFR message.
  136. render.set_compress_mode(MessageRenderer.CASE_SENSITIVE)
  137. render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
  138. # XXX Currently, python wrapper doesn't accept 'None' parameter in this case,
  139. # we should remove the if statement and use a universal interface later.
  140. if tsig_ctx is not None:
  141. msg.to_wire(render, tsig_ctx)
  142. else:
  143. msg.to_wire(render)
  144. header_len = struct.pack('H', socket.htons(render.get_length()))
  145. self._send_data(sock_fd, header_len)
  146. self._send_data(sock_fd, render.get_data())
  147. def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
  148. msg.make_response()
  149. msg.set_rcode(rcode_)
  150. self._send_message(sock_fd, msg, self._tsig_ctx)
  151. def _reply_query_with_format_error(self, msg, sock_fd):
  152. '''query message format isn't legal.'''
  153. if not msg:
  154. return # query message is invalid. send nothing back.
  155. msg.make_response()
  156. msg.set_rcode(Rcode.FORMERR())
  157. self._send_message(sock_fd, msg, self._tsig_ctx)
  158. def _zone_has_soa(self, zone):
  159. '''Judge if the zone has an SOA record.'''
  160. # In some sense, the SOA defines a zone.
  161. # If the current name server has authority for the
  162. # specific zone, we need to judge if the zone has an SOA record;
  163. # if not, we consider the zone has incomplete data, so xfrout can't
  164. # serve for it.
  165. if sqlite3_ds.get_zone_soa(zone, self._server.get_db_file()):
  166. return True
  167. return False
  168. def _zone_exist(self, zonename):
  169. '''Judge if the zone is configured by config manager.'''
  170. # Currently, if we find the zone in datasource successfully, we
  171. # consider the zone is configured, and the current name server has
  172. # authority for the specific zone.
  173. # TODO: should get zone's configuration from cfgmgr or other place
  174. # in future.
  175. return sqlite3_ds.zone_exist(zonename, self._server.get_db_file())
  176. def _check_xfrout_available(self, zone_name):
  177. '''Check if xfr request can be responsed.
  178. TODO, Get zone's configuration from cfgmgr or some other place
  179. eg. check allow_transfer setting,
  180. '''
  181. # If the current name server does not have authority for the
  182. # zone, xfrout can't serve for it, return rcode NOTAUTH.
  183. if not self._zone_exist(zone_name):
  184. return Rcode.NOTAUTH()
  185. # If we are an authoritative name server for the zone, but fail
  186. # to find the zone's SOA record in datasource, xfrout can't
  187. # provide zone transfer for it.
  188. if not self._zone_has_soa(zone_name):
  189. return Rcode.SERVFAIL()
  190. #TODO, check allow_transfer
  191. if not self._server.increase_transfers_counter():
  192. return Rcode.REFUSED()
  193. return Rcode.NOERROR()
  194. def dns_xfrout_start(self, sock_fd, msg_query):
  195. rcode_, msg = self._parse_query_message(msg_query)
  196. #TODO. create query message and parse header
  197. if rcode_ == Rcode.NOTAUTH():
  198. return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
  199. elif rcode_ != Rcode.NOERROR():
  200. return self._reply_query_with_format_error(msg, sock_fd)
  201. zone_name = self._get_query_zone_name(msg)
  202. rcode_ = self._check_xfrout_available(zone_name)
  203. if rcode_ != Rcode.NOERROR():
  204. #self._log.log_message("info", "transfer of '%s/IN' failed: %s",
  205. # zone_name, rcode_.to_text())
  206. return self. _reply_query_with_error_rcode(msg, sock_fd, rcode_)
  207. try:
  208. #self._log.log_message("info", "transfer of '%s/IN': AXFR started" % zone_name)
  209. self._reply_xfrout_query(msg, sock_fd, zone_name)
  210. #self._log.log_message("info", "transfer of '%s/IN': AXFR end" % zone_name)
  211. except Exception as err:
  212. #self._log.log_message("error", str(err))
  213. pass
  214. self._server.decrease_transfers_counter()
  215. return
  216. def _clear_message(self, msg):
  217. qid = msg.get_qid()
  218. opcode = msg.get_opcode()
  219. rcode = msg.get_rcode()
  220. msg.clear(Message.RENDER)
  221. msg.set_qid(qid)
  222. msg.set_opcode(opcode)
  223. msg.set_rcode(rcode)
  224. msg.set_header_flag(Message.HEADERFLAG_AA)
  225. msg.set_header_flag(Message.HEADERFLAG_QR)
  226. return msg
  227. def _create_rrset_from_db_record(self, record):
  228. '''Create one rrset from one record of datasource, if the schema of record is changed,
  229. This function should be updated first.
  230. '''
  231. rrtype_ = RRType(record[5])
  232. rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
  233. rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
  234. rrset_.add_rdata(rdata_)
  235. return rrset_
  236. def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, message_upper_len,
  237. count_since_last_tsig_sign):
  238. '''Add the SOA record to the end of message. If it can't be
  239. added, a new message should be created to send out the last soa .
  240. '''
  241. rrset_len = get_rrset_len(rrset_soa)
  242. if (count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH and
  243. message_upper_len + rrset_len >= XFROUT_MAX_MESSAGE_SIZE):
  244. # If tsig context exist, sign the packet with serial number TSIG_SIGN_EVERY_NTH
  245. self._send_message(sock_fd, msg, self._tsig_ctx)
  246. msg = self._clear_message(msg)
  247. elif (count_since_last_tsig_sign != TSIG_SIGN_EVERY_NTH and
  248. message_upper_len + rrset_len + self._tsig_len >= XFROUT_MAX_MESSAGE_SIZE):
  249. self._send_message(sock_fd, msg)
  250. msg = self._clear_message(msg)
  251. # If tsig context exist, sign the last packet
  252. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  253. self._send_message(sock_fd, msg, self._tsig_ctx)
  254. def _reply_xfrout_query(self, msg, sock_fd, zone_name):
  255. #TODO, there should be a better way to insert rrset.
  256. count_since_last_tsig_sign = TSIG_SIGN_EVERY_NTH
  257. msg.make_response()
  258. msg.set_header_flag(Message.HEADERFLAG_AA)
  259. soa_record = sqlite3_ds.get_zone_soa(zone_name, self._server.get_db_file())
  260. rrset_soa = self._create_rrset_from_db_record(soa_record)
  261. msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
  262. message_upper_len = get_rrset_len(rrset_soa) + self._tsig_len
  263. for rr_data in sqlite3_ds.get_zone_datas(zone_name, self._server.get_db_file()):
  264. if self._server._shutdown_event.is_set(): # Check if xfrout is shutdown
  265. #self._log.log_message("info", "xfrout process is being shutdown")
  266. return
  267. # TODO: RRType.SOA() ?
  268. if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
  269. continue
  270. rrset_ = self._create_rrset_from_db_record(rr_data)
  271. # We calculate the maximum size of the RRset (i.e. the
  272. # size without compression) and use that to see if we
  273. # may have reached the limit
  274. rrset_len = get_rrset_len(rrset_)
  275. if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
  276. msg.add_rrset(Message.SECTION_ANSWER, rrset_)
  277. message_upper_len += rrset_len
  278. continue
  279. # If tsig context exist, sign every N packets
  280. if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
  281. count_since_last_tsig_sign = 0
  282. self._send_message(sock_fd, msg, self._tsig_ctx)
  283. else:
  284. self._send_message(sock_fd, msg)
  285. count_since_last_tsig_sign += 1
  286. msg = self._clear_message(msg)
  287. msg.add_rrset(Message.SECTION_ANSWER, rrset_) # Add the rrset to the new message
  288. # Reserve tsig space for signed packet
  289. if count_since_last_tsig_sign == TSIG_SIGN_EVERY_NTH:
  290. message_upper_len = rrset_len + self._tsig_len
  291. else:
  292. message_upper_len = rrset_len
  293. self._send_message_with_last_soa(msg, sock_fd, rrset_soa, message_upper_len,
  294. count_since_last_tsig_sign)
  295. class UnixSockServer(socketserver_mixin.NoPollMixIn, ThreadingUnixStreamServer):
  296. '''The unix domain socket server which accept xfr query sent from auth server.'''
  297. def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc):
  298. self._remove_unused_sock_file(sock_file)
  299. self._sock_file = sock_file
  300. socketserver_mixin.NoPollMixIn.__init__(self)
  301. ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
  302. self._lock = threading.Lock()
  303. self._transfers_counter = 0
  304. self._shutdown_event = shutdown_event
  305. self._write_sock, self._read_sock = socket.socketpair()
  306. #self._log = log
  307. self.update_config_data(config_data)
  308. self._cc = cc
  309. def _receive_query_message(self, sock):
  310. ''' receive request message from sock'''
  311. # receive data length
  312. data_len = sock.recv(2)
  313. if not data_len:
  314. return None
  315. msg_len = struct.unpack('!H', data_len)[0]
  316. # receive data
  317. recv_size = 0
  318. msgdata = b''
  319. while recv_size < msg_len:
  320. data = sock.recv(msg_len - recv_size)
  321. if not data:
  322. return None
  323. recv_size += len(data)
  324. msgdata += data
  325. return msgdata
  326. def handle_request(self):
  327. ''' Enable server handle a request until shutdown or auth is closed.'''
  328. try:
  329. request, client_address = self.get_request()
  330. except socket.error:
  331. #self._log.log_message("error", "Failed to fetch request")
  332. return
  333. # Check self._shutdown_event to ensure the real shutdown comes.
  334. # Linux could trigger a spurious readable event on the _read_sock
  335. # due to a bug, so we need perform a double check.
  336. while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
  337. try:
  338. (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
  339. except select.error as e:
  340. if e.args[0] == errno.EINTR:
  341. (rlist, wlist, xlist) = ([], [], [])
  342. continue
  343. else:
  344. #self._log.log_message("error", "Error with select(): %s" %e)
  345. break
  346. # self.server._shutdown_event will be set by now, if it is not a false
  347. # alarm
  348. if self._read_sock in rlist:
  349. continue
  350. try:
  351. self.process_request(request)
  352. except:
  353. #self._log.log_message("error", "Exception happened during processing of %s"
  354. # % str(client_address))
  355. break
  356. def _handle_request_noblock(self):
  357. """Override the function _handle_request_noblock(), it creates a new
  358. thread to handle requests for each auth"""
  359. td = threading.Thread(target=self.handle_request)
  360. td.setDaemon(True)
  361. td.start()
  362. def process_request(self, request):
  363. """Receive socket fd and query message from auth, then
  364. start a new thread to process the request."""
  365. sock_fd = recv_fd(request.fileno())
  366. if sock_fd < 0:
  367. # This may happen when one xfrout process try to connect to
  368. # xfrout unix socket server, to check whether there is another
  369. # xfrout running.
  370. #if sock_fd == FD_COMM_ERROR:
  371. #self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
  372. return
  373. # receive request msg
  374. request_data = self._receive_query_message(request)
  375. if not request_data:
  376. return
  377. t = threading.Thread(target = self.finish_request,
  378. args = (sock_fd, request_data))
  379. if self.daemon_threads:
  380. t.daemon = True
  381. t.start()
  382. def finish_request(self, sock_fd, request_data):
  383. '''Finish one request by instantiating RequestHandlerClass.'''
  384. self.RequestHandlerClass(sock_fd, request_data, self, self.tsig_key_ring)
  385. def _remove_unused_sock_file(self, sock_file):
  386. '''Try to remove the socket file. If the file is being used
  387. by one running xfrout process, exit from python.
  388. If it's not a socket file or nobody is listening
  389. , it will be removed. If it can't be removed, exit from python. '''
  390. if self._sock_file_in_use(sock_file):
  391. #self._log.log_message("error", "Fail to start xfrout process, unix socket file '%s'"
  392. # " is being used by another xfrout process\n" % sock_file)
  393. sys.exit(0)
  394. else:
  395. if not os.path.exists(sock_file):
  396. return
  397. try:
  398. os.unlink(sock_file)
  399. except OSError as err:
  400. #self._log.log_message("error", "[b10-xfrout] Fail to remove file %s: %s\n" % (sock_file, err))
  401. sys.exit(0)
  402. def _sock_file_in_use(self, sock_file):
  403. '''Check whether the socket file 'sock_file' exists and
  404. is being used by one running xfrout process. If it is,
  405. return True, or else return False. '''
  406. try:
  407. sock = socket.socket(socket.AF_UNIX)
  408. sock.connect(sock_file)
  409. except socket.error as err:
  410. return False
  411. else:
  412. return True
  413. def shutdown(self):
  414. self._write_sock.send(b"shutdown") #terminate the xfrout session thread
  415. super().shutdown() # call the shutdown() of class socketserver_mixin.NoPollMixIn
  416. try:
  417. os.unlink(self._sock_file)
  418. except Exception as e:
  419. #self._log.log_message('error', str(e))
  420. pass
  421. def update_config_data(self, new_config):
  422. '''Apply the new config setting of xfrout module. '''
  423. #self._log.log_message('info', 'update config data start.')
  424. self._lock.acquire()
  425. self._max_transfers_out = new_config.get('transfers_out')
  426. self.set_tsig_key_ring(new_config.get('tsig_key_ring'))
  427. #self._log.log_message('info', 'max transfer out : %d', self._max_transfers_out)
  428. self._lock.release()
  429. #self._log.log_message('info', 'update config data complete.')
  430. def set_tsig_key_ring(self, key_list):
  431. """Set the tsig_key_ring , given a TSIG key string list representation. """
  432. # XXX add values to configure zones/tsig options
  433. self.tsig_key_ring = TSIGKeyRing()
  434. # If key string list is empty, create a empty tsig_key_ring
  435. if not key_list:
  436. return
  437. for key_item in key_list:
  438. try:
  439. self.tsig_key_ring.add(TSIGKey(key_item))
  440. except InvalidParameter as ipe:
  441. errmsg = "bad TSIG key string: " + str(key_item)
  442. #self._log.log_message('error', '%s' % errmsg)
  443. def get_db_file(self):
  444. file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
  445. # this too should be unnecessary, but currently the
  446. # 'from build' override isn't stored in the config
  447. # (and we don't have indirect python access to datasources yet)
  448. if is_default and "B10_FROM_BUILD" in os.environ:
  449. file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
  450. return file
  451. def increase_transfers_counter(self):
  452. '''Return False, if counter + 1 > max_transfers_out, or else
  453. return True
  454. '''
  455. ret = False
  456. self._lock.acquire()
  457. if self._transfers_counter < self._max_transfers_out:
  458. self._transfers_counter += 1
  459. ret = True
  460. self._lock.release()
  461. return ret
  462. def decrease_transfers_counter(self):
  463. self._lock.acquire()
  464. self._transfers_counter -= 1
  465. self._lock.release()
  466. class XfroutServer:
  467. def __init__(self):
  468. self._unix_socket_server = None
  469. #self._log = None
  470. self._listen_sock_file = UNIX_SOCKET_FILE
  471. self._shutdown_event = threading.Event()
  472. self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
  473. self._config_data = self._cc.get_full_config()
  474. self._cc.start()
  475. self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
  476. #self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
  477. # self._config_data.get('log_severity'), self._config_data.get('log_versions'),
  478. # self._config_data.get('log_max_bytes'), True)
  479. self._start_xfr_query_listener()
  480. self._start_notifier()
  481. def _start_xfr_query_listener(self):
  482. '''Start a new thread to accept xfr query. '''
  483. self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
  484. self._shutdown_event, self._config_data,
  485. self._cc)
  486. listener = threading.Thread(target=self._unix_socket_server.serve_forever)
  487. listener.start()
  488. def _start_notifier(self):
  489. datasrc = self._unix_socket_server.get_db_file()
  490. self._notifier = notify_out.NotifyOut(datasrc)
  491. self._notifier.dispatcher()
  492. def send_notify(self, zone_name, zone_class):
  493. self._notifier.send_notify(zone_name, zone_class)
  494. def config_handler(self, new_config):
  495. '''Update config data. TODO. Do error check'''
  496. answer = create_answer(0)
  497. for key in new_config:
  498. if key not in self._config_data:
  499. answer = create_answer(1, "Unknown config data: " + str(key))
  500. continue
  501. self._config_data[key] = new_config[key]
  502. #if self._log:
  503. # self._log.update_config(new_config)
  504. if self._unix_socket_server:
  505. self._unix_socket_server.update_config_data(self._config_data)
  506. return answer
  507. def shutdown(self):
  508. ''' shutdown the xfrout process. The thread which is doing zone transfer-out should be
  509. terminated.
  510. '''
  511. global xfrout_server
  512. xfrout_server = None #Avoid shutdown is called twice
  513. self._shutdown_event.set()
  514. self._notifier.shutdown()
  515. if self._unix_socket_server:
  516. self._unix_socket_server.shutdown()
  517. # Wait for all threads to terminate
  518. main_thread = threading.currentThread()
  519. for th in threading.enumerate():
  520. if th is main_thread:
  521. continue
  522. th.join()
  523. def command_handler(self, cmd, args):
  524. if cmd == "shutdown":
  525. #self._log.log_message("info", "Received shutdown command.")
  526. self.shutdown()
  527. answer = create_answer(0)
  528. elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
  529. zone_name = args.get('zone_name')
  530. zone_class = args.get('zone_class')
  531. if zone_name and zone_class:
  532. #self._log.log_message("info", "zone '%s/%s': receive notify others command" \
  533. # % (zone_name, zone_class))
  534. self.send_notify(zone_name, zone_class)
  535. answer = create_answer(0)
  536. else:
  537. answer = create_answer(1, "Bad command parameter:" + str(args))
  538. else:
  539. answer = create_answer(1, "Unknown command:" + str(cmd))
  540. return answer
  541. def run(self):
  542. '''Get and process all commands sent from cfgmgr or other modules. '''
  543. while not self._shutdown_event.is_set():
  544. self._cc.check_command(False)
  545. xfrout_server = None
  546. def signal_handler(signal, frame):
  547. if xfrout_server:
  548. xfrout_server.shutdown()
  549. sys.exit(0)
  550. def set_signal_handler():
  551. signal.signal(signal.SIGTERM, signal_handler)
  552. signal.signal(signal.SIGINT, signal_handler)
  553. def set_cmd_options(parser):
  554. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  555. help="display more about what is going on")
  556. if '__main__' == __name__:
  557. try:
  558. parser = OptionParser()
  559. set_cmd_options(parser)
  560. (options, args) = parser.parse_args()
  561. VERBOSE_MODE = options.verbose
  562. set_signal_handler()
  563. xfrout_server = XfroutServer()
  564. xfrout_server.run()
  565. except KeyboardInterrupt:
  566. sys.stderr.write("[b10-xfrout] exit xfrout process\n")
  567. except SessionError as e:
  568. sys.stderr.write("[b10-xfrout] Error creating xfrout, "
  569. "is the command channel daemon running?\n")
  570. except SessionTimeout as e:
  571. sys.stderr.write("[b10-xfrout] Error creating xfrout, "
  572. "is the configuration manager running?\n")
  573. except ModuleCCSessionError as e:
  574. sys.stderr.write("[b10-xfrout] exit xfrout process:%s\n" % str(e))
  575. if xfrout_server:
  576. xfrout_server.shutdown()