|
@@ -1,4 +1,4 @@
|
|
|
-# Copyright (C) 2009 Internet Systems Consortium.
|
|
|
+# Copyright (C) 2011 Internet Systems Consortium.
|
|
|
#
|
|
|
# Permission to use, copy, modify, and distribute this software for any
|
|
|
# purpose with or without fee is hereby granted, provided that the above
|
|
@@ -20,8 +20,10 @@ from xfrin import *
|
|
|
#
|
|
|
# Commonly used (mostly constant) test parameters
|
|
|
#
|
|
|
-TEST_ZONE_NAME = "example.com"
|
|
|
+TEST_ZONE_NAME_STR = "example.com."
|
|
|
+TEST_ZONE_NAME = Name(TEST_ZONE_NAME_STR)
|
|
|
TEST_RRCLASS = RRClass.IN()
|
|
|
+TEST_RRCLASS_STR = 'IN'
|
|
|
TEST_DB_FILE = 'db_file'
|
|
|
TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
|
|
|
TEST_MASTER_IPV4_ADDRINFO = (socket.AF_INET, socket.SOCK_STREAM,
|
|
@@ -40,12 +42,12 @@ TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
'master.example.com. admin.example.com ' +
|
|
|
'1234 3600 1800 2419200 7200')
|
|
|
-soa_rrset = RRset(Name(TEST_ZONE_NAME), TEST_RRCLASS, RRType.SOA(),
|
|
|
+soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(),
|
|
|
RRTTL(3600))
|
|
|
soa_rrset.add_rdata(soa_rdata)
|
|
|
-example_axfr_question = Question(Name(TEST_ZONE_NAME), TEST_RRCLASS,
|
|
|
+example_axfr_question = Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
RRType.AXFR())
|
|
|
-example_soa_question = Question(Name(TEST_ZONE_NAME), TEST_RRCLASS,
|
|
|
+example_soa_question = Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
RRType.SOA())
|
|
|
default_questions = [example_axfr_question]
|
|
|
default_answers = [soa_rrset]
|
|
@@ -60,6 +62,13 @@ def strip_mutable_tsig_data(data):
|
|
|
# Time Signed.
|
|
|
return data[0:-32] + data[-26:-22] + data[-6:]
|
|
|
|
|
|
+class MockCC():
|
|
|
+ def get_default_value(self, identifier):
|
|
|
+ if identifier == "zones/master_port":
|
|
|
+ return TEST_MASTER_PORT
|
|
|
+ if identifier == "zones/class":
|
|
|
+ return 'IN'
|
|
|
+
|
|
|
class MockXfrin(Xfrin):
|
|
|
# This is a class attribute of a callable object that specifies a non
|
|
|
# default behavior triggered in _cc_check_command(). Specific test methods
|
|
@@ -70,6 +79,7 @@ class MockXfrin(Xfrin):
|
|
|
|
|
|
def _cc_setup(self):
|
|
|
self._tsig_key_str = None
|
|
|
+ self._module_cc = MockCC()
|
|
|
pass
|
|
|
|
|
|
def _get_db_file(self):
|
|
@@ -80,6 +90,19 @@ class MockXfrin(Xfrin):
|
|
|
if MockXfrin.check_command_hook:
|
|
|
MockXfrin.check_command_hook()
|
|
|
|
|
|
+ def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo, tsig_key_str,
|
|
|
+ check_soa = True):
|
|
|
+ # store some of the arguments for verification, then call this
|
|
|
+ # method in the superclass
|
|
|
+ self.xfrin_started_zone_name = zone_name
|
|
|
+ self.xfrin_started_rrclass = rrclass
|
|
|
+ self.xfrin_started_master_addr = master_addrinfo[2][0]
|
|
|
+ self.xfrin_started_master_port = master_addrinfo[2][1]
|
|
|
+ self.xfrin_started_tsig_key_str = tsig_key_str
|
|
|
+ return Xfrin.xfrin_start(self, zone_name, rrclass, db_file,
|
|
|
+ master_addrinfo, tsig_key_str,
|
|
|
+ check_soa = True)
|
|
|
+
|
|
|
class MockXfrinConnection(XfrinConnection):
|
|
|
def __init__(self, sock_map, zone_name, rrclass, db_file, shutdown_event,
|
|
|
master_addr):
|
|
@@ -450,7 +473,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
sys.stderr = open(os.devnull, 'w')
|
|
|
self.xfr = MockXfrin()
|
|
|
self.args = {}
|
|
|
- self.args['zone_name'] = TEST_ZONE_NAME
|
|
|
+ self.args['zone_name'] = TEST_ZONE_NAME_STR
|
|
|
self.args['port'] = TEST_MASTER_PORT
|
|
|
self.args['master'] = TEST_MASTER_IPV4_ADDRESS
|
|
|
self.args['db_file'] = TEST_DB_FILE
|
|
@@ -502,8 +525,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
def test_parse_cmd_params_nomaster(self):
|
|
|
# master address is mandatory.
|
|
|
del self.args['master']
|
|
|
- master_addrinfo = self._do_parse_master_port()
|
|
|
- self.assertEqual(master_addrinfo[2][0], DEFAULT_MASTER)
|
|
|
+ self.assertRaises(XfrinException, self._do_parse_master_port)
|
|
|
|
|
|
def test_parse_cmd_params_bad_ip4(self):
|
|
|
self.args['master'] = '3.3.3.3.3'
|
|
@@ -533,28 +555,30 @@ class TestXfrin(unittest.TestCase):
|
|
|
def test_command_handler_retransfer(self):
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
self.args)['result'][0], 0)
|
|
|
+ self.assertEqual(self.args['master'], self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(self.args['port']), self.xfr.xfrin_started_master_port)
|
|
|
|
|
|
def test_command_handler_retransfer_short_command1(self):
|
|
|
# try it when only specifying the zone name (of unknown zone)
|
|
|
short_args = {}
|
|
|
- short_args['zone_name'] = TEST_ZONE_NAME
|
|
|
+ short_args['zone_name'] = TEST_ZONE_NAME_STR
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
- short_args)['result'][0], 0)
|
|
|
+ short_args)['result'][0], 1)
|
|
|
|
|
|
def test_command_handler_retransfer_short_command2(self):
|
|
|
# try it when only specifying the zone name (of unknown zone)
|
|
|
short_args = {}
|
|
|
- short_args['zone_name'] = TEST_ZONE_NAME + "."
|
|
|
+ short_args['zone_name'] = TEST_ZONE_NAME_STR
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
- short_args)['result'][0], 0)
|
|
|
+ short_args)['result'][0], 1)
|
|
|
|
|
|
def test_command_handler_retransfer_short_command3(self):
|
|
|
# try it when only specifying the zone name (of known zone)
|
|
|
short_args = {}
|
|
|
- short_args['zone_name'] = TEST_ZONE_NAME
|
|
|
+ short_args['zone_name'] = TEST_ZONE_NAME_STR
|
|
|
|
|
|
zones = { 'zones': [
|
|
|
- { 'name': TEST_ZONE_NAME,
|
|
|
+ { 'name': TEST_ZONE_NAME_STR,
|
|
|
'master_addr': TEST_MASTER_IPV4_ADDRESS,
|
|
|
'master_port': TEST_MASTER_PORT
|
|
|
}
|
|
@@ -562,6 +586,10 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.xfr.config_handler(zones)
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
short_args)['result'][0], 0)
|
|
|
+ self.assertEqual(TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
+ self.xfr.xfrin_started_master_port)
|
|
|
|
|
|
def test_command_handler_retransfer_badcommand(self):
|
|
|
self.args['master'] = 'invalid'
|
|
@@ -569,13 +597,15 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.args)['result'][0], 1)
|
|
|
|
|
|
def test_command_handler_retransfer_quota(self):
|
|
|
+ self.args['master'] = '127.0.0.1'
|
|
|
+
|
|
|
for i in range(self.xfr._max_transfers_in - 1):
|
|
|
- self.xfr.recorder.increment(str(i) + TEST_ZONE_NAME)
|
|
|
+ self.xfr.recorder.increment(Name(str(i) + TEST_ZONE_NAME_STR))
|
|
|
# there can be one more outstanding transfer.
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
self.args)['result'][0], 0)
|
|
|
# make sure the # xfrs would excceed the quota
|
|
|
- self.xfr.recorder.increment(str(self.xfr._max_transfers_in) + TEST_ZONE_NAME)
|
|
|
+ self.xfr.recorder.increment(Name(str(self.xfr._max_transfers_in) + TEST_ZONE_NAME_STR))
|
|
|
# this one should fail
|
|
|
self.assertEqual(self.xfr.command_handler("retransfer",
|
|
|
self.args)['result'][0], 1)
|
|
@@ -599,6 +629,10 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.args['master'] = TEST_MASTER_IPV6_ADDRESS
|
|
|
self.assertEqual(self.xfr.command_handler("refresh",
|
|
|
self.args)['result'][0], 0)
|
|
|
+ self.assertEqual(TEST_MASTER_IPV6_ADDRESS,
|
|
|
+ self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
+ self.xfr.xfrin_started_master_port)
|
|
|
|
|
|
def test_command_handler_notify(self):
|
|
|
# at this level, refresh is no different than retransfer.
|
|
@@ -611,8 +645,9 @@ class TestXfrin(unittest.TestCase):
|
|
|
# try it with a known zone
|
|
|
self.args['master'] = TEST_MASTER_IPV6_ADDRESS
|
|
|
|
|
|
+ # but use a different address in the actual command
|
|
|
zones = { 'zones': [
|
|
|
- { 'name': TEST_ZONE_NAME,
|
|
|
+ { 'name': TEST_ZONE_NAME_STR,
|
|
|
'master_addr': TEST_MASTER_IPV4_ADDRESS,
|
|
|
'master_port': TEST_MASTER_PORT
|
|
|
}
|
|
@@ -621,6 +656,16 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.assertEqual(self.xfr.command_handler("notify",
|
|
|
self.args)['result'][0], 0)
|
|
|
|
|
|
+ # and see if we used the address from the command, and not from
|
|
|
+ # the config
|
|
|
+ # This is actually NOT the address given in the command, which
|
|
|
+ # would at this point not make sense, see the TODO in
|
|
|
+ # xfrin.py.in:542)
|
|
|
+ self.assertEqual(TEST_MASTER_IPV4_ADDRESS,
|
|
|
+ self.xfr.xfrin_started_master_addr)
|
|
|
+ self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
+ self.xfr.xfrin_started_master_port)
|
|
|
+
|
|
|
def test_command_handler_unknown(self):
|
|
|
self.assertEqual(self.xfr.command_handler("xxx", None)['result'][0], 1)
|
|
|
|
|
@@ -629,37 +674,127 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.assertEqual(self.xfr.config_handler({'transfers_in': 3})['result'][0], 0)
|
|
|
self.assertEqual(self.xfr._max_transfers_in, 3)
|
|
|
|
|
|
+ def _check_zones_config(self, config_given):
|
|
|
+ for zone_config in config_given['zones']:
|
|
|
+ zone_name = zone_config['name']
|
|
|
+ zone_info = self.xfr._get_zone_info(Name(zone_name), RRClass.IN())
|
|
|
+ self.assertEqual(zone_info.master_addr_str, zone_config['master_addr'])
|
|
|
+ self.assertEqual(zone_info.master_port_str, zone_config['master_port'])
|
|
|
+ if 'tsig_key' in zone_config:
|
|
|
+ self.assertEqual(zone_info.tsig_key_str, zone_config['tsig_key'])
|
|
|
+ else:
|
|
|
+ self.assertIsNone(zone_info.tsig_key_str)
|
|
|
+
|
|
|
def test_command_handler_zones(self):
|
|
|
+ zones1 = { 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.1',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones1)['result'][0], 0)
|
|
|
+ self._check_zones_config(zones1)
|
|
|
+
|
|
|
+ zones2 = { 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.2',
|
|
|
+ 'master_port': 53,
|
|
|
+ 'tsig_key': "example.com:SFuWd/q99SzF8Yzd1QbB9g=="
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones2)['result'][0], 0)
|
|
|
+ self._check_zones_config(zones2)
|
|
|
+
|
|
|
+ # test that configuring the zone multiple times fails
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.1',
|
|
|
+ 'master_port': 53
|
|
|
+ },
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.2',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(zones2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.3',
|
|
|
+ 'master_port': 53,
|
|
|
+ 'class': 'BADCLASS'
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ self._check_zones_config(zones2)
|
|
|
+
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'master_addr': '192.0.2.4',
|
|
|
+ 'master_port': 53
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(zones2)
|
|
|
+
|
|
|
zones = { 'zones': [
|
|
|
- { 'name': 'test.com.',
|
|
|
- 'master_addr': '1.1.1.1',
|
|
|
+ { 'name': 'bad..zone.',
|
|
|
+ 'master_addr': '192.0.2.5',
|
|
|
'master_port': 53
|
|
|
}
|
|
|
]}
|
|
|
- self.assertEqual(self.xfr.config_handler(zones)['result'][0], 0)
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(zones2)
|
|
|
|
|
|
zones = { 'zones': [
|
|
|
- { 'master_addr': '1.1.1.1',
|
|
|
+ { 'name': '',
|
|
|
+ 'master_addr': '192.0.2.6',
|
|
|
'master_port': 53
|
|
|
}
|
|
|
]}
|
|
|
self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(zones2)
|
|
|
|
|
|
zones = { 'zones': [
|
|
|
- { 'name': 'test.com',
|
|
|
+ { 'name': 'test.example',
|
|
|
'master_addr': 'badaddress',
|
|
|
'master_port': 53
|
|
|
}
|
|
|
]}
|
|
|
self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(zones2)
|
|
|
|
|
|
zones = { 'zones': [
|
|
|
- { 'name': 'test.com',
|
|
|
- 'master_addr': '1.1.1.1',
|
|
|
+ { 'name': 'test.example',
|
|
|
+ 'master_addr': '192.0.2.7',
|
|
|
'master_port': 'bad_port'
|
|
|
}
|
|
|
]}
|
|
|
self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(zones2)
|
|
|
+
|
|
|
+ # let's also add a zone that is correct too, and make sure
|
|
|
+ # that the new config is not partially taken
|
|
|
+ zones = { 'zones': [
|
|
|
+ { 'name': 'test.example.',
|
|
|
+ 'master_addr': '192.0.2.8',
|
|
|
+ 'master_port': 53
|
|
|
+ },
|
|
|
+ { 'name': 'test2.example.',
|
|
|
+ 'master_addr': '192.0.2.9',
|
|
|
+ 'master_port': 53,
|
|
|
+ 'tsig_key': 'badkey'
|
|
|
+ }
|
|
|
+ ]}
|
|
|
+ self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
|
|
|
+ # since this has failed, we should still have the previous config
|
|
|
+ self._check_zones_config(zones2)
|
|
|
|
|
|
|
|
|
def raise_interrupt():
|