|
@@ -0,0 +1,280 @@
|
|
|
+# Copyright (C) 2011 Internet Systems Consortium.
|
|
|
+#
|
|
|
+# Permission to use, copy, modify, and distribute this software for any
|
|
|
+# purpose with or without fee is hereby granted, provided that the above
|
|
|
+# copyright notice and this permission notice appear in all copies.
|
|
|
+#
|
|
|
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
|
|
|
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
|
|
|
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
|
|
|
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
|
|
|
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
|
|
|
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
|
|
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
|
|
|
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
+
|
|
|
+import unittest
|
|
|
+import socket
|
|
|
+from isc.acl.acl import LoaderError, Error, ACCEPT, REJECT, DROP
|
|
|
+from isc.acl.dns import *
|
|
|
+
|
|
|
+def get_sockaddr(address, port):
|
|
|
+ '''This is a simple shortcut wrapper for getaddrinfo'''
|
|
|
+ ai = socket.getaddrinfo(address, port, 0, socket.SOCK_DGRAM,
|
|
|
+ socket.IPPROTO_UDP, socket.AI_NUMERICHOST)[0]
|
|
|
+ return ai[4]
|
|
|
+
|
|
|
+def get_acl(prefix):
|
|
|
+ '''This is a simple shortcut for creating an ACL containing single rule
|
|
|
+ that accepts addresses for the given IP prefix (and reject any others
|
|
|
+ by default)
|
|
|
+ '''
|
|
|
+ return REQUEST_LOADER.load('[{"action": "ACCEPT", "from": "' + \
|
|
|
+ prefix + '"}]')
|
|
|
+
|
|
|
+def get_acl_json(prefix):
|
|
|
+ '''Same as get_acl, but this function passes a Python representation of
|
|
|
+ JSON to the loader, not a string.'''
|
|
|
+ json = [{"action": "ACCEPT"}]
|
|
|
+ json[0]["from"] = prefix
|
|
|
+ return REQUEST_LOADER.load(json)
|
|
|
+
|
|
|
+def get_context(address):
|
|
|
+ '''This is a simple shortcut wrapper for creating a RequestContext
|
|
|
+ object with a given IP address. Port number doesn't matter in the test
|
|
|
+ (as of the initial implementation), so it's fixed for simplicity.
|
|
|
+ '''
|
|
|
+ return RequestContext(get_sockaddr(address, 53000))
|
|
|
+
|
|
|
+# These are commonly used RequestContext object
|
|
|
+CONTEXT4 = get_context('192.0.2.1')
|
|
|
+CONTEXT6 = get_context('2001:db8::1')
|
|
|
+
|
|
|
+class RequestContextTest(unittest.TestCase):
|
|
|
+
|
|
|
+ def test_construct(self):
|
|
|
+ # Construct the context from IPv4/IPv6 addresses, check the object
|
|
|
+ # by printing it.
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
+ 'remote_addr=[192.0.2.1]:53001>',
|
|
|
+ RequestContext(('192.0.2.1', 53001)).__str__())
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
+ 'remote_addr=[2001:db8::1234]:53006>',
|
|
|
+ RequestContext(('2001:db8::1234', 53006,
|
|
|
+ 0, 0)).__str__())
|
|
|
+
|
|
|
+ # Unusual case: port number overflows (this constructor allows that,
|
|
|
+ # although it should be rare anyway; the socket address should
|
|
|
+ # normally come from the Python socket module.
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
+ 'remote_addr=[192.0.2.1]:0>',
|
|
|
+ RequestContext(('192.0.2.1', 65536)).__str__())
|
|
|
+
|
|
|
+ # same test using socket.getaddrinfo() to ensure it accepts the sock
|
|
|
+ # address representation used in the Python socket module.
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
+ 'remote_addr=[192.0.2.1]:53001>',
|
|
|
+ RequestContext(get_sockaddr('192.0.2.1',
|
|
|
+ 53001)).__str__())
|
|
|
+ self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
|
|
|
+ 'remote_addr=[2001:db8::1234]:53006>',
|
|
|
+ RequestContext(get_sockaddr('2001:db8::1234',
|
|
|
+ 53006)).__str__())
|
|
|
+
|
|
|
+ #
|
|
|
+ # Invalid parameters (in our expected usage this should not happen
|
|
|
+ # because the sockaddr would come from the Python socket module, but
|
|
|
+ # validation should still be performed correctly)
|
|
|
+ #
|
|
|
+ # not a tuple
|
|
|
+ self.assertRaises(TypeError, RequestContext, 1)
|
|
|
+ # invalid number of parameters
|
|
|
+ self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 53), 0)
|
|
|
+ # tuple is not in the form of sockaddr
|
|
|
+ self.assertRaises(TypeError, RequestContext, (0, 53))
|
|
|
+ self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 'http'))
|
|
|
+ self.assertRaises(TypeError, RequestContext, ('::', 0, 'flow', 0))
|
|
|
+ # invalid address
|
|
|
+ self.assertRaises(Error, RequestContext, ('example.com', 5300))
|
|
|
+ self.assertRaises(Error, RequestContext, ('192.0.2.1.1', 5300))
|
|
|
+ self.assertRaises(Error, RequestContext, ('2001:db8:::1', 5300))
|
|
|
+
|
|
|
+class RequestACLTest(unittest.TestCase):
|
|
|
+
|
|
|
+ def test_direct_construct(self):
|
|
|
+ self.assertRaises(Error, RequestACL)
|
|
|
+
|
|
|
+ def test_request_loader(self):
|
|
|
+ # these shouldn't raise an exception
|
|
|
+ REQUEST_LOADER.load('[{"action": "DROP"}]')
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP"}])
|
|
|
+ REQUEST_LOADER.load('[{"action": "DROP", "from": "192.0.2.1"}]')
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP", "from": "192.0.2.1"}])
|
|
|
+
|
|
|
+ # Invalid types (note that arguments like '1' or '[]' is of valid
|
|
|
+ # 'type' (but syntax error at a higher level)). So we need to use
|
|
|
+ # something that is not really JSON nor string.
|
|
|
+ self.assertRaises(TypeError, REQUEST_LOADER.load, b'')
|
|
|
+
|
|
|
+ # Incorrect number of arguments
|
|
|
+ self.assertRaises(TypeError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP"}]', 0)
|
|
|
+
|
|
|
+ def test_bad_acl_syntax(self):
|
|
|
+ # the following are derived from loader_test.cc
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '{}');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, {});
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '42');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, 42);
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, 'true');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, True);
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, 'null');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, None);
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '"hello"');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, "hello");
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[42]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [42]);
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '["hello"]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, ["hello"]);
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[[]]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [[]]);
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[true]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [True]);
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[null]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [None]);
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, '[{}]');
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load, [{}]);
|
|
|
+
|
|
|
+ # the following are derived from dns_test.cc
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "ACCEPT", "bad": "192.0.2.1"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "bad": "192.0.2.1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "ACCEPT", "from": 4}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "from": 4}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "ACCEPT", "from": []}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "from": []}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "ACCEPT", "from": "bad"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "from": "bad"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "ACCEPT", "from": null}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "ACCEPT", "from": None}])
|
|
|
+
|
|
|
+ def test_bad_acl_ipsyntax(self):
|
|
|
+ # this test is derived from ip_check_unittest.cc
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.43/-1"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43/-1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.43//1"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43//1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.43/1/"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "192.0.2.43/1/"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "/192.0.2.43/1"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "/192.0.2.43/1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "2001:db8::/xxxx"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "2001:db8::/xxxx"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "2001:db8::/32/s"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "2001:db8::/32/s"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "1/"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "1/"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "/1"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "/1"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "192.0.2.0/33"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "192.0.2.0/33"}])
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ '[{"action": "DROP", "from": "::1/129"}]')
|
|
|
+ self.assertRaises(LoaderError, REQUEST_LOADER.load,
|
|
|
+ [{"action": "DROP", "from": "::1/129"}])
|
|
|
+
|
|
|
+ def test_execute(self):
|
|
|
+ # tests derived from dns_test.cc. We don't directly expose checks
|
|
|
+ # in the python wrapper, so we test it via execute().
|
|
|
+ self.assertEqual(ACCEPT, get_acl('192.0.2.1').execute(CONTEXT4))
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('192.0.2.1').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.2.53').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.2.53').execute(CONTEXT4))
|
|
|
+ self.assertEqual(ACCEPT, get_acl('192.0.2.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('192.0.2.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.1.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('192.0.1.0/24').execute(CONTEXT4))
|
|
|
+
|
|
|
+ self.assertEqual(ACCEPT, get_acl('2001:db8::1').execute(CONTEXT6))
|
|
|
+ self.assertEqual(ACCEPT, get_acl_json('2001:db8::1').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl('2001:db8::53').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('2001:db8::53').execute(CONTEXT6))
|
|
|
+ self.assertEqual(ACCEPT, get_acl('2001:db8::/64').execute(CONTEXT6))
|
|
|
+ self.assertEqual(ACCEPT,
|
|
|
+ get_acl_json('2001:db8::/64').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl('2001:db8:1::/64').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT,
|
|
|
+ get_acl_json('2001:db8:1::/64').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl('32.1.13.184').execute(CONTEXT6))
|
|
|
+ self.assertEqual(REJECT, get_acl_json('32.1.13.184').execute(CONTEXT6))
|
|
|
+
|
|
|
+ # A bit more complicated example, derived from resolver_config_unittest
|
|
|
+ acl = REQUEST_LOADER.load('[ {"action": "ACCEPT", ' +
|
|
|
+ ' "from": "192.0.2.1"},' +
|
|
|
+ ' {"action": "REJECT",' +
|
|
|
+ ' "from": "192.0.2.0/24"},' +
|
|
|
+ ' {"action": "DROP",' +
|
|
|
+ ' "from": "2001:db8::1"},' +
|
|
|
+ '] }')
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
|
|
|
+ self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
|
|
|
+
|
|
|
+ # same test using the JSON representation
|
|
|
+ acl = REQUEST_LOADER.load([{"action": "ACCEPT", "from": "192.0.2.1"},
|
|
|
+ {"action": "REJECT",
|
|
|
+ "from": "192.0.2.0/24"},
|
|
|
+ {"action": "DROP", "from": "2001:db8::1"}])
|
|
|
+ self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
|
|
|
+ self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
|
|
|
+ self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
|
|
|
+
|
|
|
+ def test_bad_execute(self):
|
|
|
+ acl = get_acl('192.0.2.1')
|
|
|
+ # missing parameter
|
|
|
+ self.assertRaises(TypeError, acl.execute)
|
|
|
+ # too many parameters
|
|
|
+ self.assertRaises(TypeError, acl.execute, get_context('192.0.2.2'), 0)
|
|
|
+ # type mismatch
|
|
|
+ self.assertRaises(TypeError, acl.execute, 'bad parameter')
|
|
|
+
|
|
|
+class RequestLoaderTest(unittest.TestCase):
|
|
|
+ # Note: loading ACLs is tested in other test cases.
|
|
|
+
|
|
|
+ def test_construct(self):
|
|
|
+ # at least for now, we don't allow direct construction.
|
|
|
+ self.assertRaises(Error, RequestLoader)
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ unittest.main()
|