|
@@ -17,6 +17,7 @@
|
|
|
|
|
|
from isc.ddns.session import *
|
|
|
from isc.dns import *
|
|
|
+from isc.acl.acl import ACCEPT
|
|
|
import isc.util.cio.socketsession
|
|
|
from isc.datasrc import DataSourceClient
|
|
|
import ddns
|
|
@@ -31,14 +32,19 @@ import unittest
|
|
|
TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
|
|
|
READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
|
|
|
TEST_ZONE_NAME = Name('example.org')
|
|
|
+TEST_ZONE_NAME_STR = TEST_ZONE_NAME.to_text()
|
|
|
UPDATE_RRTYPE = RRType.SOA()
|
|
|
TEST_QID = 5353 # arbitrary chosen
|
|
|
TEST_RRCLASS = RRClass.IN()
|
|
|
+TEST_RRCLASS_STR = TEST_RRCLASS.to_text()
|
|
|
TEST_SERVER6 = ('2001:db8::53', 53, 0, 0)
|
|
|
TEST_CLIENT6 = ('2001:db8::1', 53000, 0, 0)
|
|
|
TEST_SERVER4 = ('192.0.2.53', 53)
|
|
|
TEST_CLIENT4 = ('192.0.2.1', 53534)
|
|
|
TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
|
|
|
+TEST_ACL_CONTEXT = isc.acl.dns.RequestContext(
|
|
|
+ socket.getaddrinfo("192.0.2.1", 1234, 0, socket.SOCK_DGRAM,
|
|
|
+ socket.IPPROTO_UDP, socket.AI_NUMERICHOST)[0][4])
|
|
|
# TSIG key for tests when needed. The key name is TEST_ZONE_NAME.
|
|
|
TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
# TSIG keyring that contanins the test key
|
|
@@ -235,12 +241,92 @@ class TestDDNSServer(unittest.TestCase):
|
|
|
ddns.clear_socket()
|
|
|
self.assertFalse(os.path.exists(ddns.SOCKET_FILE))
|
|
|
|
|
|
+ def test_initial_config(self):
|
|
|
+ # right now, the only configuration is the zone configuration, whose
|
|
|
+ # default should be an empty map.
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
def test_config_handler(self):
|
|
|
- # Config handler does not do anything yet, but should at least
|
|
|
- # return 'ok' for now.
|
|
|
- new_config = {}
|
|
|
+ # Update with a simple zone configuration: including an accept-all ACL
|
|
|
+ new_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(new_config)
|
|
|
+ self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+ acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+
|
|
|
+ # Slightly more complicated one: containing multiple ACLs
|
|
|
+ new_config = { 'zones': [ { 'origin': 'example.com',
|
|
|
+ 'class': 'CH',
|
|
|
+ 'update_acl': [{'action': 'REJECT',
|
|
|
+ 'from': '2001:db8::1'}] },
|
|
|
+ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] },
|
|
|
+ { 'origin': 'example.org',
|
|
|
+ 'class': 'CH',
|
|
|
+ 'update_acl': [{'action': 'DROP'}] } ] }
|
|
|
answer = self.ddns_server.config_handler(new_config)
|
|
|
self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+ self.assertEqual(3, len(self.ddns_server._zone_config))
|
|
|
+ acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+
|
|
|
+ # empty zone config
|
|
|
+ new_config = { 'zones': [] }
|
|
|
+ answer = self.ddns_server.config_handler(new_config)
|
|
|
+ self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # bad zone config data: bad name. The previous config shouls be kept.
|
|
|
+ bad_config = { 'zones': [ { 'origin': 'bad..example',
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(bad_config)
|
|
|
+ self.assertEqual(1, isc.config.parse_answer(answer)[0])
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # bad zone config data: bad class.
|
|
|
+ bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': 'badclass',
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(bad_config)
|
|
|
+ self.assertEqual(1, isc.config.parse_answer(answer)[0])
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # bad zone config data: bad ACL.
|
|
|
+ bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'badaction'}]}]}
|
|
|
+ answer = self.ddns_server.config_handler(bad_config)
|
|
|
+ self.assertEqual(1, isc.config.parse_answer(answer)[0])
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # the first zone cofig is valid, but not the second. the first one
|
|
|
+ # shouldn't be installed.
|
|
|
+ bad_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] },
|
|
|
+ { 'origin': 'bad..example',
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(bad_config)
|
|
|
+ self.assertEqual(1, isc.config.parse_answer(answer)[0])
|
|
|
+ self.assertEqual({}, self.ddns_server._zone_config)
|
|
|
+
|
|
|
+ # Half-broken case: 'origin, class' pair is duplicate. For now we
|
|
|
+ # we accept it (the latter one will win)
|
|
|
+ dup_config = { 'zones': [ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'REJECT'}] },
|
|
|
+ { 'origin': TEST_ZONE_NAME_STR,
|
|
|
+ 'class': TEST_RRCLASS_STR,
|
|
|
+ 'update_acl': [{'action': 'ACCEPT'}] } ] }
|
|
|
+ answer = self.ddns_server.config_handler(dup_config)
|
|
|
+ self.assertEqual((0, None), isc.config.parse_answer(answer))
|
|
|
+ acl = self.ddns_server._zone_config[(TEST_ZONE_NAME, TEST_RRCLASS)]
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
|
|
|
def test_shutdown_command(self):
|
|
|
'''Test whether the shutdown command works'''
|
|
@@ -628,6 +714,7 @@ class TestMain(unittest.TestCase):
|
|
|
self.assertTrue(self._server.exception_raised)
|
|
|
|
|
|
class TestConfig(unittest.TestCase):
|
|
|
+ '''Test some simple config related things that don't need server. '''
|
|
|
def setUp(self):
|
|
|
self.__ccsession = MyCCSession()
|
|
|
|