123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # Copyright (C) 2012 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import isc.log
- from isc.dns import *
- from isc.datasrc import DataSourceClient
- from isc.ddns.zone_config import *
- import isc.acl.dns
- from isc.acl.acl import ACCEPT, REJECT, DROP, LoaderError
- import unittest
- import socket
- # Some common test parameters
- TEST_ZONE_NAME = Name('example.org')
- TEST_SECONDARY_ZONE_NAME = Name('example.com')
- TEST_RRCLASS = RRClass.IN()
- TEST_TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
- TEST_ACL_CONTEXT = isc.acl.dns.RequestContext(
- socket.getaddrinfo("192.0.2.1", 1234, 0, socket.SOCK_DGRAM,
- socket.IPPROTO_UDP, socket.AI_NUMERICHOST)[0][4])
- class FakeDataSourceClient:
- '''Faked data source client used in the ZoneConfigTest.
- It emulates isc.datasrc.DataSourceClient, but only has to provide
- the find_zone() interface (and only the first element of the return
- value matters). By default it returns 'SUCCESS' (exact match) for
- any input. It can be dynamically customized via the set_find_result()
- method.
- '''
- def __init__(self):
- self.__find_result = DataSourceClient.SUCCESS
- def find_zone(self, zname):
- return (self.__find_result, None)
- def set_find_result(self, result):
- self.__find_result = result
- class ZoneConfigTest(unittest.TestCase):
- '''Some basic tests for the ZoneConfig class.'''
- def setUp(self):
- self.__datasrc_client = FakeDataSourceClient()
- self.zconfig = ZoneConfig({(TEST_SECONDARY_ZONE_NAME, TEST_RRCLASS)},
- TEST_RRCLASS, self.__datasrc_client)
- def test_find_zone(self):
- # Primay zone case: zone is in the data source, and not in secondaries
- self.assertEqual((ZONE_PRIMARY, self.__datasrc_client),
- (self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS)))
- # Secondary zone case: zone is in the data source and in secondaries.
- self.assertEqual((ZONE_SECONDARY, None),
- (self.zconfig.find_zone(TEST_SECONDARY_ZONE_NAME,
- TEST_RRCLASS)))
- # 'not found' case: zone not in the data source.
- self.__datasrc_client.set_find_result(DataSourceClient.NOTFOUND)
- self.assertEqual((ZONE_NOTFOUND, None),
- (self.zconfig.find_zone(Name('example'),
- TEST_RRCLASS)))
- # same for the partial match
- self.__datasrc_client.set_find_result(DataSourceClient.PARTIALMATCH)
- self.assertEqual((ZONE_NOTFOUND, None),
- (self.zconfig.find_zone(Name('example'),
- TEST_RRCLASS)))
- # a bit unusual case: zone not in the data source, but in secondaries.
- # this is probably a configuration error, but ZoneConfig doesn't do
- # this level check.
- self.__datasrc_client.set_find_result(DataSourceClient.NOTFOUND)
- self.assertEqual((ZONE_NOTFOUND, None),
- (self.zconfig.find_zone(TEST_ZONE_NAME,
- TEST_RRCLASS)))
- # zone class doesn't match (but zone name matches)
- self.__datasrc_client.set_find_result(DataSourceClient.SUCCESS)
- zconfig = ZoneConfig({(TEST_SECONDARY_ZONE_NAME, TEST_RRCLASS)},
- RRClass.CH(), self.__datasrc_client)
- self.assertEqual((ZONE_NOTFOUND, None),
- (zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS)))
- # similar to the previous case, but also in the secondary list
- zconfig = ZoneConfig({(TEST_ZONE_NAME, TEST_RRCLASS)},
- RRClass.CH(), self.__datasrc_client)
- self.assertEqual((ZONE_NOTFOUND, None),
- (zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS)))
- # check some basic tests varying the secondary list.
- # empty secondary list doesn't cause any disruption.
- zconfig = ZoneConfig(set(), TEST_RRCLASS, self.__datasrc_client)
- self.assertEqual((ZONE_PRIMARY, self.__datasrc_client),
- self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS))
- # adding some mulitle tuples, including subdomain of the test zone
- # name, and the same zone name but a different class
- zconfig = ZoneConfig({(TEST_SECONDARY_ZONE_NAME, TEST_RRCLASS),
- (Name('example'), TEST_RRCLASS),
- (Name('sub.example.org'), TEST_RRCLASS),
- (TEST_ZONE_NAME, RRClass.CH())},
- TEST_RRCLASS, self.__datasrc_client)
- self.assertEqual((ZONE_PRIMARY, self.__datasrc_client),
- self.zconfig.find_zone(TEST_ZONE_NAME, TEST_RRCLASS))
- class ACLConfigTest(unittest.TestCase):
- def setUp(self):
- self.__datasrc_client = FakeDataSourceClient()
- self.__zconfig = ZoneConfig({(TEST_SECONDARY_ZONE_NAME, TEST_RRCLASS)},
- TEST_RRCLASS, self.__datasrc_client)
- def test_get_update_acl(self):
- # By default, no ACL is set, and the default ACL is "reject all"
- acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, TEST_RRCLASS)
- self.assertEqual(REJECT, acl.execute(TEST_ACL_CONTEXT))
- # Add a map entry that would match the request, and it should now be
- # accepted.
- acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
- REQUEST_LOADER.load([{"action": "ACCEPT"}])}
- self.__zconfig.set_update_acl_map(acl_map)
- acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, TEST_RRCLASS)
- self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
- # 'All reject' ACL will still apply for any other zones
- acl = self.__zconfig.get_update_acl(Name('example.com'), TEST_RRCLASS)
- self.assertEqual(REJECT, acl.execute(TEST_ACL_CONTEXT))
- acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, RRClass.CH())
- self.assertEqual(REJECT, acl.execute(TEST_ACL_CONTEXT))
- # Test with a map with a few more ACL entries. Should be nothing
- # special.
- acl_map = {(Name('example.com'), TEST_RRCLASS):
- REQUEST_LOADER.load([{"action": "REJECT"}]),
- (TEST_ZONE_NAME, TEST_RRCLASS):
- REQUEST_LOADER.load([{"action": "ACCEPT"}]),
- (TEST_ZONE_NAME, RRClass.CH()):
- REQUEST_LOADER.load([{"action": "DROP"}])}
- self.__zconfig.set_update_acl_map(acl_map)
- acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, TEST_RRCLASS)
- self.assertEqual(ACCEPT, acl.execute(TEST_ACL_CONTEXT))
- acl = self.__zconfig.get_update_acl(Name('example.com'), TEST_RRCLASS)
- self.assertEqual(REJECT, acl.execute(TEST_ACL_CONTEXT))
- acl = self.__zconfig.get_update_acl(TEST_ZONE_NAME, RRClass.CH())
- self.assertEqual(DROP, acl.execute(TEST_ACL_CONTEXT))
- if __name__ == "__main__":
- isc.log.init("bind10")
- isc.log.resetUnitTestRootLogger()
- unittest.main()
|