123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- # Copyright (C) 2011 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import unittest
- import socket
- from isc.acl.acl import LoaderError, Error, ACCEPT, REJECT, DROP
- from isc.acl.dns import *
- def get_sockaddr(address, port):
- '''This is a simple shortcut wrapper for getaddrinfo'''
- ai = socket.getaddrinfo(address, port, 0, 0, 0, socket.AI_NUMERICHOST)[0]
- return ai[4]
- def get_acl(prefix):
- '''This is a simple shortcut for creating an ACL containing single rule
- that accepts addresses for the given IP prefix (and reject any others
- by default)
- '''
- return load_request_acl('[{"action": "ACCEPT", "from": "' + prefix + '"}]')
- def get_context(address):
- '''This is a simple shortcut wrapper for creating a RequestContext
- object with a given IP address. Port number doesn't matter in the test
- (as of the initial implementation), so it's fixed for simplicity.
- '''
- return RequestContext(get_sockaddr(address, 53000))
- # These are commonly used RequestContext object
- CONTEXT4 = get_context('192.0.2.1')
- CONTEXT6 = get_context('2001:db8::1')
- class RequestContextTest(unittest.TestCase):
- def test_construct(self):
- # Construct the context from IPv4/IPv6 addresses, check the object
- # by printing it.
- self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
- 'remote_addr=[192.0.2.1]:53001>',
- RequestContext(('192.0.2.1', 53001)).__str__())
- self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
- 'remote_addr=[2001:db8::1234]:53006>',
- RequestContext(('2001:db8::1234', 53006,
- 0, 0)).__str__())
- # Unusual case: port number overflows (this constructor allows that,
- # although it should be rare anyway; the socket address should
- # normally come from the Python socket module.
- self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
- 'remote_addr=[192.0.2.1]:0>',
- RequestContext(('192.0.2.1', 65536)).__str__())
- # same test using socket.getaddrinfo() to ensure it accepts the sock
- # address representation used in the Python socket module.
- self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
- 'remote_addr=[192.0.2.1]:53001>',
- RequestContext(get_sockaddr('192.0.2.1',
- 53001)).__str__())
- self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
- 'remote_addr=[2001:db8::1234]:53006>',
- RequestContext(get_sockaddr('2001:db8::1234',
- 53006)).__str__())
- #
- # Invalid parameters (in our expected usage this should not happen
- # because the sockaddr would come from the Python socket module, but
- # validation should still be performed correctly)
- #
- # not a tuple
- self.assertRaises(TypeError, RequestContext, 1)
- # invalid number of parameters
- self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 53), 0)
- # tuple is not in the form of sockaddr
- self.assertRaises(TypeError, RequestContext, (0, 53))
- self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 'http'))
- self.assertRaises(TypeError, RequestContext, ('::', 0, 'flow', 0))
- # invalid address
- self.assertRaises(Error, RequestContext, ('example.com', 5300))
- self.assertRaises(Error, RequestContext, ('192.0.2.1.1', 5300))
- self.assertRaises(Error, RequestContext, ('2001:db8:::1', 5300))
- class RequestACLTest(unittest.TestCase):
- def test_direct_construct(self):
- self.assertRaises(TypeError, RequestACL)
- def test_request_loader(self):
- # these shouldn't raise an exception
- load_request_acl('[{"action": "DROP"}]')
- load_request_acl('[{"action": "DROP", "from": "192.0.2.1"}]')
- # Invalid types
- self.assertRaises(TypeError, load_request_acl, 1)
- self.assertRaises(TypeError, load_request_acl, [])
- # Incorrect number of arguments
- self.assertRaises(TypeError, load_request_acl,
- '[{"action": "DROP"}]', 0)
- def test_bad_acl_syntax(self):
- # the following are derived from loader_test.cc
- self.assertRaises(LoaderError, load_request_acl, '{}');
- self.assertRaises(LoaderError, load_request_acl, '42');
- self.assertRaises(LoaderError, load_request_acl, 'true');
- self.assertRaises(LoaderError, load_request_acl, 'null');
- self.assertRaises(LoaderError, load_request_acl, '"hello"');
- self.assertRaises(LoaderError, load_request_acl, '[42]');
- self.assertRaises(LoaderError, load_request_acl, '["hello"]');
- self.assertRaises(LoaderError, load_request_acl, '[[]]');
- self.assertRaises(LoaderError, load_request_acl, '[true]');
- self.assertRaises(LoaderError, load_request_acl, '[null]');
- self.assertRaises(LoaderError, load_request_acl, '[{}]');
- # the following are derived from dns_test.cc
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "ACCEPT", "bad": "192.0.2.1"}]')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "ACCEPT", "from": 4}]')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "ACCEPT", "from": []}]')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "ACCEPT", "from": "bad"}]')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "ACCEPT", "from": null}]')
- def test_bad_acl_ipsyntax(self):
- # this test is derived from ip_check_unittest.cc
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "192.0.2.43/-1"}]')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "192.0.2.43//1"')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "192.0.2.43/1/"')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "/192.0.2.43/1"')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "2001:db8::/xxxx"')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "2001:db8::/32/s"')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "1/"')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "/1"')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "192.0.2.0/33"')
- self.assertRaises(LoaderError, load_request_acl,
- '[{"action": "DROP", "from": "::1/129"')
- def test_execute(self):
- # tests derived from dns_test.cc. We don't directly expose checks
- # in the python wrapper, so we test it via execute().
- self.assertEqual(ACCEPT, get_acl('192.0.2.1').execute(CONTEXT4))
- self.assertEqual(REJECT, get_acl('192.0.2.53').execute(CONTEXT4))
- self.assertEqual(ACCEPT, get_acl('192.0.2.0/24').execute(CONTEXT4))
- self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
- self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
- self.assertEqual(ACCEPT, get_acl('2001:db8::1').execute(CONTEXT6))
- self.assertEqual(REJECT, get_acl('2001:db8::53').execute(CONTEXT6))
- self.assertEqual(ACCEPT, get_acl('2001:db8::/64').execute(CONTEXT6))
- self.assertEqual(REJECT, get_acl('2001:db8:1::/64').execute(CONTEXT6))
- self.assertEqual(REJECT, get_acl('32.1.13.184').execute(CONTEXT6))
- # A bit more complicated example, derived from resolver_config_unittest
- acl = load_request_acl('[ {"action": "ACCEPT", ' +
- ' "from": "192.0.2.1"},' +
- ' {"action": "REJECT",' +
- ' "from": "192.0.2.0/24"},' +
- ' {"action": "DROP",' +
- ' "from": "2001:db8::1"},' +
- '] }')
- self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
- self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
- self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
- self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
- def test_bad_execute(self):
- acl = get_acl('192.0.2.1')
- # missing parameter
- self.assertRaises(TypeError, acl.execute)
- # too many parameters
- self.assertRaises(TypeError, acl.execute, get_context('192.0.2.2'), 0)
- # type mismatch
- self.assertRaises(TypeError, acl.execute, 'bad parameter')
- if __name__ == '__main__':
- unittest.main()
|