|
@@ -32,6 +32,13 @@ def get_acl(prefix):
|
|
|
return REQUEST_LOADER.load('[{"action": "ACCEPT", "from": "' + \
|
|
|
prefix + '"}]')
|
|
|
|
|
|
+def get_acl_json(prefix):
|
|
|
+ '''Same as get_acl, but this function passes a Python representation of
|
|
|
+ JSON to the loader, not a string.'''
|
|
|
+ json = [{"action": "ACCEPT"}]
|
|
|
+ json[0]["from"] = prefix
|
|
|
+ return REQUEST_LOADER.load(json)
|
|
|
+
|
|
|
def get_context(address):
|
|
|
'''This is a simple shortcut wrapper for creating a RequestContext
|
|
|
object with a given IP address. Port number doesn't matter in the test
|
|
@@ -100,11 +107,14 @@ class RequestACLTest(unittest.TestCase):
|
|
|
def test_request_loader(self):
|
|
|
# these shouldn't raise an exception
|
|
|
REQUEST_LOADER.load('[{"action": "DROP"}]')
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP"}])
|
|
|
REQUEST_LOADER.load('[{"action": "DROP", "from": "192.0.2.1"}]')
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP", "from": "192.0.2.1"}])
|
|
|
|
|
|
- # Invalid types
|
|
|
- self.assertRaises(TypeError, REQUEST_LOADER.load, 1)
|
|
|
- self.assertRaises(TypeError, REQUEST_LOADER.load, [])
|
|
|
+ # Invalid types (note that arguments like '1' or '[]' is of valid
|
|
|
+ # 'type' (but syntax error at a higher level)). So we need to use
|
|
|
+ # something that is not really JSON nor string.
|
|
|
+ self.assertRaises(TypeError, REQUEST_LOADER.load, b'')
|
|
|
|
|
|
# Incorrect number of arguments
|
|
|
self.assertRaises(TypeError, REQUEST_LOADER.load,
|
|
@@ -113,66 +123,119 @@ class RequestACLTest(unittest.TestCase):
|
|
|
def test_bad_acl_syntax(self):
|
|
|
# the following are derived from loader_test.cc
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '{}');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, {});
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '42');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, 42);
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, 'true');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, True);
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, 'null');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, None);
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '"hello"');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, "hello");
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '[42]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [42]);
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '["hello"]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, ["hello"]);
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '[[]]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [[]]);
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '[true]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [True]);
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '[null]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [None]);
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load, '[{}]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [{}]);
|
|
|
|
|
|
# the following are derived from dns_test.cc
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
'[{"action": "ACCEPT", "bad": "192.0.2.1"}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "bad": "192.0.2.1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
'[{"action": "ACCEPT", "from": 4}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "from": 4}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
'[{"action": "ACCEPT", "from": []}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "from": []}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
'[{"action": "ACCEPT", "from": "bad"}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "from": "bad"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
'[{"action": "ACCEPT", "from": null}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "from": None}])
|
|
|
|
|
|
def test_bad_acl_ipsyntax(self):
|
|
|
# this test is derived from ip_check_unittest.cc
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
'[{"action": "DROP", "from": "192.0.2.43/-1"}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "192.0.2.43//1"')
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43/-1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.43//1"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43//1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.43/1/"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43/1/"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "/192.0.2.43/1"}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "192.0.2.43/1/"')
|
|
|
+ [{"action": "DROP", "from": "/192.0.2.43/1"}])
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "/192.0.2.43/1"')
|
|
|
+ '[{"action": "DROP", "from": "2001:db8::/xxxx"}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "2001:db8::/xxxx"')
|
|
|
+ [{"action": "DROP", "from": "2001:db8::/xxxx"}])
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "2001:db8::/32/s"')
|
|
|
+ '[{"action": "DROP", "from": "2001:db8::/32/s"}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "1/"')
|
|
|
+ [{"action": "DROP", "from": "2001:db8::/32/s"}])
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "/1"')
|
|
|
+ '[{"action": "DROP", "from": "1/"}]')
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "192.0.2.0/33"')
|
|
|
+ [{"action": "DROP", "from": "1/"}])
|
|
|
self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
- '[{"action": "DROP", "from": "::1/129"')
|
|
|
+ '[{"action": "DROP", "from": "/1"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "/1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.0/33"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "192.0.2.0/33"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "::1/129"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "::1/129"}])
|
|
|
|
|
|
def test_execute(self):
|
|
|
# tests derived from dns_test.cc. We don't directly expose checks
|
|
|
# in the python wrapper, so we test it via execute().
|
|
|
self.assertEqual(ACCEPT, get_acl('192.0.2.1').execute(CONTEXT4))
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('192.0.2.1').execute(CONTEXT4))
|
|
|
self.assertEqual(REJECT, get_acl('192.0.2.53').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.2.53').execute(CONTEXT4))
|
|
|
self.assertEqual(ACCEPT, get_acl('192.0.2.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('192.0.2.0/24').execute(CONTEXT4))
|
|
|
self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.1.0/24').execute(CONTEXT4))
|
|
|
self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.1.0/24').execute(CONTEXT4))
|
|
|
|
|
|
self.assertEqual(ACCEPT, get_acl('2001:db8::1').execute(CONTEXT6))
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('2001:db8::1').execute(CONTEXT6))
|
|
|
self.assertEqual(REJECT, get_acl('2001:db8::53').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('2001:db8::53').execute(CONTEXT6))
|
|
|
self.assertEqual(ACCEPT, get_acl('2001:db8::/64').execute(CONTEXT6))
|
|
|
+ self.assertEqual(ACCEPT,
|
|
|
+ get_acl_json('2001:db8::/64').execute(CONTEXT6))
|
|
|
self.assertEqual(REJECT, get_acl('2001:db8:1::/64').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT,
|
|
|
+ get_acl_json('2001:db8:1::/64').execute(CONTEXT6))
|
|
|
self.assertEqual(REJECT, get_acl('32.1.13.184').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('32.1.13.184').execute(CONTEXT6))
|
|
|
|
|
|
# A bit more complicated example, derived from resolver_config_unittest
|
|
|
acl = REQUEST_LOADER.load('[ {"action": "ACCEPT", ' +
|
|
@@ -187,6 +250,16 @@ class RequestACLTest(unittest.TestCase):
|
|
|
self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
|
|
|
self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
|
|
|
|
|
|
+ # same test using the JSON representation
|
|
|
+ acl = REQUEST_LOADER.load([{"action": "ACCEPT", "from": "192.0.2.1"},
|
|
|
+ {"action": "REJECT",
|
|
|
+ "from": "192.0.2.0/24"},
|
|
|
+ {"action": "DROP", "from": "2001:db8::1"}])
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
|
|
|
+ self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
|
|
|
+
|
|
|
def test_bad_execute(self):
|
|
|
acl = get_acl('192.0.2.1')
|
|
|
# missing parameter
|