|
@@ -53,18 +53,27 @@ def create_update_msg(zones=[TEST_ZONE_RECORD]):
|
|
|
|
|
|
return renderer.get_data(), msg
|
|
|
|
|
|
-class SessionTest(unittest.TestCase):
|
|
|
- '''Session tests'''
|
|
|
+class SesseionTestBase(unittest.TestCase):
|
|
|
+ '''Base class for all sesion related tests.
|
|
|
+
|
|
|
+ It just initializes common test parameters in its setUp().
|
|
|
+
|
|
|
+ '''
|
|
|
def setUp(self):
|
|
|
shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
|
|
|
- self.__datasrc_client = DataSourceClient("sqlite3",
|
|
|
- WRITE_ZONE_DB_CONFIG)
|
|
|
- self.__update_msgdata, self.__update_msg = create_update_msg()
|
|
|
- self.__session = UpdateSession(self.__update_msg,
|
|
|
- self.__update_msgdata, TEST_CLIENT4,
|
|
|
+ self._datasrc_client = DataSourceClient("sqlite3",
|
|
|
+ WRITE_ZONE_DB_CONFIG)
|
|
|
+ self._update_msgdata, self._update_msg = create_update_msg()
|
|
|
+ acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "ACCEPT"}])}
|
|
|
+ self._session = UpdateSession(self._update_msg,
|
|
|
+ self._update_msgdata, TEST_CLIENT4,
|
|
|
ZoneConfig([], TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
+ self._datasrc_client,
|
|
|
+ acl_map))
|
|
|
|
|
|
+class SessionTest(SesseionTestBase):
|
|
|
+ '''Basic session tests'''
|
|
|
def check_response(self, msg, expected_rcode):
|
|
|
'''Perform common checks on update resposne message.'''
|
|
|
self.assertTrue(msg.get_header_flag(Message.HEADERFLAG_QR))
|
|
@@ -79,7 +88,7 @@ class SessionTest(unittest.TestCase):
|
|
|
|
|
|
def test_handle(self):
|
|
|
'''Basic update case'''
|
|
|
- result, zname, zclass = self.__session.handle()
|
|
|
+ result, zname, zclass = self._session.handle()
|
|
|
self.assertEqual(UPDATE_SUCCESS, result)
|
|
|
self.assertEqual(TEST_ZONE_NAME, zname)
|
|
|
self.assertEqual(TEST_RRCLASS, zclass)
|
|
@@ -123,7 +132,7 @@ class SessionTest(unittest.TestCase):
|
|
|
session = UpdateSession(msg, msg_data, TEST_CLIENT4,
|
|
|
ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
|
|
|
TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
+ self._datasrc_client))
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.NOTIMP())
|
|
|
|
|
@@ -134,7 +143,7 @@ class SessionTest(unittest.TestCase):
|
|
|
session = UpdateSession(msg, msg_data, TEST_CLIENT4,
|
|
|
ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
|
|
|
TEST_RRCLASS,
|
|
|
- self.__datasrc_client))
|
|
|
+ self._datasrc_client))
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.NOTAUTH())
|
|
|
|
|
@@ -148,6 +157,24 @@ class SessionTest(unittest.TestCase):
|
|
|
# zone class doesn't match
|
|
|
self.check_notauth(Name('example.org'), RRClass.CH())
|
|
|
|
|
|
+class SessionACLTest(SesseionTestBase):
|
|
|
+ '''ACL related tests for update session.'''
|
|
|
+ def test_update_acl_check(self):
|
|
|
+ '''Test for various ACL checks.
|
|
|
+
|
|
|
+ Note that accepted cases are covered in the basic tests.
|
|
|
+
|
|
|
+ '''
|
|
|
+ # create a separate session, with default (empty) ACL map.
|
|
|
+ session = UpdateSession(self._update_msg, self._update_msgdata,
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client))
|
|
|
+ # then the request should be rejected.
|
|
|
+ result, zname, zclass = session.handle()
|
|
|
+ self.assertEqual(UPDATE_ERROR, result)
|
|
|
+ self.assertEqual(None, zname)
|
|
|
+ self.assertEqual(None, zclass)
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
isc.log.init("bind10")
|
|
|
isc.log.resetUnitTestRootLogger()
|