|
@@ -14,15 +14,23 @@
|
|
|
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
|
|
import isc.log
|
|
|
-import unittest
|
|
|
from isc.dns import *
|
|
|
from isc.datasrc import DataSourceClient
|
|
|
from isc.ddns.zone_config import *
|
|
|
+import isc.acl.dns
|
|
|
+from isc.acl.acl import ACCEPT, REJECT, DROP, LoaderError
|
|
|
+
|
|
|
+import unittest
|
|
|
+import socket
|
|
|
|
|
|
# Some common test parameters
|
|
|
TEST_ZONE_NAME = Name('example.org')
|
|
|
TEST_SECONDARY_ZONE_NAME = Name('example.com')
|
|
|
TEST_RRCLASS = RRClass.IN()
|
|
|
+TEST_TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
+TEST_ACL_CONTEXT = isc.acl.dns.RequestContext(
|
|
|
+ socket.getaddrinfo("192.0.2.1", 1234, 0, socket.SOCK_DGRAM,
|
|
|
+ socket.IPPROTO_UDP, socket.AI_NUMERICHOST)[0][4])
|
|
|
|
|
|
class FakeDataSourceClient:
|
|
|
'''Faked data source client used in the ZoneConfigTest.
|
|
@@ -93,7 +101,7 @@ class ZoneConfigTest(unittest.TestCase):
|
|
|
# empty secondary list doesn't cause any disruption.
|
|
|
zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
|
|
|
self.assertEqual((ZONE_PRIMARY, self.__datasrc_client),
|
|
|
- (self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS)))
|
|
|
+ self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS))
|
|
|
# adding some mulitle tuples, including subdomainof the test zone name,
|
|
|
# and the same zone name but a different class
|
|
|
zconfig = ZoneConfig([(TEST_SECONDARY_ZONE_NAME, TEST_RRCLASS),
|
|
@@ -102,14 +110,55 @@ class ZoneConfigTest(unittest.TestCase):
|
|
|
(TEST_ZONE_NAME, RRClass.CH())],
|
|
|
TEST_RRCLASS, self.__datasrc_client)
|
|
|
self.assertEqual((ZONE_PRIMARY, self.__datasrc_client),
|
|
|
- (self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS)))
|
|
|
+ self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS))
|
|
|
# secondary zone list has a duplicate entry, which is just
|
|
|
# (effecitivey) ignored
|
|
|
zconfig = ZoneConfig([(TEST_SECONDARY_ZONE_NAME, TEST_RRCLASS),
|
|
|
(TEST_SECONDARY_ZONE_NAME, TEST_RRCLASS)],
|
|
|
TEST_RRCLASS, self.__datasrc_client)
|
|
|
self.assertEqual((ZONE_PRIMARY, self.__datasrc_client),
|
|
|
- (self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS)))
|
|
|
+ self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS))
|
|
|
+
|
|
|
+class ACLConfigTest(unittest.TestCase):
|
|
|
+ def setUp(self):
|
|
|
+ self.__datasrc_client = FakeDataSourceClient()
|
|
|
+ self.__zconfig = ZoneConfig([(TEST_SECONDARY_ZONE_NAME, TEST_RRCLASS)],
|
|
|
+ TEST_RRCLASS, self.__datasrc_client)
|
|
|
+
|
|
|
+ def test_get_update_acl(self):
|
|
|
+ # By default, no ACL is set, and the default ACL is "reject all"
|
|
|
+ acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, TEST_RRCLASS)
|
|
|
+ self.assertEqual(REJECT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+
|
|
|
+ # Add a map entry that would match the request, and it should now be
|
|
|
+ # accepted.
|
|
|
+ acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "ACCEPT"}])}
|
|
|
+ self.__zconfig.set_update_acl_map(acl_map)
|
|
|
+ acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, TEST_RRCLASS)
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+
|
|
|
+ # 'All reject' ACL will still apply for any other zones
|
|
|
+ acl = self.__zconfig.get_update_acl(Name('example.com'), TEST_RRCLASS)
|
|
|
+ self.assertEqual(REJECT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+ acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, RRClass.CH())
|
|
|
+ self.assertEqual(REJECT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+
|
|
|
+ # Test with a map with a few more ACL entries. Should be nothing
|
|
|
+ # special.
|
|
|
+ acl_map = {(Name('example.com'), TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "REJECT"}]),
|
|
|
+ (TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "ACCEPT"}]),
|
|
|
+ (TEST_ZONE_NAME, RRClass.CH()):
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP"}])}
|
|
|
+ self.__zconfig.set_update_acl_map(acl_map)
|
|
|
+ acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, TEST_RRCLASS)
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+ acl = self.__zconfig.get_update_acl(Name('example.com'), TEST_RRCLASS)
|
|
|
+ self.assertEqual(REJECT, acl.execute(TEST_ACL_CONTEXT))
|
|
|
+ acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, RRClass.CH())
|
|
|
+ self.assertEqual(DROP, acl.execute(TEST_ACL_CONTEXT))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
isc.log.init("bind10")
|