|
@@ -855,8 +855,6 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
def tearDown(self):
|
|
def tearDown(self):
|
|
time.time = self.orig_time_time
|
|
time.time = self.orig_time_time
|
|
- # clear all statistics counters after each test
|
|
|
|
- self.conn._counters.clear_all()
|
|
|
|
super().tearDown()
|
|
super().tearDown()
|
|
|
|
|
|
def __create_mock_tsig(self, key, error, has_last_signature=True):
|
|
def __create_mock_tsig(self, key, error, has_last_signature=True):
|
|
@@ -1087,18 +1085,7 @@ class TestAXFR(TestXfrinConnection):
|
|
# we need to defer the creation until we know the QID, which is
|
|
# we need to defer the creation until we know the QID, which is
|
|
# determined in _check_soa_serial(), so we use response_generator.
|
|
# determined in _check_soa_serial(), so we use response_generator.
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
- # check the statistics counters
|
|
|
|
- name2count = (('soaoutv4', 1),
|
|
|
|
- ('soaoutv6', 0))
|
|
|
|
- for (n, c) in name2count:
|
|
|
|
- self.assertRaises(isc.cc.data.DataNotFoundError,
|
|
|
|
- self.conn._counters.get, 'zones',
|
|
|
|
- TEST_ZONE_NAME_STR, n)
|
|
|
|
self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
|
|
self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
|
|
- for (n, c) in name2count:
|
|
|
|
- self.assertEqual(c, self.conn._counters.get('zones',
|
|
|
|
- TEST_ZONE_NAME_STR,
|
|
|
|
- n))
|
|
|
|
|
|
|
|
def test_soacheck_with_bad_response(self):
|
|
def test_soacheck_with_bad_response(self):
|
|
self.conn.response_generator = self._create_broken_response_data
|
|
self.conn.response_generator = self._create_broken_response_data
|
|
@@ -1464,17 +1451,6 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
def test_do_xfrin(self):
|
|
def test_do_xfrin(self):
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
- # check the statistics counters
|
|
|
|
- name2count = (('axfrreqv4', 1),
|
|
|
|
- ('axfrreqv6', 0),
|
|
|
|
- ('ixfrreqv4', 0),
|
|
|
|
- ('ixfrreqv6', 0),
|
|
|
|
- ('xfrsuccess', 1),
|
|
|
|
- ('latest_axfr_duration', 0.0))
|
|
|
|
- for (n, c) in name2count:
|
|
|
|
- self.assertRaises(isc.cc.data.DataNotFoundError,
|
|
|
|
- self.conn._counters.get, 'zones',
|
|
|
|
- TEST_ZONE_NAME_STR, n)
|
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
self.assertFalse(self.conn._datasrc_client._journaling_enabled)
|
|
self.assertFalse(self.conn._datasrc_client._journaling_enabled)
|
|
|
|
|
|
@@ -1485,19 +1461,6 @@ class TestAXFR(TestXfrinConnection):
|
|
self.assertEqual(0, self.conn._transfer_stats.ixfr_addition_count)
|
|
self.assertEqual(0, self.conn._transfer_stats.ixfr_addition_count)
|
|
self.assertEqual(177, self.conn._transfer_stats.byte_count)
|
|
self.assertEqual(177, self.conn._transfer_stats.byte_count)
|
|
self.assertGreater(self.conn._transfer_stats.get_running_time(), 0)
|
|
self.assertGreater(self.conn._transfer_stats.get_running_time(), 0)
|
|
- for (n, c) in name2count:
|
|
|
|
- if n == 'latest_axfr_duration':
|
|
|
|
- self.assertGreaterEqual(
|
|
|
|
- self.conn._counters.get('zones',
|
|
|
|
- TEST_ZONE_NAME_STR,
|
|
|
|
- n),
|
|
|
|
- c)
|
|
|
|
- else:
|
|
|
|
- self.assertEqual(
|
|
|
|
- c,
|
|
|
|
- self.conn._counters.get('zones',
|
|
|
|
- TEST_ZONE_NAME_STR,
|
|
|
|
- n))
|
|
|
|
|
|
|
|
def test_do_xfrin_with_tsig(self):
|
|
def test_do_xfrin_with_tsig(self):
|
|
# use TSIG with a mock context. we fake all verify results to
|
|
# use TSIG with a mock context. we fake all verify results to
|
|
@@ -1519,15 +1482,8 @@ class TestAXFR(TestXfrinConnection):
|
|
self.conn._tsig_ctx_creator = \
|
|
self.conn._tsig_ctx_creator = \
|
|
lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
- # check the statistics counters
|
|
|
|
- self.assertRaises(isc.cc.data.DataNotFoundError,
|
|
|
|
- self.conn._counters.get, 'zones',
|
|
|
|
- TEST_ZONE_NAME_STR, 'xfrfail')
|
|
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
|
|
self.assertEqual(1, self.conn._tsig_ctx.verify_called)
|
|
self.assertEqual(1, self.conn._tsig_ctx.verify_called)
|
|
- self.assertEqual(1, self.conn._counters.get('zones',
|
|
|
|
- TEST_ZONE_NAME_STR,
|
|
|
|
- 'xfrfail'))
|
|
|
|
|
|
|
|
def test_do_xfrin_without_last_tsig(self):
|
|
def test_do_xfrin_without_last_tsig(self):
|
|
# TSIG verify will succeed, but it will pretend the last message is
|
|
# TSIG verify will succeed, but it will pretend the last message is
|
|
@@ -1666,75 +1622,6 @@ class TestAXFR(TestXfrinConnection):
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
|
|
self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
|
|
|
|
|
|
-class TestXfrinConnectionV6(TestXfrinConnection):
|
|
|
|
- '''Test of TestXfrinConnection for IPv6'''
|
|
|
|
- master_addrinfo = TEST_MASTER_IPV6_ADDRINFO
|
|
|
|
-
|
|
|
|
-class TestAXFRV6(TestXfrinConnectionV6):
|
|
|
|
- def setUp(self):
|
|
|
|
- super().setUp()
|
|
|
|
- XfrinInitialSOA().set_xfrstate(self.conn, XfrinInitialSOA())
|
|
|
|
-
|
|
|
|
- def tearDown(self):
|
|
|
|
- # clear all statistics counters after each test
|
|
|
|
- self.conn._counters.clear_all()
|
|
|
|
- super().tearDown()
|
|
|
|
-
|
|
|
|
- def test_soacheck(self):
|
|
|
|
- # we need to defer the creation until we know the QID, which is
|
|
|
|
- # determined in _check_soa_serial(), so we use response_generator.
|
|
|
|
- self.conn.response_generator = self._create_soa_response_data
|
|
|
|
- # check the statistics counters
|
|
|
|
- name2count = (('soaoutv4', 0),
|
|
|
|
- ('soaoutv6', 1))
|
|
|
|
- for (n, c) in name2count:
|
|
|
|
- self.assertRaises(isc.cc.data.DataNotFoundError,
|
|
|
|
- self.conn._counters.get, 'zones',
|
|
|
|
- TEST_ZONE_NAME_STR, n)
|
|
|
|
- self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
|
|
|
|
- for (n, c) in name2count:
|
|
|
|
- self.assertEqual(c, self.conn._counters.get('zones',
|
|
|
|
- TEST_ZONE_NAME_STR,
|
|
|
|
- n))
|
|
|
|
-
|
|
|
|
- def test_do_xfrin(self):
|
|
|
|
- self.conn.response_generator = self._create_normal_response_data
|
|
|
|
- # check the statistics counters
|
|
|
|
- name2count = (('axfrreqv4', 0),
|
|
|
|
- ('axfrreqv6', 1),
|
|
|
|
- ('ixfrreqv4', 0),
|
|
|
|
- ('ixfrreqv6', 0),
|
|
|
|
- ('xfrsuccess', 1),
|
|
|
|
- ('latest_axfr_duration', 0.0))
|
|
|
|
- for (n, c) in name2count:
|
|
|
|
- self.assertRaises(isc.cc.data.DataNotFoundError,
|
|
|
|
- self.conn._counters.get, 'zones',
|
|
|
|
- TEST_ZONE_NAME_STR, n)
|
|
|
|
- self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
|
|
|
|
- self.assertFalse(self.conn._datasrc_client._journaling_enabled)
|
|
|
|
-
|
|
|
|
- self.assertEqual(2, self.conn._transfer_stats.message_count)
|
|
|
|
- self.assertEqual(2, self.conn._transfer_stats.axfr_rr_count)
|
|
|
|
- self.assertEqual(0, self.conn._transfer_stats.ixfr_changeset_count)
|
|
|
|
- self.assertEqual(0, self.conn._transfer_stats.ixfr_deletion_count)
|
|
|
|
- self.assertEqual(0, self.conn._transfer_stats.ixfr_addition_count)
|
|
|
|
- self.assertEqual(177, self.conn._transfer_stats.byte_count)
|
|
|
|
- self.assertGreater(self.conn._transfer_stats.get_running_time(), 0)
|
|
|
|
- for (n, c) in name2count:
|
|
|
|
- if n == 'latest_axfr_duration':
|
|
|
|
- self.assertAlmostEqual(
|
|
|
|
- c,
|
|
|
|
- self.conn._counters.get('zones',
|
|
|
|
- TEST_ZONE_NAME_STR,
|
|
|
|
- n),
|
|
|
|
- delta=1.0)
|
|
|
|
- else:
|
|
|
|
- self.assertEqual(
|
|
|
|
- c,
|
|
|
|
- self.conn._counters.get('zones',
|
|
|
|
- TEST_ZONE_NAME_STR,
|
|
|
|
- n))
|
|
|
|
-
|
|
|
|
class TestIXFRResponse(TestXfrinConnection):
|
|
class TestIXFRResponse(TestXfrinConnection):
|
|
def setUp(self):
|
|
def setUp(self):
|
|
# replace time.time with a steadily increasing fake one
|
|
# replace time.time with a steadily increasing fake one
|