|
@@ -26,7 +26,13 @@ TEST_ZONE_NAME = "example.com"
|
|
|
TEST_RRCLASS = rr_class.IN()
|
|
|
TEST_DB_FILE = 'db_file'
|
|
|
TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
|
|
|
+TEST_MASTER_IPV4_ADDRINFO = (socket.AF_INET, socket.SOCK_STREAM,
|
|
|
+ socket.IPPROTO_TCP, '',
|
|
|
+ (TEST_MASTER_IPV4_ADDRESS, 53))
|
|
|
TEST_MASTER_IPV6_ADDRESS = '::1'
|
|
|
+TEST_MASTER_IPV6_ADDRINFO = (socket.AF_INET6, socket.SOCK_STREAM,
|
|
|
+ socket.IPPROTO_TCP, '',
|
|
|
+ (TEST_MASTER_IPV6_ADDRESS, 53))
|
|
|
# XXX: This should be a non priviledge port that is unlikely to be used.
|
|
|
# If some other process uses this port test will fail.
|
|
|
TEST_MASTER_PORT = '53535'
|
|
@@ -37,32 +43,45 @@ soa_rdata = create_rdata(rr_type.SOA(), TEST_RRCLASS,
|
|
|
soa_rrset = rrset(name(TEST_ZONE_NAME), TEST_RRCLASS, rr_type.SOA(),
|
|
|
rr_ttl(3600))
|
|
|
soa_rrset.add_rdata(soa_rdata)
|
|
|
-example_question = question(name(TEST_ZONE_NAME), TEST_RRCLASS, rr_type.AXFR())
|
|
|
-default_questions = [example_question]
|
|
|
+example_axfr_question = question(name(TEST_ZONE_NAME), TEST_RRCLASS,
|
|
|
+ rr_type.AXFR())
|
|
|
+example_soa_question = question(name(TEST_ZONE_NAME), TEST_RRCLASS,
|
|
|
+ rr_type.SOA())
|
|
|
+default_questions = [example_axfr_question]
|
|
|
default_answers = [soa_rrset]
|
|
|
|
|
|
class XfrinTestException(Exception):
|
|
|
pass
|
|
|
|
|
|
-# Rewrite the class for unittest.
|
|
|
class MockXfrin(Xfrin):
|
|
|
+ # This is a class attribute of a callable object that specifies a non
|
|
|
+ # default behavior triggered in _cc_check_command(). Specific test methods
|
|
|
+ # are expected to explicitly set this attribute before creating a
|
|
|
+ # MockXfrin object (when it needs a non default behavior).
|
|
|
+ # See the TestMain class.
|
|
|
+ check_command_hook = None
|
|
|
+
|
|
|
def _cc_setup(self):
|
|
|
pass
|
|
|
+
|
|
|
+ def _cc_check_command(self):
|
|
|
+ self._shutdown_event.set()
|
|
|
+ if MockXfrin.check_command_hook:
|
|
|
+ MockXfrin.check_command_hook()
|
|
|
|
|
|
class MockXfrinConnection(XfrinConnection):
|
|
|
- def __init__(self, TEST_ZONE_NAME, db_file, shutdown_event, master_addr):
|
|
|
- super().__init__(TEST_ZONE_NAME, db_file, shutdown_event, master_addr)
|
|
|
+ def __init__(self, sock_map, zone_name, rrclass, db_file, shutdown_event,
|
|
|
+ master_addr):
|
|
|
+ super().__init__(sock_map, zone_name, rrclass, db_file, shutdown_event,
|
|
|
+ master_addr)
|
|
|
self.query_data = b''
|
|
|
self.reply_data = b''
|
|
|
self.force_time_out = False
|
|
|
self.force_close = False
|
|
|
+ self.qlen = None
|
|
|
self.qid = None
|
|
|
self.response_generator = None
|
|
|
|
|
|
- def _handle_xfrin_response(self):
|
|
|
- for rr in super()._handle_xfrin_response():
|
|
|
- pass
|
|
|
-
|
|
|
def _asyncore_loop(self):
|
|
|
if self.force_close:
|
|
|
self.handle_close()
|
|
@@ -80,11 +99,21 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
return data
|
|
|
|
|
|
def send(self, data):
|
|
|
+ if self.qlen != None and len(self.query_data) >= self.qlen:
|
|
|
+ # This is a new query. reset the internal state.
|
|
|
+ self.qlen = None
|
|
|
+ self.qid = None
|
|
|
+ self.query_data = b''
|
|
|
self.query_data += data
|
|
|
- # when the outgoing data is sufficiently large to contain the QID field
|
|
|
- # (4 octets or more - 16-bit length field + 16-bit QID), extract the
|
|
|
- # value so that we can construct a matching response.
|
|
|
+
|
|
|
+ # when the outgoing data is sufficiently large to contain the length
|
|
|
+ # and the QID fields (4 octets or more), extract these fields.
|
|
|
+ # The length will be reset the internal query data to support multiple
|
|
|
+ # queries in a single test.
|
|
|
+ # The QID will be used to construct a matching response.
|
|
|
if len(self.query_data) >= 4 and self.qid == None:
|
|
|
+ self.qlen = socket.htons(struct.unpack('H',
|
|
|
+ self.query_data[0:2])[0])
|
|
|
self.qid = socket.htons(struct.unpack('H', self.query_data[2:4])[0])
|
|
|
# if the response generator method is specified, invoke it now.
|
|
|
if self.response_generator != None:
|
|
@@ -119,92 +148,150 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
def setUp(self):
|
|
|
if os.path.exists(TEST_DB_FILE):
|
|
|
os.remove(TEST_DB_FILE)
|
|
|
- self.conn = MockXfrinConnection('example.com.', TEST_DB_FILE,
|
|
|
+ self.sock_map = {}
|
|
|
+ self.conn = MockXfrinConnection(self.sock_map, 'example.com.',
|
|
|
+ TEST_RRCLASS, TEST_DB_FILE,
|
|
|
threading.Event(),
|
|
|
- TEST_MASTER_IPV4_ADDRESS)
|
|
|
+ TEST_MASTER_IPV4_ADDRINFO)
|
|
|
+ self.axfr_after_soa = False
|
|
|
+ self.soa_response_params = {
|
|
|
+ 'questions': [example_soa_question],
|
|
|
+ 'bad_qid': False,
|
|
|
+ 'response': True,
|
|
|
+ 'rcode': rcode.NOERROR(),
|
|
|
+ 'axfr_after_soa': self._create_normal_response_data
|
|
|
+ }
|
|
|
|
|
|
def tearDown(self):
|
|
|
self.conn.close()
|
|
|
if os.path.exists(TEST_DB_FILE):
|
|
|
os.remove(TEST_DB_FILE)
|
|
|
|
|
|
+ def test_close(self):
|
|
|
+ # we shouldn't be using the global asyncore map.
|
|
|
+ self.assertEqual(len(asyncore.socket_map), 0)
|
|
|
+ # there should be exactly one entry in our local map
|
|
|
+ self.assertEqual(len(self.sock_map), 1)
|
|
|
+ # once closing the dispatch the map should become empty
|
|
|
+ self.conn.close()
|
|
|
+ self.assertEqual(len(self.sock_map), 0)
|
|
|
+
|
|
|
def test_init_ip6(self):
|
|
|
# This test simply creates a new XfrinConnection object with an
|
|
|
# IPv6 address, tries to bind it to an IPv6 wildcard address/port
|
|
|
# to confirm an AF_INET6 socket has been created. A naive application
|
|
|
# tends to assume it's IPv4 only and hardcode AF_INET. This test
|
|
|
# uncovers such a bug.
|
|
|
- c = MockXfrinConnection('example.com.', TEST_DB_FILE,
|
|
|
+ c = MockXfrinConnection({}, 'example.com.', TEST_RRCLASS, TEST_DB_FILE,
|
|
|
threading.Event(),
|
|
|
- TEST_MASTER_IPV6_ADDRESS)
|
|
|
- #This test currently fails. Fix the code, then enable it
|
|
|
- #c.bind(('::', 0))
|
|
|
+ TEST_MASTER_IPV6_ADDRINFO)
|
|
|
+ c.bind(('::', 0))
|
|
|
+ c.close()
|
|
|
+
|
|
|
+ def test_init_chclass(self):
|
|
|
+ c = XfrinConnection({}, 'example.com.', rr_class.CH(), TEST_DB_FILE,
|
|
|
+ threading.Event(), TEST_MASTER_IPV4_ADDRINFO)
|
|
|
+ axfrmsg = c._create_query(rr_type.AXFR())
|
|
|
+ self.assertEqual(question_iter(axfrmsg).get_question().get_class(),
|
|
|
+ rr_class.CH())
|
|
|
c.close()
|
|
|
|
|
|
def test_response_with_invalid_msg(self):
|
|
|
self.conn.reply_data = b'aaaxxxx'
|
|
|
- self.assertRaises(XfrinTestException, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(XfrinTestException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_without_end_soa(self):
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
self.conn.reply_data = self.conn.create_response_data()
|
|
|
- self.assertRaises(XfrinTestException, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(XfrinTestException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_bad_qid(self):
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
self.conn.reply_data = self.conn.create_response_data(bad_qid = True)
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_non_response(self):
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
self.conn.reply_data = self.conn.create_response_data(response = False)
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_error_code(self):
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- rcode = rcode.SERVFAIL())
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
|
|
|
+ rcode=rcode.SERVFAIL())
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_multi_question(self):
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[example_question, example_question])
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
|
|
|
+ questions=[example_axfr_question, example_axfr_question])
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_empty_answer(self):
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
self.conn.reply_data = self.conn.create_response_data(answers=[])
|
|
|
# Should an empty answer trigger an exception? Even though it's very
|
|
|
# unusual it's not necessarily invalid. Need to revisit.
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
+
|
|
|
+ def test_response_non_response(self):
|
|
|
+ self.conn._send_query(rr_type.AXFR())
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(response = False)
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
+
|
|
|
+ def test_soacheck(self):
|
|
|
+ # we need to defer the creation until we know the QID, which is
|
|
|
+ # determined in _check_soa_serial(), so we use response_generator.
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
|
|
|
+
|
|
|
+ def test_soacheck_with_bad_response(self):
|
|
|
+ self.conn.response_generator = self._create_broken_response_data
|
|
|
+ self.assertRaises(UserWarning, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_badqid(self):
|
|
|
+ self.soa_response_params['bad_qid'] = True
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_non_response(self):
|
|
|
+ self.soa_response_params['response'] = False
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
+
|
|
|
+ def test_soacheck_error_code(self):
|
|
|
+ self.soa_response_params['rcode'] = rcode.SERVFAIL()
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_response_shutdown(self):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.conn._shutdown_event.set()
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_timeout(self):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.conn.force_time_out = True
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_remote_close(self):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.conn.force_close = True
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(XfrinException, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response_bad_message(self):
|
|
|
self.conn.response_generator = self._create_broken_response_data
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
- self.assertRaises(Exception, self.conn._handle_xfrin_response)
|
|
|
+ self.assertRaises(Exception, self._handle_xfrin_response)
|
|
|
|
|
|
def test_response(self):
|
|
|
- # normal case. should silently succeed.
|
|
|
+ # normal case.
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.conn._send_query(rr_type.AXFR())
|
|
|
- self.conn._handle_xfrin_response()
|
|
|
+ # two SOAs, and only these have been transfered. the 2nd SOA is just
|
|
|
+ # a marker, so only 1 RR has been provided in the iteration.
|
|
|
+ self.assertEqual(self._handle_xfrin_response(), 1)
|
|
|
|
|
|
def test_do_xfrin(self):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
@@ -212,30 +299,41 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
|
|
|
def test_do_xfrin_empty_response(self):
|
|
|
# skipping the creation of response data, so the transfer will fail.
|
|
|
- # (but do_xfrin() always return OK.)
|
|
|
- self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
-
|
|
|
- def test_do_xfrin_empty_response(self):
|
|
|
- self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
|
|
|
def test_do_xfrin_bad_response(self):
|
|
|
self.conn.response_generator = self._create_broken_response_data
|
|
|
- self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
|
|
|
def test_do_xfrin_dberror(self):
|
|
|
# DB file is under a non existent directory, so its creation will fail,
|
|
|
# which will make the transfer fail.
|
|
|
self.conn._db_file = "not_existent/" + TEST_DB_FILE
|
|
|
- self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
|
|
|
|
-# This test currently doesn't work due to bug. Fix it and then enable the test.
|
|
|
-# def test_do_xfrin_with_soacheck(self):
|
|
|
-# self.conn.response_generator = self._create_normal_response_data
|
|
|
-# self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
+ def test_do_soacheck_and_xfrin(self):
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
|
|
|
-# def test_do_xfrin_with_soacheck_bad_response(self):
|
|
|
-# self.conn.response_generator = self._create_broken_response_data
|
|
|
-# self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
+ def test_do_soacheck_broken_response(self):
|
|
|
+ self.conn.response_generator = self._create_broken_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
|
|
|
+
|
|
|
+ def test_do_soacheck_badqid(self):
|
|
|
+ # the QID mismatch would internally trigger a XfrinException exception,
|
|
|
+ # and covers part of the code that other tests can't.
|
|
|
+ self.soa_response_params['bad_qid'] = True
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
+ self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
|
|
|
+
|
|
|
+ def _handle_xfrin_response(self):
|
|
|
+ # This helper methods iterates over all RRs (excluding the ending SOA)
|
|
|
+ # transferred, and simply returns the number of RRs. The return value
|
|
|
+ # may be used an assertion value for test cases.
|
|
|
+ rrs = 0
|
|
|
+ for rr in self.conn._handle_xfrin_response():
|
|
|
+ rrs += 1
|
|
|
+ return rrs
|
|
|
|
|
|
def _create_normal_response_data(self):
|
|
|
# This helper method creates a simple sequence of DNS messages that
|
|
@@ -244,6 +342,19 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.reply_data = self.conn.create_response_data()
|
|
|
self.conn.reply_data += self.conn.create_response_data()
|
|
|
|
|
|
+ def _create_soa_response_data(self):
|
|
|
+ # This helper method creates a DNS message that is supposed to be
|
|
|
+ # used a valid response to SOA queries prior to XFR.
|
|
|
+ # If axfr_after_soa is True, it resets the response_generator so that
|
|
|
+ # a valid XFR messages will follow.
|
|
|
+ self.conn.reply_data = self.conn.create_response_data(
|
|
|
+ bad_qid=self.soa_response_params['bad_qid'],
|
|
|
+ response=self.soa_response_params['response'],
|
|
|
+ rcode=self.soa_response_params['rcode'],
|
|
|
+ questions=self.soa_response_params['questions'])
|
|
|
+ if self.soa_response_params['axfr_after_soa'] != None:
|
|
|
+ self.conn.response_generator = self.soa_response_params['axfr_after_soa']
|
|
|
+
|
|
|
def _create_broken_response_data(self):
|
|
|
# This helper method creates a bogus "DNS message" that only contains
|
|
|
# 4 octets of data. The DNS message parser will raise an exception.
|
|
@@ -284,10 +395,9 @@ class TestXfrinRecorder(unittest.TestCase):
|
|
|
self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), False)
|
|
|
|
|
|
class TestXfrin(unittest.TestCase):
|
|
|
- args = {}
|
|
|
-
|
|
|
def setUp(self):
|
|
|
self.xfr = MockXfrin()
|
|
|
+ self.args = {}
|
|
|
self.args['zone_name'] = TEST_ZONE_NAME
|
|
|
self.args['port'] = TEST_MASTER_PORT
|
|
|
self.args['master'] = TEST_MASTER_IPV4_ADDRESS
|
|
@@ -300,19 +410,21 @@ class TestXfrin(unittest.TestCase):
|
|
|
return self.xfr._parse_cmd_params(self.args)
|
|
|
|
|
|
def test_parse_cmd_params(self):
|
|
|
- name, master, port, db_file = self._do_parse()
|
|
|
- self.assertEqual(port, int(TEST_MASTER_PORT))
|
|
|
+ name, master_addrinfo, db_file = self._do_parse()
|
|
|
+ self.assertEqual(master_addrinfo[4][1], int(TEST_MASTER_PORT))
|
|
|
self.assertEqual(name, TEST_ZONE_NAME)
|
|
|
- self.assertEqual(master, TEST_MASTER_IPV4_ADDRESS)
|
|
|
+ self.assertEqual(master_addrinfo[4][0], TEST_MASTER_IPV4_ADDRESS)
|
|
|
self.assertEqual(db_file, TEST_DB_FILE)
|
|
|
|
|
|
def test_parse_cmd_params_default_port(self):
|
|
|
del self.args['port']
|
|
|
- self.assertEqual(self._do_parse()[2], 53)
|
|
|
+ master_addrinfo = self._do_parse()[1]
|
|
|
+ self.assertEqual(master_addrinfo[4][1], 53)
|
|
|
|
|
|
def test_parse_cmd_params_ip6master(self):
|
|
|
self.args['master'] = TEST_MASTER_IPV6_ADDRESS
|
|
|
- self.assertEqual(self._do_parse()[1], TEST_MASTER_IPV6_ADDRESS)
|
|
|
+ master_addrinfo = self._do_parse()[1]
|
|
|
+ self.assertEqual(master_addrinfo[4][0], TEST_MASTER_IPV6_ADDRESS)
|
|
|
|
|
|
def test_parse_cmd_params_nozone(self):
|
|
|
# zone name is mandatory.
|
|
@@ -325,7 +437,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.assertRaises(XfrinException, self._do_parse)
|
|
|
|
|
|
def test_parse_cmd_params_bad_ip4(self):
|
|
|
- self.args['master'] = '3.3.3'
|
|
|
+ self.args['master'] = '3.3.3.3.3'
|
|
|
self.assertRaises(XfrinException, self._do_parse)
|
|
|
|
|
|
def test_parse_cmd_params_bad_ip6(self):
|
|
@@ -339,6 +451,9 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.args['port'] = '65536'
|
|
|
self.assertRaises(XfrinException, self._do_parse)
|
|
|
|
|
|
+ self.args['port'] = 'http'
|
|
|
+ self.assertRaises(XfrinException, self._do_parse)
|
|
|
+
|
|
|
def test_command_handler_shutdown(self):
|
|
|
self.assertEqual(self.xfr.command_handler("shutdown",
|
|
|
None)['result'][0], 0)
|
|
@@ -346,9 +461,6 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.assertEqual(self.xfr.command_handler("shutdown",
|
|
|
"unused")['result'][0], 0)
|
|
|
|
|
|
- self.assertEqual(self.xfr.command_handler("Shutdown",
|
|
|
- "unused")['result'][0], 0)
|
|
|
-
|
|
|
def test_command_handler_retransfer(self):
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
self.args)['result'][0], 0)
|
|
@@ -390,6 +502,40 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.assertEqual(self.xfr.command_handler("refresh",
|
|
|
self.args)['result'][0], 0)
|
|
|
|
|
|
+ def test_command_handler_unknown(self):
|
|
|
+ self.assertEqual(self.xfr.command_handler("xxx", None)['result'][0], 1)
|
|
|
+
|
|
|
+def raise_interrupt():
|
|
|
+ raise KeyboardInterrupt()
|
|
|
+
|
|
|
+def raise_ccerror():
|
|
|
+ raise isc.cc.session.SessionError('test error')
|
|
|
+
|
|
|
+def raise_excpetion():
|
|
|
+ raise Exception('test exception')
|
|
|
+
|
|
|
+class TestMain(unittest.TestCase):
|
|
|
+ def setUp(self):
|
|
|
+ MockXfrin.check_command_hook = None
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ MockXfrin.check_command_hook = None
|
|
|
+
|
|
|
+ def test_startup(self):
|
|
|
+ main(MockXfrin, False)
|
|
|
+
|
|
|
+ def test_startup_interrupt(self):
|
|
|
+ MockXfrin.check_command_hook = raise_interrupt
|
|
|
+ main(MockXfrin, False)
|
|
|
+
|
|
|
+ def test_startup_ccerror(self):
|
|
|
+ MockXfrin.check_command_hook = raise_ccerror
|
|
|
+ main(MockXfrin, False)
|
|
|
+
|
|
|
+ def test_startup_generalerror(self):
|
|
|
+ MockXfrin.check_command_hook = raise_excpetion
|
|
|
+ main(MockXfrin, False)
|
|
|
+
|
|
|
if __name__== "__main__":
|
|
|
try:
|
|
|
unittest.main()
|