|
@@ -35,8 +35,11 @@ TEST_RRCLASS = RRClass.IN()
|
|
|
TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
|
|
|
TEST_CLIENT6 = ('2001:db8::1', 53, 0, 0)
|
|
|
TEST_CLIENT4 = ('192.0.2.1', 53)
|
|
|
+# TSIG key for tests when needed. The key name is TEST_ZONE_NAME.
|
|
|
+TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
|
|
-def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[]):
|
|
|
+def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[],
|
|
|
+ tsig_key=None):
|
|
|
msg = Message(Message.RENDER)
|
|
|
msg.set_qid(5353) # arbitrary chosen
|
|
|
msg.set_opcode(Opcode.UPDATE())
|
|
@@ -47,25 +50,35 @@ def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[]):
|
|
|
msg.add_rrset(SECTION_PREREQUISITE, p)
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
- msg.to_wire(renderer)
|
|
|
+ if tsig_key is not None:
|
|
|
+ msg.to_wire(renderer, TSIGContext(tsig_key))
|
|
|
+ else:
|
|
|
+ msg.to_wire(renderer)
|
|
|
|
|
|
# re-read the created data in the parse mode
|
|
|
msg.clear(Message.PARSE)
|
|
|
msg.from_wire(renderer.get_data())
|
|
|
|
|
|
- return renderer.get_data(), msg
|
|
|
+ return msg
|
|
|
|
|
|
-class SessionTest(unittest.TestCase):
|
|
|
- '''Session tests'''
|
|
|
+class SesseionTestBase(unittest.TestCase):
|
|
|
+ '''Base class for all sesion related tests.
|
|
|
+
|
|
|
+ It just initializes common test parameters in its setUp() and defines
|
|
|
+ some common utility method(s).
|
|
|
+
|
|
|
+ '''
|
|
|
def setUp(self):
|
|
|
shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
|
|
|
- self.__datasrc_client = DataSourceClient("sqlite3",
|
|
|
- WRITE_ZONE_DB_CONFIG)
|
|
|
- self.__update_msgdata, self.__update_msg = create_update_msg()
|
|
|
- self.__session = UpdateSession(self.__update_msg,
|
|
|
- self.__update_msgdata, TEST_CLIENT4,
|
|
|
- ZoneConfig([], TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
+ self._datasrc_client = DataSourceClient("sqlite3",
|
|
|
+ WRITE_ZONE_DB_CONFIG)
|
|
|
+ self._update_msg = create_update_msg()
|
|
|
+ self._acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "ACCEPT"}])}
|
|
|
+ self._session = UpdateSession(self._update_msg, TEST_CLIENT4,
|
|
|
+ ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client,
|
|
|
+ self._acl_map))
|
|
|
|
|
|
def check_response(self, msg, expected_rcode):
|
|
|
'''Perform common checks on update resposne message.'''
|
|
@@ -79,9 +92,12 @@ class SessionTest(unittest.TestCase):
|
|
|
self.assertEqual(0, msg.get_rr_count(SECTION_UPDATE))
|
|
|
self.assertEqual(0, msg.get_rr_count(Message.SECTION_ADDITIONAL))
|
|
|
|
|
|
+class SessionTest(SesseionTestBase):
|
|
|
+ '''Basic session tests'''
|
|
|
+
|
|
|
def test_handle(self):
|
|
|
'''Basic update case'''
|
|
|
- result, zname, zclass = self.__session.handle()
|
|
|
+ result, zname, zclass = self._session.handle()
|
|
|
self.assertEqual(UPDATE_SUCCESS, result)
|
|
|
self.assertEqual(TEST_ZONE_NAME, zname)
|
|
|
self.assertEqual(TEST_RRCLASS, zclass)
|
|
@@ -92,8 +108,8 @@ class SessionTest(unittest.TestCase):
|
|
|
|
|
|
def test_broken_request(self):
|
|
|
# Zone section is empty
|
|
|
- msg_data, msg = create_update_msg(zones=[])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT6, None)
|
|
|
+ msg = create_update_msg(zones=[])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT6, None)
|
|
|
result, zname, zclass = session.handle()
|
|
|
self.assertEqual(UPDATE_ERROR, result)
|
|
|
self.assertEqual(None, zname)
|
|
@@ -101,17 +117,15 @@ class SessionTest(unittest.TestCase):
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
|
# Zone section contains multiple records
|
|
|
- msg_data, msg = create_update_msg(zones=[TEST_ZONE_RECORD,
|
|
|
- TEST_ZONE_RECORD])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
|
|
|
+ msg = create_update_msg(zones=[TEST_ZONE_RECORD, TEST_ZONE_RECORD])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, None)
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
|
# Zone section's type is not SOA
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
|
|
|
- TEST_RRCLASS,
|
|
|
- RRType.A())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
|
|
|
+ msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
+ RRType.A())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, None)
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
@@ -119,24 +133,20 @@ class SessionTest(unittest.TestCase):
|
|
|
# specified zone is configured as a secondary. Since this
|
|
|
# implementation doesn't support update forwarding, the result
|
|
|
# should be NOTIMP.
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
|
|
|
- TEST_RRCLASS,
|
|
|
- RRType.SOA())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4,
|
|
|
+ msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
+ RRType.SOA())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
|
|
|
- TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
+ TEST_RRCLASS, self._datasrc_client))
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.NOTIMP())
|
|
|
|
|
|
def check_notauth(self, zname, zclass=TEST_RRCLASS):
|
|
|
'''Common test sequence for the 'notauth' test'''
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(zname, zclass,
|
|
|
- RRType.SOA())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4,
|
|
|
+ msg = create_update_msg(zones=[Question(zname, zclass, RRType.SOA())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
|
|
|
- TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
+ TEST_RRCLASS, self._datasrc_client))
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.NOTAUTH())
|
|
|
|
|
@@ -151,10 +161,10 @@ class SessionTest(unittest.TestCase):
|
|
|
self.check_notauth(Name('example.org'), RRClass.CH())
|
|
|
|
|
|
def __prereq_helper(self, method, expected, rrset):
|
|
|
- '''Calls the given method with self.__datasrc_client
|
|
|
+ '''Calls the given method with self._datasrc_client
|
|
|
and the given rrset, and compares the return value.
|
|
|
Function does not do much but makes the code look nicer'''
|
|
|
- self.assertEqual(expected, method(self.__datasrc_client, rrset))
|
|
|
+ self.assertEqual(expected, method(self._datasrc_client, rrset))
|
|
|
|
|
|
def __check_prerequisite_exists_combined(self, method, rrclass, expected):
|
|
|
'''shared code for the checks for the very similar (but reversed
|
|
@@ -232,19 +242,19 @@ class SessionTest(unittest.TestCase):
|
|
|
self.__prereq_helper(method, expected, rrset)
|
|
|
|
|
|
def test_check_prerequisite_exists(self):
|
|
|
- method = self.__session._UpdateSession__prereq_rrset_exists
|
|
|
+ method = self._session._UpdateSession__prereq_rrset_exists
|
|
|
self.__check_prerequisite_exists_combined(method,
|
|
|
isc.dns.RRClass.ANY(),
|
|
|
True)
|
|
|
|
|
|
def test_check_prerequisite_does_not_exist(self):
|
|
|
- method = self.__session._UpdateSession__prereq_rrset_does_not_exist
|
|
|
+ method = self._session._UpdateSession__prereq_rrset_does_not_exist
|
|
|
self.__check_prerequisite_exists_combined(method,
|
|
|
isc.dns.RRClass.NONE(),
|
|
|
False)
|
|
|
|
|
|
def test_check_prerequisite_exists_value(self):
|
|
|
- method = self.__session._UpdateSession__prereq_rrset_exists_value
|
|
|
+ method = self._session._UpdateSession__prereq_rrset_exists_value
|
|
|
|
|
|
rrset = isc.dns.RRset(isc.dns.Name("www.example.org"),
|
|
|
isc.dns.RRClass.IN(), isc.dns.RRType.A(),
|
|
@@ -359,13 +369,13 @@ class SessionTest(unittest.TestCase):
|
|
|
self.__prereq_helper(method, expected, rrset)
|
|
|
|
|
|
def test_check_prerequisite_name_in_use(self):
|
|
|
- method = self.__session._UpdateSession__prereq_name_in_use
|
|
|
+ method = self._session._UpdateSession__prereq_name_in_use
|
|
|
self.__check_prerequisite_name_in_use_combined(method,
|
|
|
isc.dns.RRClass.ANY(),
|
|
|
True)
|
|
|
|
|
|
def test_check_prerequisite_name_not_in_use(self):
|
|
|
- method = self.__session._UpdateSession__prereq_name_not_in_use
|
|
|
+ method = self._session._UpdateSession__prereq_name_not_in_use
|
|
|
self.__check_prerequisite_name_in_use_combined(method,
|
|
|
isc.dns.RRClass.NONE(),
|
|
|
False)
|
|
@@ -375,15 +385,15 @@ class SessionTest(unittest.TestCase):
|
|
|
creates an update session, and fills it with the list of rrsets
|
|
|
from 'prerequisites'. Then checks if __check_prerequisites()
|
|
|
returns the Rcode specified in 'expected'.'''
|
|
|
- msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
|
|
|
- prerequisites)
|
|
|
- zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
|
|
|
+ msg = create_update_msg([TEST_ZONE_RECORD], prerequisites)
|
|
|
+ zconfig = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
|
|
|
+ self._acl_map)
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, zconfig)
|
|
|
# compare the to_text output of the rcodes (nicer error messages)
|
|
|
# This call itself should also be done by handle(),
|
|
|
# but just for better failures, it is first called on its own
|
|
|
self.assertEqual(expected.to_text(),
|
|
|
- session._UpdateSession__check_prerequisites(self.__datasrc_client,
|
|
|
+ session._UpdateSession__check_prerequisites(self._datasrc_client,
|
|
|
TEST_ZONE_NAME,
|
|
|
TEST_RRCLASS).to_text())
|
|
|
# Now see if handle finds the same result
|
|
@@ -485,14 +495,14 @@ class SessionTest(unittest.TestCase):
|
|
|
isc.dns.RRTTL(0))
|
|
|
|
|
|
# Create an UPDATE with all 5 'yes' prereqs
|
|
|
- data, update = create_update_msg([TEST_ZONE_RECORD],
|
|
|
- [
|
|
|
- rrset_exists_yes,
|
|
|
- rrset_does_not_exist_yes,
|
|
|
- name_in_use_yes,
|
|
|
- name_not_in_use_yes,
|
|
|
- rrset_exists_value_yes,
|
|
|
- ])
|
|
|
+ create_update_msg([TEST_ZONE_RECORD],
|
|
|
+ [rrset_exists_yes,
|
|
|
+ rrset_does_not_exist_yes,
|
|
|
+ name_in_use_yes,
|
|
|
+ name_not_in_use_yes,
|
|
|
+ rrset_exists_value_yes,
|
|
|
+ ])
|
|
|
+
|
|
|
# check 'no' result codes
|
|
|
self.check_prerequisite_result(Rcode.NXRRSET(),
|
|
|
[ rrset_exists_no ])
|
|
@@ -599,6 +609,60 @@ class SessionTest(unittest.TestCase):
|
|
|
"foo"))
|
|
|
self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
|
|
|
|
|
|
+class SessionACLTest(SesseionTestBase):
|
|
|
+ '''ACL related tests for update session.'''
|
|
|
+ def test_update_acl_check(self):
|
|
|
+ '''Test for various ACL checks.
|
|
|
+
|
|
|
+ Note that accepted cases are covered in the basic tests.
|
|
|
+
|
|
|
+ '''
|
|
|
+ # create a separate session, with default (empty) ACL map.
|
|
|
+ session = UpdateSession(self._update_msg,
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client))
|
|
|
+ # then the request should be rejected.
|
|
|
+ self.assertEqual((UPDATE_ERROR, None, None), session.handle())
|
|
|
+
|
|
|
+ # recreate the request message, and test with an ACL that would result
|
|
|
+ # in 'DROP'. get_message() should return None.
|
|
|
+ msg = create_update_msg()
|
|
|
+ acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "DROP", "from":
|
|
|
+ TEST_CLIENT4[0]}])}
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
+ ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client, acl_map))
|
|
|
+ self.assertEqual((UPDATE_DROP, None, None), session.handle())
|
|
|
+ self.assertEqual(None, session.get_message())
|
|
|
+
|
|
|
+ def test_update_tsigacl_check(self):
|
|
|
+ '''Test for various ACL checks using TSIG.'''
|
|
|
+ # This ACL will accept requests from TEST_CLIENT4 (any port) *and*
|
|
|
+ # has TSIG signed by TEST_ZONE_NAME; all others will be rejected.
|
|
|
+ acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "ACCEPT",
|
|
|
+ "from": TEST_CLIENT4[0],
|
|
|
+ "key": TEST_ZONE_NAME.to_text()}])}
|
|
|
+
|
|
|
+ # If the message doesn't contain TSIG, it doesn't match the ACCEPT
|
|
|
+ # ACL entry, and the request should be rejected.
|
|
|
+ session = UpdateSession(self._update_msg,
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client,
|
|
|
+ acl_map))
|
|
|
+ self.assertEqual((UPDATE_ERROR, None, None), session.handle())
|
|
|
+ self.check_response(session.get_message(), Rcode.REFUSED())
|
|
|
+
|
|
|
+ # If the message contains TSIG, it should match the ACCEPT
|
|
|
+ # ACL entry, and the request should be granted.
|
|
|
+ session = UpdateSession(create_update_msg(tsig_key=TEST_TSIG_KEY),
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client,
|
|
|
+ acl_map))
|
|
|
+ self.assertEqual((UPDATE_SUCCESS, TEST_ZONE_NAME, TEST_RRCLASS),
|
|
|
+ session.handle())
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
isc.log.init("bind10")
|
|
|
isc.log.resetUnitTestRootLogger()
|