|
@@ -26,8 +26,10 @@ from isc.datasrc import sqlite3_ds
|
|
|
from socketserver import *
|
|
|
import os
|
|
|
from isc.config.ccsession import *
|
|
|
+from isc.log.log import *
|
|
|
from isc.cc import SessionError
|
|
|
import socket
|
|
|
+import select
|
|
|
import errno
|
|
|
from optparse import OptionParser, OptionValueError
|
|
|
try:
|
|
@@ -40,22 +42,25 @@ except ImportError as e:
|
|
|
|
|
|
if "B10_FROM_BUILD" in os.environ:
|
|
|
SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/xfrout"
|
|
|
- UNIX_SOCKET_FILE = os.environ["B10_FROM_SOURCE"] + "/auth_xfrout_conn"
|
|
|
+ AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
|
|
|
+ UNIX_SOCKET_FILE= os.environ["B10_FROM_BUILD"] + "/auth_xfrout_conn"
|
|
|
else:
|
|
|
PREFIX = "@prefix@"
|
|
|
DATAROOTDIR = "@datarootdir@"
|
|
|
SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
|
|
|
+ AUTH_SPECFILE_PATH = SPECFILE_PATH
|
|
|
UNIX_SOCKET_FILE = "@@LOCALSTATEDIR@@/auth_xfrout_conn"
|
|
|
-SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
|
|
|
|
|
|
+SPECFILE_LOCATION = SPECFILE_PATH + "/xfrout.spec"
|
|
|
+AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + os.sep + "auth.spec"
|
|
|
MAX_TRANSFERS_OUT = 10
|
|
|
-verbose_mode = False
|
|
|
-
|
|
|
-
|
|
|
-class XfroutException(Exception): pass
|
|
|
-class TmpException(Exception): pass
|
|
|
+VERBOSE_MODE = False
|
|
|
|
|
|
class XfroutSession(BaseRequestHandler):
|
|
|
+ def __init__(self, request, client_address, server, log):
|
|
|
+ BaseRequestHandler.__init__(self, request, client_address, server)
|
|
|
+ self._log = log
|
|
|
+
|
|
|
def handle(self):
|
|
|
fd = recv_fd(self.request.fileno())
|
|
|
|
|
@@ -63,8 +68,7 @@ class XfroutSession(BaseRequestHandler):
|
|
|
# This may happen when one xfrout process try to connect to
|
|
|
# xfrout unix socket server, to check whether there is another
|
|
|
# xfrout running.
|
|
|
- print("[b10-xfrout] Failed to receive the FD for XFR connection, "
|
|
|
- "maybe because another xfrout process was started.")
|
|
|
+ self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
|
|
|
return
|
|
|
|
|
|
data_len = self.request.recv(2)
|
|
@@ -73,9 +77,8 @@ class XfroutSession(BaseRequestHandler):
|
|
|
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
|
|
|
try:
|
|
|
self.dns_xfrout_start(sock, msgdata)
|
|
|
- except TmpException as e:
|
|
|
- if verbose_mode:
|
|
|
- self.log_msg(str(e))
|
|
|
+ except Exception as e:
|
|
|
+ self._log.log_message("error", str(e))
|
|
|
|
|
|
sock.shutdown(socket.SHUT_RDWR)
|
|
|
sock.close()
|
|
@@ -88,9 +91,8 @@ class XfroutSession(BaseRequestHandler):
|
|
|
try:
|
|
|
msg = Message(Message.PARSE)
|
|
|
Message.from_wire(msg, mdata)
|
|
|
- except TmpException as err:
|
|
|
- if verbose_mode:
|
|
|
- self.log_msg(str(err))
|
|
|
+ except Exception as err:
|
|
|
+ self._log.log_message("error", str(err))
|
|
|
return Rcode.FORMERR(), None
|
|
|
|
|
|
return Rcode.NOERROR(), msg
|
|
@@ -180,16 +182,11 @@ class XfroutSession(BaseRequestHandler):
|
|
|
return self. _reply_query_with_error_rcode(msg, sock, rcode_)
|
|
|
|
|
|
try:
|
|
|
- if verbose_mode:
|
|
|
- self.log_msg("transfer of '%s/IN': AXFR started" % zone_name)
|
|
|
-
|
|
|
+ self._log.log_message("info", "transfer of '%s/IN': AXFR started" % zone_name)
|
|
|
self._reply_xfrout_query(msg, sock, zone_name)
|
|
|
-
|
|
|
- if verbose_mode:
|
|
|
- self.log_msg("transfer of '%s/IN': AXFR end" % zone_name)
|
|
|
- except TmpException as err:
|
|
|
- if verbose_mode:
|
|
|
- sys.stderr.write("[b10-xfrout] %s\n" % str(err))
|
|
|
+ self._log.log_message("info", "transfer of '%s/IN': AXFR end" % zone_name)
|
|
|
+ except Exception as err:
|
|
|
+ self._log.log_message("error", str(err))
|
|
|
|
|
|
self.server.decrease_transfers_counter()
|
|
|
return
|
|
@@ -261,7 +258,7 @@ class XfroutSession(BaseRequestHandler):
|
|
|
# the message length to know if the rrset has been added sucessfully.
|
|
|
for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
|
|
|
if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
|
|
|
- raise XfroutException("shutdown!")
|
|
|
+ self._log.log_message("error", "shutdown!")
|
|
|
|
|
|
# TODO: RRType.SOA() ?
|
|
|
if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
|
|
@@ -281,21 +278,24 @@ class XfroutSession(BaseRequestHandler):
|
|
|
|
|
|
self._send_message_with_last_soa(msg, sock, rrset_soa)
|
|
|
|
|
|
- def log_msg(self, msg):
|
|
|
- print('[b10-xfrout] ', msg)
|
|
|
-
|
|
|
|
|
|
class UnixSockServer(ThreadingUnixStreamServer):
|
|
|
'''The unix domain socket server which accept xfr query sent from auth server.'''
|
|
|
|
|
|
- def __init__(self, sock_file, handle_class, shutdown_event, config_data):
|
|
|
+ def __init__(self, sock_file, handle_class, shutdown_event, config_data, cc, log):
|
|
|
self._remove_unused_sock_file(sock_file)
|
|
|
self._sock_file = sock_file
|
|
|
ThreadingUnixStreamServer.__init__(self, sock_file, handle_class)
|
|
|
self._lock = threading.Lock()
|
|
|
self._transfers_counter = 0
|
|
|
self._shutdown_event = shutdown_event
|
|
|
+ self._log = log
|
|
|
self.update_config_data(config_data)
|
|
|
+ self._cc = cc
|
|
|
+
|
|
|
+ def finish_request(self, request, client_address):
|
|
|
+ '''Finish one request by instantiating RequestHandlerClass.'''
|
|
|
+ self.RequestHandlerClass(request, client_address, self, self._log)
|
|
|
|
|
|
def _remove_unused_sock_file(self, sock_file):
|
|
|
'''Try to remove the socket file. If the file is being used
|
|
@@ -332,23 +332,28 @@ class UnixSockServer(ThreadingUnixStreamServer):
|
|
|
ThreadingUnixStreamServer.shutdown(self)
|
|
|
try:
|
|
|
os.unlink(self._sock_file)
|
|
|
- except:
|
|
|
- pass
|
|
|
+ except Exception as e:
|
|
|
+ self._log.log_message("error", str(e))
|
|
|
|
|
|
def update_config_data(self, new_config):
|
|
|
'''Apply the new config setting of xfrout module. '''
|
|
|
-
|
|
|
+ self._log.log_message('info', 'update config data start.')
|
|
|
self._lock.acquire()
|
|
|
self._max_transfers_out = new_config.get('transfers_out')
|
|
|
- self._db_file = new_config.get('db_file')
|
|
|
+ self._log.log_message('info', 'max transfer out : %d', self._max_transfers_out)
|
|
|
self._lock.release()
|
|
|
+ self._log.log_message('info', 'update config data complete.')
|
|
|
|
|
|
def get_db_file(self):
|
|
|
- self._lock.acquire()
|
|
|
- file = self._db_file
|
|
|
- self._lock.release()
|
|
|
+ file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
|
|
|
+ # this too should be unnecessary, but currently the
|
|
|
+ # 'from build' override isn't stored in the config
|
|
|
+ # (and we don't have indirect python access to datasources yet)
|
|
|
+ if is_default and "B10_FROM_BUILD" in os.environ:
|
|
|
+ file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
|
|
|
return file
|
|
|
|
|
|
+
|
|
|
def increase_transfers_counter(self):
|
|
|
'''Return False, if counter + 1 > max_transfers_out, or else
|
|
|
return True
|
|
@@ -367,29 +372,45 @@ class UnixSockServer(ThreadingUnixStreamServer):
|
|
|
self._lock.release()
|
|
|
|
|
|
def listen_on_xfr_query(unix_socket_server):
|
|
|
-
|
|
|
'''Listen xfr query in one single thread. Polls for shutdown
|
|
|
every 0.1 seconds, is there a better time?
|
|
|
'''
|
|
|
- unix_socket_server.serve_forever(poll_interval = 0.1)
|
|
|
+
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ unix_socket_server.serve_forever(poll_interval = 0.1)
|
|
|
+ except select.error as err:
|
|
|
+ # serve_forever() calls select.select(), which can be
|
|
|
+ # interrupted.
|
|
|
+ # If it is interrupted, it raises select.error with the
|
|
|
+ # errno set to EINTR. We ignore this case, and let the
|
|
|
+ # normal program flow continue by trying serve_forever()
|
|
|
+ # again.
|
|
|
+ if err.args[0] != errno.EINTR: raise
|
|
|
+
|
|
|
|
|
|
|
|
|
class XfroutServer:
|
|
|
def __init__(self):
|
|
|
self._unix_socket_server = None
|
|
|
+ self._log = None
|
|
|
self._listen_sock_file = UNIX_SOCKET_FILE
|
|
|
self._shutdown_event = threading.Event()
|
|
|
self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
|
|
|
+ self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
|
|
|
self._config_data = self._cc.get_full_config()
|
|
|
self._cc.start()
|
|
|
+ self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
|
|
|
+ self._config_data.get('log_severity'), self._config_data.get('log_versions'),
|
|
|
+ self._config_data.get('log_max_bytes'), True)
|
|
|
self._start_xfr_query_listener()
|
|
|
|
|
|
-
|
|
|
def _start_xfr_query_listener(self):
|
|
|
'''Start a new thread to accept xfr query. '''
|
|
|
|
|
|
self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession,
|
|
|
- self._shutdown_event, self._config_data);
|
|
|
+ self._shutdown_event, self._config_data,
|
|
|
+ self._cc, self._log);
|
|
|
listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
|
|
|
listener.start()
|
|
|
|
|
@@ -403,6 +424,9 @@ class XfroutServer:
|
|
|
continue
|
|
|
self._config_data[key] = new_config[key]
|
|
|
|
|
|
+ if self._log:
|
|
|
+ self._log.update_config(new_config)
|
|
|
+
|
|
|
if self._unix_socket_server:
|
|
|
self._unix_socket_server.update_config_data(self._config_data)
|
|
|
|
|
@@ -428,8 +452,7 @@ class XfroutServer:
|
|
|
|
|
|
def command_handler(self, cmd, args):
|
|
|
if cmd == "shutdown":
|
|
|
- if verbose_mode:
|
|
|
- print("[b10-xfrout] Received shutdown command")
|
|
|
+ self._log.log_message("info", "Received shutdown command.")
|
|
|
self.shutdown()
|
|
|
answer = create_answer(0)
|
|
|
else:
|
|
@@ -464,18 +487,18 @@ if '__main__' == __name__:
|
|
|
parser = OptionParser()
|
|
|
set_cmd_options(parser)
|
|
|
(options, args) = parser.parse_args()
|
|
|
- verbose_mode = options.verbose
|
|
|
+ VERBOSE_MODE = options.verbose
|
|
|
|
|
|
set_signal_handler()
|
|
|
xfrout_server = XfroutServer()
|
|
|
xfrout_server.run()
|
|
|
except KeyboardInterrupt:
|
|
|
- print("[b10-xfrout] exit xfrout process")
|
|
|
+ sys.stderr.write("[b10-xfrout] exit xfrout process")
|
|
|
except SessionError as e:
|
|
|
- print('[b10-xfrout] Error creating xfrout, '
|
|
|
- 'is the command channel daemon running?' )
|
|
|
+ sys.stderr.write("[b10-xfrout] Error creating xfrout,"
|
|
|
+ "is the command channel daemon running?")
|
|
|
except ModuleCCSessionError as e:
|
|
|
- print('[b10-xfrout] exit xfrout process:', e)
|
|
|
+ sys.stderr.write("info", '[b10-xfrout] exit xfrout process:', e)
|
|
|
|
|
|
if xfrout_server:
|
|
|
xfrout_server.shutdown()
|