|
@@ -15,7 +15,7 @@
|
|
|
|
|
|
import unittest
|
|
|
import socket
|
|
|
-from isc.acl.acl import LoaderError, Error
|
|
|
+from isc.acl.acl import LoaderError, Error, ACCEPT, REJECT, DROP
|
|
|
from isc.acl.dns import *
|
|
|
|
|
|
def get_sockaddr(address, port):
|
|
@@ -23,6 +23,24 @@ def get_sockaddr(address, port):
|
|
|
ai = socket.getaddrinfo(address, port, 0, 0, 0, socket.AI_NUMERICHOST)[0]
|
|
|
return ai[4]
|
|
|
|
|
|
+def get_acl(prefix):
|
|
|
+ '''This is a simple shortcut for creating an ACL containing single rule
|
|
|
+ that accepts addresses for the given IP prefix (and reject any others
|
|
|
+ by default)
|
|
|
+ '''
|
|
|
+ return load_request_acl('[{"action": "ACCEPT", "from": "' + prefix + '"}]')
|
|
|
+
|
|
|
+def get_context(address):
|
|
|
+ '''This is a simple shortcut wrapper for creating a RequestContext
|
|
|
+ object with a given IP address. Port number doesn't matter in the test
|
|
|
+ (as of the initial implementation), so it's fixed for simplicity.
|
|
|
+ '''
|
|
|
+ return RequestContext(get_sockaddr(address, 53000))
|
|
|
+
|
|
|
+# These are commonly used RequestContext object
|
|
|
+CONTEXT4 = get_context('192.0.2.1')
|
|
|
+CONTEXT6 = get_context('2001:db8::1')
|
|
|
+
|
|
|
class RequestContextTest(unittest.TestCase):
|
|
|
|
|
|
def test_construct(self):
|
|
@@ -74,6 +92,9 @@ class RequestContextTest(unittest.TestCase):
|
|
|
|
|
|
class RequestACLTest(unittest.TestCase):
|
|
|
|
|
|
+ def test_direct_construct(self):
|
|
|
+ acl = RequestACL()
|
|
|
+
|
|
|
def test_request_loader(self):
|
|
|
# these shouldn't raise an exception
|
|
|
load_request_acl('[{"action": "DROP"}]')
|
|
@@ -88,7 +109,7 @@ class RequestACLTest(unittest.TestCase):
|
|
|
'[{"action": "DROP"}]', 0)
|
|
|
|
|
|
def test_bad_acl_syntax(self):
|
|
|
- # this test is derived from loader_test.cc
|
|
|
+ # the following are derived from loader_test.cc
|
|
|
self.assertRaises(LoaderError, load_request_acl, '{}');
|
|
|
self.assertRaises(LoaderError, load_request_acl, '42');
|
|
|
self.assertRaises(LoaderError, load_request_acl, 'true');
|
|
@@ -101,6 +122,18 @@ class RequestACLTest(unittest.TestCase):
|
|
|
self.assertRaises(LoaderError, load_request_acl, '[null]');
|
|
|
self.assertRaises(LoaderError, load_request_acl, '[{}]');
|
|
|
|
|
|
+ # the following are derived from dns_test.cc
|
|
|
+ self.assertRaises(LoaderError, load_request_acl,
|
|
|
+ '[{"action": "ACCEPT", "bad": "192.0.2.1"}]')
|
|
|
+ self.assertRaises(LoaderError, load_request_acl,
|
|
|
+ '[{"action": "ACCEPT", "from": 4}]')
|
|
|
+ self.assertRaises(LoaderError, load_request_acl,
|
|
|
+ '[{"action": "ACCEPT", "from": []}]')
|
|
|
+ self.assertRaises(LoaderError, load_request_acl,
|
|
|
+ '[{"action": "ACCEPT", "from": "bad"}]')
|
|
|
+ self.assertRaises(LoaderError, load_request_acl,
|
|
|
+ '[{"action": "ACCEPT", "from": null}]')
|
|
|
+
|
|
|
def test_bad_acl_ipsyntax(self):
|
|
|
# this test is derived from ip_check_unittest.cc
|
|
|
self.assertRaises(LoaderError, load_request_acl,
|
|
@@ -124,8 +157,42 @@ class RequestACLTest(unittest.TestCase):
|
|
|
self.assertRaises(LoaderError, load_request_acl,
|
|
|
'[{"action": "DROP", "from": "::1/129"')
|
|
|
|
|
|
- def test_construct(self):
|
|
|
- RequestACL()
|
|
|
+ def test_execute(self):
|
|
|
+ # tests derived from dns_test.cc. We don't directly expose checks
|
|
|
+ # in the python wrapper, so we test it via execute().
|
|
|
+ self.assertEqual(ACCEPT, get_acl('192.0.2.1').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.2.53').execute(CONTEXT4))
|
|
|
+ self.assertEqual(ACCEPT, get_acl('192.0.2.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
|
|
|
+
|
|
|
+ self.assertEqual(ACCEPT, get_acl('2001:db8::1').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl('2001:db8::53').execute(CONTEXT6))
|
|
|
+ self.assertEqual(ACCEPT, get_acl('2001:db8::/64').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl('2001:db8:1::/64').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl('32.1.13.184').execute(CONTEXT6))
|
|
|
+
|
|
|
+ # A bit more complicated example, derived from resolver_config_unittest
|
|
|
+ acl = load_request_acl('[ {"action": "ACCEPT", ' +
|
|
|
+ ' "from": "192.0.2.1"},' +
|
|
|
+ ' {"action": "REJECT",' +
|
|
|
+ ' "from": "192.0.2.0/24"},' +
|
|
|
+ ' {"action": "DROP",' +
|
|
|
+ ' "from": "2001:db8::1"},' +
|
|
|
+ '] }')
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
|
|
|
+ self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
|
|
|
+
|
|
|
+ def test_bad_execute(self):
|
|
|
+ acl = get_acl('192.0.2.1')
|
|
|
+ # missing parameter
|
|
|
+ self.assertRaises(TypeError, acl.execute)
|
|
|
+ # too many parameters
|
|
|
+ self.assertRaises(TypeError, acl.execute, get_context('192.0.2.2'), 0)
|
|
|
+ # type mismatch
|
|
|
+ self.assertRaises(TypeError, acl.execute, 'bad parameter')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
unittest.main()
|