|
@@ -0,0 +1,280 @@
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+import unittest
|
|
|
|
+import socket
|
|
|
|
+from isc.acl.acl import LoaderError, Error, ACCEPT, REJECT, DROP
|
|
|
|
+from isc.acl.dns import *
|
|
|
|
+
|
|
|
|
+def get_sockaddr(address, port):
|
|
|
|
+ '''This is a simple shortcut wrapper for getaddrinfo'''
|
|
|
|
+ ai = socket.getaddrinfo(address, port, 0, socket.SOCK_DGRAM,
|
|
|
|
+ socket.IPPROTO_UDP, socket.AI_NUMERICHOST)[0]
|
|
|
|
+ return ai[4]
|
|
|
|
+
|
|
|
|
+def get_acl(prefix):
|
|
|
|
+ '''This is a simple shortcut for creating an ACL containing single rule
|
|
|
|
+ that accepts addresses for the given IP prefix (and reject any others
|
|
|
|
+ by default)
|
|
|
|
+ '''
|
|
|
|
+ return REQUEST_LOADER.load('[{"action": "ACCEPT", "from": "' + \
|
|
|
|
+ prefix + '"}]')
|
|
|
|
+
|
|
|
|
+def get_acl_json(prefix):
|
|
|
|
+ '''Same as get_acl, but this function passes a Python representation of
|
|
|
|
+ JSON to the loader, not a string.'''
|
|
|
|
+ json = [{"action": "ACCEPT"}]
|
|
|
|
+ json[0]["from"] = prefix
|
|
|
|
+ return REQUEST_LOADER.load(json)
|
|
|
|
+
|
|
|
|
+def get_context(address):
|
|
|
|
+ '''This is a simple shortcut wrapper for creating a RequestContext
|
|
|
|
+ object with a given IP address. Port number doesn't matter in the test
|
|
|
|
+ (as of the initial implementation), so it's fixed for simplicity.
|
|
|
|
+ '''
|
|
|
|
+ return RequestContext(get_sockaddr(address, 53000))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+CONTEXT4 = get_context('192.0.2.1')
|
|
|
|
+CONTEXT6 = get_context('2001:db8::1')
|
|
|
|
+
|
|
|
|
+class RequestContextTest(unittest.TestCase):
|
|
|
|
+
|
|
|
|
+ def test_construct(self):
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
|
+ 'remote_addr=[192.0.2.1]:53001>',
|
|
|
|
+ RequestContext(('192.0.2.1', 53001)).__str__())
|
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
|
+ 'remote_addr=[2001:db8::1234]:53006>',
|
|
|
|
+ RequestContext(('2001:db8::1234', 53006,
|
|
|
|
+ 0, 0)).__str__())
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
|
+ 'remote_addr=[192.0.2.1]:0>',
|
|
|
|
+ RequestContext(('192.0.2.1', 65536)).__str__())
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
|
+ 'remote_addr=[192.0.2.1]:53001>',
|
|
|
|
+ RequestContext(get_sockaddr('192.0.2.1',
|
|
|
|
+ 53001)).__str__())
|
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
|
+ 'remote_addr=[2001:db8::1234]:53006>',
|
|
|
|
+ RequestContext(get_sockaddr('2001:db8::1234',
|
|
|
|
+ 53006)).__str__())
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.assertRaises(TypeError, RequestContext, 1)
|
|
|
|
+
|
|
|
|
+ self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 53), 0)
|
|
|
|
+
|
|
|
|
+ self.assertRaises(TypeError, RequestContext, (0, 53))
|
|
|
|
+ self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 'http'))
|
|
|
|
+ self.assertRaises(TypeError, RequestContext, ('::', 0, 'flow', 0))
|
|
|
|
+
|
|
|
|
+ self.assertRaises(Error, RequestContext, ('example.com', 5300))
|
|
|
|
+ self.assertRaises(Error, RequestContext, ('192.0.2.1.1', 5300))
|
|
|
|
+ self.assertRaises(Error, RequestContext, ('2001:db8:::1', 5300))
|
|
|
|
+
|
|
|
|
+class RequestACLTest(unittest.TestCase):
|
|
|
|
+
|
|
|
|
+ def test_direct_construct(self):
|
|
|
|
+ self.assertRaises(Error, RequestACL)
|
|
|
|
+
|
|
|
|
+ def test_request_loader(self):
|
|
|
|
+
|
|
|
|
+ REQUEST_LOADER.load('[{"action": "DROP"}]')
|
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP"}])
|
|
|
|
+ REQUEST_LOADER.load('[{"action": "DROP", "from": "192.0.2.1"}]')
|
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP", "from": "192.0.2.1"}])
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.assertRaises(TypeError, REQUEST_LOADER.load, b'')
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.assertRaises(TypeError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP"}]', 0)
|
|
|
|
+
|
|
|
|
+ def test_bad_acl_syntax(self):
|
|
|
|
+
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '{}');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, {});
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '42');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, 42);
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, 'true');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, True);
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, 'null');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, None);
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '"hello"');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, "hello");
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[42]');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [42]);
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '["hello"]');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, ["hello"]);
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[[]]');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [[]]);
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[true]');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [True]);
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[null]');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [None]);
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[{}]');
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [{}]);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "ACCEPT", "bad": "192.0.2.1"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "ACCEPT", "bad": "192.0.2.1"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "ACCEPT", "from": 4}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "ACCEPT", "from": 4}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "ACCEPT", "from": []}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "ACCEPT", "from": []}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "ACCEPT", "from": "bad"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "ACCEPT", "from": "bad"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "ACCEPT", "from": null}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "ACCEPT", "from": None}])
|
|
|
|
+
|
|
|
|
+ def test_bad_acl_ipsyntax(self):
|
|
|
|
+
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.43/-1"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43/-1"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.43//1"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43//1"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.43/1/"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43/1/"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "/192.0.2.43/1"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "/192.0.2.43/1"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "2001:db8::/xxxx"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "2001:db8::/xxxx"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "2001:db8::/32/s"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "2001:db8::/32/s"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "1/"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "1/"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "/1"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "/1"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.0/33"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "192.0.2.0/33"}])
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ '[{"action": "DROP", "from": "::1/129"}]')
|
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
|
+ [{"action": "DROP", "from": "::1/129"}])
|
|
|
|
+
|
|
|
|
+ def test_execute(self):
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.assertEqual(ACCEPT, get_acl('192.0.2.1').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('192.0.2.1').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.2.53').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.2.53').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(ACCEPT, get_acl('192.0.2.0/24').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('192.0.2.0/24').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.1.0/24').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.1.0/24').execute(CONTEXT4))
|
|
|
|
+
|
|
|
|
+ self.assertEqual(ACCEPT, get_acl('2001:db8::1').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('2001:db8::1').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(REJECT, get_acl('2001:db8::53').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(REJECT, get_acl_json('2001:db8::53').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(ACCEPT, get_acl('2001:db8::/64').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(ACCEPT,
|
|
|
|
+ get_acl_json('2001:db8::/64').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(REJECT, get_acl('2001:db8:1::/64').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(REJECT,
|
|
|
|
+ get_acl_json('2001:db8:1::/64').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(REJECT, get_acl('32.1.13.184').execute(CONTEXT6))
|
|
|
|
+ self.assertEqual(REJECT, get_acl_json('32.1.13.184').execute(CONTEXT6))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ acl = REQUEST_LOADER.load('[ {"action": "ACCEPT", ' +
|
|
|
|
+ ' "from": "192.0.2.1"},' +
|
|
|
|
+ ' {"action": "REJECT",' +
|
|
|
|
+ ' "from": "192.0.2.0/24"},' +
|
|
|
|
+ ' {"action": "DROP",' +
|
|
|
|
+ ' "from": "2001:db8::1"},' +
|
|
|
|
+ '] }')
|
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
|
|
|
|
+ self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
|
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ acl = REQUEST_LOADER.load([{"action": "ACCEPT", "from": "192.0.2.1"},
|
|
|
|
+ {"action": "REJECT",
|
|
|
|
+ "from": "192.0.2.0/24"},
|
|
|
|
+ {"action": "DROP", "from": "2001:db8::1"}])
|
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
|
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
|
|
|
|
+ self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
|
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
|
|
|
|
+
|
|
|
|
+ def test_bad_execute(self):
|
|
|
|
+ acl = get_acl('192.0.2.1')
|
|
|
|
+
|
|
|
|
+ self.assertRaises(TypeError, acl.execute)
|
|
|
|
+
|
|
|
|
+ self.assertRaises(TypeError, acl.execute, get_context('192.0.2.2'), 0)
|
|
|
|
+
|
|
|
|
+ self.assertRaises(TypeError, acl.execute, 'bad parameter')
|
|
|
|
+
|
|
|
|
+class RequestLoaderTest(unittest.TestCase):
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ def test_construct(self):
|
|
|
|
+
|
|
|
|
+ self.assertRaises(Error, RequestLoader)
|
|
|
|
+
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
+ unittest.main()
|