|
@@ -35,9 +35,11 @@ TEST_RRCLASS = RRClass.IN()
|
|
|
TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
|
|
|
TEST_CLIENT6 = ('2001:db8::1', 53, 0, 0)
|
|
|
TEST_CLIENT4 = ('192.0.2.1', 53)
|
|
|
+# TSIG key for tests when needed. The key name is TEST_ZONE_NAME.
|
|
|
+TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
|
|
def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[],
|
|
|
- updates=[]):
|
|
|
+ updates=[], tsig_key=None):
|
|
|
msg = Message(Message.RENDER)
|
|
|
msg.set_qid(5353) # arbitrary chosen
|
|
|
msg.set_opcode(Opcode.UPDATE())
|
|
@@ -50,13 +52,16 @@ def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[],
|
|
|
msg.add_rrset(SECTION_UPDATE, u)
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
- msg.to_wire(renderer)
|
|
|
+ if tsig_key is not None:
|
|
|
+ msg.to_wire(renderer, TSIGContext(tsig_key))
|
|
|
+ else:
|
|
|
+ msg.to_wire(renderer)
|
|
|
|
|
|
# re-read the created data in the parse mode
|
|
|
msg.clear(Message.PARSE)
|
|
|
msg.from_wire(renderer.get_data(), Message.PRESERVE_ORDER)
|
|
|
|
|
|
- return renderer.get_data(), msg
|
|
|
+ return msg
|
|
|
|
|
|
def add_rdata(rrset, rdata):
|
|
|
'''
|
|
@@ -89,18 +94,25 @@ def create_rrset(name, rrclass, rrtype, ttl, rdatas = []):
|
|
|
add_rdata(rrset, rdata)
|
|
|
return rrset
|
|
|
|
|
|
-class SessionTest(unittest.TestCase):
|
|
|
- '''Session tests'''
|
|
|
+class SesseionTestBase(unittest.TestCase):
|
|
|
+ '''Base class for all sesion related tests.
|
|
|
+
|
|
|
+ It just initializes common test parameters in its setUp() and defines
|
|
|
+ some common utility method(s).
|
|
|
+
|
|
|
+ '''
|
|
|
def setUp(self):
|
|
|
shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
|
|
|
- self.__datasrc_client = DataSourceClient("sqlite3",
|
|
|
- WRITE_ZONE_DB_CONFIG)
|
|
|
- self.__update_msgdata, self.__update_msg = create_update_msg()
|
|
|
- self.__session = UpdateSession(self.__update_msg,
|
|
|
- self.__update_msgdata, TEST_CLIENT4,
|
|
|
- ZoneConfig([], TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
- self.__session._UpdateSession__get_update_zone()
|
|
|
+ self._datasrc_client = DataSourceClient("sqlite3",
|
|
|
+ WRITE_ZONE_DB_CONFIG)
|
|
|
+ self._update_msg = create_update_msg()
|
|
|
+ self._acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "ACCEPT"}])}
|
|
|
+ self._session = UpdateSession(self._update_msg, TEST_CLIENT4,
|
|
|
+ ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client,
|
|
|
+ self._acl_map))
|
|
|
+ self._session._UpdateSession__get_update_zone()
|
|
|
|
|
|
def check_response(self, msg, expected_rcode):
|
|
|
'''Perform common checks on update resposne message.'''
|
|
@@ -114,9 +126,12 @@ class SessionTest(unittest.TestCase):
|
|
|
self.assertEqual(0, msg.get_rr_count(SECTION_UPDATE))
|
|
|
self.assertEqual(0, msg.get_rr_count(Message.SECTION_ADDITIONAL))
|
|
|
|
|
|
+class SessionTest(SesseionTestBase):
|
|
|
+ '''Basic session tests'''
|
|
|
+
|
|
|
def test_handle(self):
|
|
|
'''Basic update case'''
|
|
|
- result, zname, zclass = self.__session.handle()
|
|
|
+ result, zname, zclass = self._session.handle()
|
|
|
self.assertEqual(UPDATE_SUCCESS, result)
|
|
|
self.assertEqual(TEST_ZONE_NAME, zname)
|
|
|
self.assertEqual(TEST_RRCLASS, zclass)
|
|
@@ -127,8 +142,8 @@ class SessionTest(unittest.TestCase):
|
|
|
|
|
|
def test_broken_request(self):
|
|
|
# Zone section is empty
|
|
|
- msg_data, msg = create_update_msg(zones=[])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT6, None)
|
|
|
+ msg = create_update_msg(zones=[])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT6, None)
|
|
|
result, zname, zclass = session.handle()
|
|
|
self.assertEqual(UPDATE_ERROR, result)
|
|
|
self.assertEqual(None, zname)
|
|
@@ -136,17 +151,15 @@ class SessionTest(unittest.TestCase):
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
|
# Zone section contains multiple records
|
|
|
- msg_data, msg = create_update_msg(zones=[TEST_ZONE_RECORD,
|
|
|
- TEST_ZONE_RECORD])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
|
|
|
+ msg = create_update_msg(zones=[TEST_ZONE_RECORD, TEST_ZONE_RECORD])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, None)
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
|
# Zone section's type is not SOA
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
|
|
|
- TEST_RRCLASS,
|
|
|
- RRType.A())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
|
|
|
+ msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
+ RRType.A())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, None)
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
@@ -154,24 +167,20 @@ class SessionTest(unittest.TestCase):
|
|
|
# specified zone is configured as a secondary. Since this
|
|
|
# implementation doesn't support update forwarding, the result
|
|
|
# should be NOTIMP.
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
|
|
|
- TEST_RRCLASS,
|
|
|
- RRType.SOA())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4,
|
|
|
+ msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
+ RRType.SOA())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
|
|
|
- TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
+ TEST_RRCLASS, self._datasrc_client))
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.NOTIMP())
|
|
|
|
|
|
def check_notauth(self, zname, zclass=TEST_RRCLASS):
|
|
|
'''Common test sequence for the 'notauth' test'''
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(zname, zclass,
|
|
|
- RRType.SOA())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4,
|
|
|
+ msg = create_update_msg(zones=[Question(zname, zclass, RRType.SOA())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
|
|
|
- TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
+ TEST_RRCLASS, self._datasrc_client))
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.NOTAUTH())
|
|
|
|
|
@@ -274,7 +283,7 @@ class SessionTest(unittest.TestCase):
|
|
|
self.assertEqual(expected, strings)
|
|
|
|
|
|
def __prereq_helper(self, method, expected, rrset):
|
|
|
- '''Calls the given method with self.__datasrc_client
|
|
|
+ '''Calls the given method with self._datasrc_client
|
|
|
and the given rrset, and compares the return value.
|
|
|
Function does not do much but makes the code look nicer'''
|
|
|
self.assertEqual(expected, method(rrset))
|
|
@@ -331,19 +340,19 @@ class SessionTest(unittest.TestCase):
|
|
|
self.__prereq_helper(method, expected, rrset)
|
|
|
|
|
|
def test_check_prerequisite_exists(self):
|
|
|
- method = self.__session._UpdateSession__prereq_rrset_exists
|
|
|
+ method = self._session._UpdateSession__prereq_rrset_exists
|
|
|
self.__check_prerequisite_exists_combined(method,
|
|
|
RRClass.ANY(),
|
|
|
True)
|
|
|
|
|
|
def test_check_prerequisite_does_not_exist(self):
|
|
|
- method = self.__session._UpdateSession__prereq_rrset_does_not_exist
|
|
|
+ method = self._session._UpdateSession__prereq_rrset_does_not_exist
|
|
|
self.__check_prerequisite_exists_combined(method,
|
|
|
RRClass.NONE(),
|
|
|
False)
|
|
|
|
|
|
def test_check_prerequisite_exists_value(self):
|
|
|
- method = self.__session._UpdateSession__prereq_rrset_exists_value
|
|
|
+ method = self._session._UpdateSession__prereq_rrset_exists_value
|
|
|
|
|
|
rrset = create_rrset("www.example.org", RRClass.IN(), RRType.A(), 0)
|
|
|
# empty one should not match
|
|
@@ -419,13 +428,13 @@ class SessionTest(unittest.TestCase):
|
|
|
self.__prereq_helper(method, expected, rrset)
|
|
|
|
|
|
def test_check_prerequisite_name_in_use(self):
|
|
|
- method = self.__session._UpdateSession__prereq_name_in_use
|
|
|
+ method = self._session._UpdateSession__prereq_name_in_use
|
|
|
self.__check_prerequisite_name_in_use_combined(method,
|
|
|
RRClass.ANY(),
|
|
|
True)
|
|
|
|
|
|
def test_check_prerequisite_name_not_in_use(self):
|
|
|
- method = self.__session._UpdateSession__prereq_name_not_in_use
|
|
|
+ method = self._session._UpdateSession__prereq_name_not_in_use
|
|
|
self.__check_prerequisite_name_in_use_combined(method,
|
|
|
RRClass.NONE(),
|
|
|
False)
|
|
@@ -435,10 +444,10 @@ class SessionTest(unittest.TestCase):
|
|
|
creates an update session, and fills it with the list of rrsets
|
|
|
from 'prerequisites'. Then checks if __check_prerequisites()
|
|
|
returns the Rcode specified in 'expected'.'''
|
|
|
- msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
|
|
|
- prerequisites)
|
|
|
- zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
|
|
|
+ msg = create_update_msg([TEST_ZONE_RECORD], prerequisites)
|
|
|
+ zconfig = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
|
|
|
+ self._acl_map)
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, zconfig)
|
|
|
session._UpdateSession__get_update_zone()
|
|
|
# compare the to_text output of the rcodes (nicer error messages)
|
|
|
# This call itself should also be done by handle(),
|
|
@@ -447,8 +456,8 @@ class SessionTest(unittest.TestCase):
|
|
|
session._UpdateSession__check_prerequisites().to_text())
|
|
|
# Now see if handle finds the same result
|
|
|
(result, _, _) = session.handle()
|
|
|
- self.assertEqual(expected,
|
|
|
- session._UpdateSession__message.get_rcode())
|
|
|
+ self.assertEqual(expected.to_text(),
|
|
|
+ session._UpdateSession__message.get_rcode().to_text())
|
|
|
# And that the result looks right
|
|
|
if expected == Rcode.NOERROR():
|
|
|
self.assertEqual(UPDATE_SUCCESS, result)
|
|
@@ -460,10 +469,10 @@ class SessionTest(unittest.TestCase):
|
|
|
creates an update session, and fills it with the list of rrsets
|
|
|
from 'updates'. Then checks if __do_prescan()
|
|
|
returns the Rcode specified in 'expected'.'''
|
|
|
- msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
|
|
|
- [], updates)
|
|
|
- zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
|
|
|
+ msg = create_update_msg([TEST_ZONE_RECORD], [], updates)
|
|
|
+ zconfig = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
|
|
|
+ self._acl_map)
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, zconfig)
|
|
|
session._UpdateSession__get_update_zone()
|
|
|
# compare the to_text output of the rcodes (nicer error messages)
|
|
|
# This call itself should also be done by handle(),
|
|
@@ -479,10 +488,10 @@ class SessionTest(unittest.TestCase):
|
|
|
creates an update session, and fills it with the list of rrsets
|
|
|
from 'updates'. Then checks if __handle()
|
|
|
results in a response with rcode 'expected'.'''
|
|
|
- msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
|
|
|
- [], updates)
|
|
|
- zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
|
|
|
+ msg = create_update_msg([TEST_ZONE_RECORD], [], updates)
|
|
|
+ zconfig = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
|
|
|
+ self._acl_map)
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, zconfig)
|
|
|
|
|
|
# Now see if handle finds the same result
|
|
|
(result, _, _) = session.handle()
|
|
@@ -544,7 +553,6 @@ class SessionTest(unittest.TestCase):
|
|
|
|
|
|
name_not_in_use_no = create_rrset("www.example.org", RRClass.NONE(),
|
|
|
RRType.ANY(), 0)
|
|
|
-
|
|
|
# check 'no' result codes
|
|
|
self.check_prerequisite_result(Rcode.NXRRSET(),
|
|
|
[ rrset_exists_no ])
|
|
@@ -639,9 +647,8 @@ class SessionTest(unittest.TestCase):
|
|
|
[ "foo" ])
|
|
|
self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
|
|
|
|
|
|
-
|
|
|
def __prereq_helper(self, method, expected, rrset):
|
|
|
- '''Calls the given method with self.__datasrc_client
|
|
|
+ '''Calls the given method with self._datasrc_client
|
|
|
and the given rrset, and compares the return value.
|
|
|
Function does not do much but makes the code look nicer'''
|
|
|
self.assertEqual(expected, method(rrset))
|
|
@@ -828,7 +835,7 @@ class SessionTest(unittest.TestCase):
|
|
|
then checks if the result matches the expected result.
|
|
|
If so, and if expected_rrset is given, they are compared as
|
|
|
well.'''
|
|
|
- _, finder = self.__datasrc_client.find_zone(TEST_ZONE_NAME)
|
|
|
+ _, finder = self._datasrc_client.find_zone(TEST_ZONE_NAME)
|
|
|
result, found_rrset, _ = finder.find(name, rrtype,
|
|
|
finder.NO_WILDCARD |
|
|
|
finder.FIND_GLUE_OK)
|
|
@@ -1202,9 +1209,63 @@ class SessionTest(unittest.TestCase):
|
|
|
def test_uncaught_exception(self):
|
|
|
def my_exc():
|
|
|
raise Exception("foo")
|
|
|
- self.__session._UpdateSession__update_soa = my_exc
|
|
|
+ self._session._UpdateSession__update_soa = my_exc
|
|
|
self.assertEqual(Rcode.SERVFAIL().to_text(),
|
|
|
- self.__session._UpdateSession__do_update().to_text())
|
|
|
+ self._session._UpdateSession__do_update().to_text())
|
|
|
+
|
|
|
+class SessionACLTest(SesseionTestBase):
|
|
|
+ '''ACL related tests for update session.'''
|
|
|
+ def test_update_acl_check(self):
|
|
|
+ '''Test for various ACL checks.
|
|
|
+
|
|
|
+ Note that accepted cases are covered in the basic tests.
|
|
|
+
|
|
|
+ '''
|
|
|
+ # create a separate session, with default (empty) ACL map.
|
|
|
+ session = UpdateSession(self._update_msg,
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client))
|
|
|
+ # then the request should be rejected.
|
|
|
+ self.assertEqual((UPDATE_ERROR, None, None), session.handle())
|
|
|
+
|
|
|
+ # recreate the request message, and test with an ACL that would result
|
|
|
+ # in 'DROP'. get_message() should return None.
|
|
|
+ msg = create_update_msg()
|
|
|
+ acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP", "from":
|
|
|
+ TEST_CLIENT4[0]}])}
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
+ ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client, acl_map))
|
|
|
+ self.assertEqual((UPDATE_DROP, None, None), session.handle())
|
|
|
+ self.assertEqual(None, session.get_message())
|
|
|
+
|
|
|
+ def test_update_tsigacl_check(self):
|
|
|
+ '''Test for various ACL checks using TSIG.'''
|
|
|
+ # This ACL will accept requests from TEST_CLIENT4 (any port) *and*
|
|
|
+ # has TSIG signed by TEST_ZONE_NAME; all others will be rejected.
|
|
|
+ acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "ACCEPT",
|
|
|
+ "from": TEST_CLIENT4[0],
|
|
|
+ "key": TEST_ZONE_NAME.to_text()}])}
|
|
|
+
|
|
|
+ # If the message doesn't contain TSIG, it doesn't match the ACCEPT
|
|
|
+ # ACL entry, and the request should be rejected.
|
|
|
+ session = UpdateSession(self._update_msg,
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client,
|
|
|
+ acl_map))
|
|
|
+ self.assertEqual((UPDATE_ERROR, None, None), session.handle())
|
|
|
+ self.check_response(session.get_message(), Rcode.REFUSED())
|
|
|
+
|
|
|
+ # If the message contains TSIG, it should match the ACCEPT
|
|
|
+ # ACL entry, and the request should be granted.
|
|
|
+ session = UpdateSession(create_update_msg(tsig_key=TEST_TSIG_KEY),
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client,
|
|
|
+ acl_map))
|
|
|
+ self.assertEqual((UPDATE_SUCCESS, TEST_ZONE_NAME, TEST_RRCLASS),
|
|
|
+ session.handle())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
isc.log.init("bind10")
|