|
@@ -8,22 +8,21 @@ import time
|
|
|
import sys
|
|
|
import threading
|
|
|
import tempfile
|
|
|
+import json
|
|
|
|
|
|
import msgq
|
|
|
import isc.config.cfgmgr
|
|
|
import stats
|
|
|
import stats_httpd
|
|
|
|
|
|
-# TODO: consider appropriate timeout seconds
|
|
|
-TIMEOUT_SEC = 0.05
|
|
|
-
|
|
|
-def send_command(command_name, module_name, params=None, session=None, nonblock=False, timeout=TIMEOUT_SEC):
|
|
|
- if not session:
|
|
|
- cc_session = isc.cc.Session()
|
|
|
- else:
|
|
|
+def send_command(command_name, module_name, params=None, session=None, nonblock=False, timeout=None):
|
|
|
+ if session is not None:
|
|
|
cc_session = session
|
|
|
- orig_timeout = cc_session.get_timeout()
|
|
|
- cc_session.set_timeout(timeout * 1000)
|
|
|
+ else:
|
|
|
+ cc_session = isc.cc.Session()
|
|
|
+ if timeout is not None:
|
|
|
+ orig_timeout = cc_session.get_timeout()
|
|
|
+ cc_session.set_timeout(timeout * 1000)
|
|
|
command = isc.config.ccsession.create_command(command_name, params)
|
|
|
seq = cc_session.group_sendmsg(command, module_name)
|
|
|
try:
|
|
@@ -33,38 +32,35 @@ def send_command(command_name, module_name, params=None, session=None, nonblock=
|
|
|
except isc.cc.SessionTimeout:
|
|
|
pass
|
|
|
finally:
|
|
|
- if not session:
|
|
|
- cc_session.close()
|
|
|
- else:
|
|
|
+ if timeout is not None:
|
|
|
cc_session.set_timeout(orig_timeout)
|
|
|
+ if session is None:
|
|
|
+ cc_session.close()
|
|
|
|
|
|
-def send_shutdown(module_name):
|
|
|
- return send_command("shutdown", module_name)
|
|
|
+def send_shutdown(module_name, **kwargs):
|
|
|
+ return send_command("shutdown", module_name, **kwargs)
|
|
|
|
|
|
class ThreadingServerManager:
|
|
|
- def __init__(self, server_class):
|
|
|
- self.server_class = server_class
|
|
|
- self.server_class_name = server_class.__name__
|
|
|
- self.server = self.server_class()
|
|
|
+ def __init__(self, server, *args, **kwargs):
|
|
|
+ self.server = server(*args, **kwargs)
|
|
|
+ self.server_name = server.__name__
|
|
|
self.server._thread = threading.Thread(
|
|
|
- name=self.server_class_name, target=self.server.run)
|
|
|
+ name=self.server_name, target=self.server.run)
|
|
|
self.server._thread.daemon = True
|
|
|
|
|
|
def run(self):
|
|
|
self.server._thread.start()
|
|
|
self.server._started.wait()
|
|
|
self.server._started.clear()
|
|
|
- # waiting for the server's being ready for listening
|
|
|
- time.sleep(TIMEOUT_SEC)
|
|
|
|
|
|
def shutdown(self):
|
|
|
self.server.shutdown()
|
|
|
- self.server._thread.join(TIMEOUT_SEC)
|
|
|
+ self.server._thread.join(0) # timeout is 0
|
|
|
|
|
|
class MockMsgq:
|
|
|
def __init__(self):
|
|
|
self._started = threading.Event()
|
|
|
- self.msgq = msgq.MsgQ(None)
|
|
|
+ self.msgq = msgq.MsgQ(verbose=True)
|
|
|
result = self.msgq.setup()
|
|
|
if result:
|
|
|
sys.exit("Error on Msgq startup: %s" % result)
|
|
@@ -72,11 +68,14 @@ class MockMsgq:
|
|
|
def run(self):
|
|
|
self._started.set()
|
|
|
try:
|
|
|
+ # any message is written to /dev/null
|
|
|
+ sys.stderr = open(os.devnull, "w")
|
|
|
self.msgq.run()
|
|
|
+ sys.stderr.close()
|
|
|
except Exception:
|
|
|
pass
|
|
|
finally:
|
|
|
- self.shutdown()
|
|
|
+ self.msgq.shutdown()
|
|
|
|
|
|
def shutdown(self):
|
|
|
self.msgq.shutdown()
|
|
@@ -90,10 +89,7 @@ class MockCfgmgr:
|
|
|
|
|
|
def run(self):
|
|
|
self._started.set()
|
|
|
- try:
|
|
|
- self.cfgmgr.run()
|
|
|
- finally:
|
|
|
- self.shutdown()
|
|
|
+ self.cfgmgr.run()
|
|
|
|
|
|
def shutdown(self):
|
|
|
self.cfgmgr.running = False
|
|
@@ -155,6 +151,7 @@ class MockBoss:
|
|
|
return isc.config.create_answer(0)
|
|
|
|
|
|
def command_handler(self, command, *args, **kwargs):
|
|
|
+ self._started.set()
|
|
|
self.got_command_name = command
|
|
|
if command == 'sendstats':
|
|
|
params = { "owner": "Boss",
|
|
@@ -244,29 +241,47 @@ class MyStats(stats.Stats):
|
|
|
|
|
|
def run(self):
|
|
|
self._started.set()
|
|
|
- stats.Stats.start(self)
|
|
|
+ self.start()
|
|
|
|
|
|
def shutdown(self):
|
|
|
- send_shutdown("Stats")
|
|
|
+ self.command_shutdown()
|
|
|
|
|
|
class MyStatsHttpd(stats_httpd.StatsHttpd):
|
|
|
- def __init__(self):
|
|
|
+ ORIG_SPECFILE_LOCATION = stats_httpd.SPECFILE_LOCATION
|
|
|
+ def __init__(self, *server_address):
|
|
|
self._started = threading.Event()
|
|
|
- stats_httpd.StatsHttpd.__init__(self)
|
|
|
+ if server_address:
|
|
|
+ stats_httpd.SPECFILE_LOCATION = self.get_specfile(*server_address)
|
|
|
+ try:
|
|
|
+ stats_httpd.StatsHttpd.__init__(self)
|
|
|
+ finally:
|
|
|
+ stats_httpd.SPECFILE_LOCATION.close()
|
|
|
+ stats_httpd.SPECFILE_LOCATION = self.ORIG_SPECFILE_LOCATION
|
|
|
+ else:
|
|
|
+ stats_httpd.StatsHttpd.__init__(self)
|
|
|
+
|
|
|
+ def get_specfile(self, *server_address):
|
|
|
+ spec = json.load(open(self.ORIG_SPECFILE_LOCATION))
|
|
|
+ config = spec['module_spec']['config_data']
|
|
|
+ for i in range(len(config)):
|
|
|
+ if config[i]['item_name'] == 'listen_on':
|
|
|
+ config[i]['item_default'] = \
|
|
|
+ [ dict(address=a[0], port=a[1]) for a in server_address ]
|
|
|
+ break
|
|
|
+ return io.StringIO(json.dumps(spec))
|
|
|
|
|
|
def run(self):
|
|
|
self._started.set()
|
|
|
- stats_httpd.StatsHttpd.start(self)
|
|
|
+ self.start()
|
|
|
|
|
|
def shutdown(self):
|
|
|
- send_shutdown("StatsHttpd")
|
|
|
+ self.stop()
|
|
|
|
|
|
class BaseModules:
|
|
|
def __init__(self):
|
|
|
- self.class_name = BaseModules.__name__
|
|
|
-
|
|
|
# Change value of BIND10_MSGQ_SOCKET_FILE in environment variables
|
|
|
- os.environ['BIND10_MSGQ_SOCKET_FILE'] = tempfile.mktemp(prefix='unix_socket.')
|
|
|
+ if 'BIND10_MSGQ_SOCKET_FILE' not in os.environ:
|
|
|
+ os.environ['BIND10_MSGQ_SOCKET_FILE'] = tempfile.mktemp(prefix='msgq_socket_')
|
|
|
# MockMsgq
|
|
|
self.msgq = ThreadingServerManager(MockMsgq)
|
|
|
self.msgq.run()
|