|
@@ -35,8 +35,10 @@ TEST_RRCLASS = RRClass.IN()
|
|
|
TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
|
|
|
TEST_CLIENT6 = ('2001:db8::1', 53, 0, 0)
|
|
|
TEST_CLIENT4 = ('192.0.2.1', 53)
|
|
|
+# TSIG key for tests when needed. The key name is TEST_ZONE_NAME.
|
|
|
+TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
|
|
-def create_update_msg(zones=[TEST_ZONE_RECORD]):
|
|
|
+def create_update_msg(zones=[TEST_ZONE_RECORD], tsig_key=None):
|
|
|
msg = Message(Message.RENDER)
|
|
|
msg.set_qid(5353) # arbitrary chosen
|
|
|
msg.set_opcode(Opcode.UPDATE())
|
|
@@ -45,35 +47,36 @@ def create_update_msg(zones=[TEST_ZONE_RECORD]):
|
|
|
msg.add_question(z)
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
- msg.to_wire(renderer)
|
|
|
+ if tsig_key is not None:
|
|
|
+ msg.to_wire(renderer, TSIGContext(tsig_key))
|
|
|
+ else:
|
|
|
+ msg.to_wire(renderer)
|
|
|
|
|
|
# re-read the created data in the parse mode
|
|
|
msg.clear(Message.PARSE)
|
|
|
msg.from_wire(renderer.get_data())
|
|
|
|
|
|
- return renderer.get_data(), msg
|
|
|
+ return msg
|
|
|
|
|
|
class SesseionTestBase(unittest.TestCase):
|
|
|
'''Base class for all sesion related tests.
|
|
|
|
|
|
- It just initializes common test parameters in its setUp().
|
|
|
+ It just initializes common test parameters in its setUp() and defines
|
|
|
+ some common utility method(s).
|
|
|
|
|
|
'''
|
|
|
def setUp(self):
|
|
|
shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
|
|
|
self._datasrc_client = DataSourceClient("sqlite3",
|
|
|
WRITE_ZONE_DB_CONFIG)
|
|
|
- self._update_msgdata, self._update_msg = create_update_msg()
|
|
|
+ self._update_msg = create_update_msg()
|
|
|
acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
REQUEST_LOADER.load([{"action": "ACCEPT"}])}
|
|
|
- self._session = UpdateSession(self._update_msg,
|
|
|
- self._update_msgdata, TEST_CLIENT4,
|
|
|
+ self._session = UpdateSession(self._update_msg, TEST_CLIENT4,
|
|
|
ZoneConfig([], TEST_RRCLASS,
|
|
|
self._datasrc_client,
|
|
|
acl_map))
|
|
|
|
|
|
-class SessionTest(SesseionTestBase):
|
|
|
- '''Basic session tests'''
|
|
|
def check_response(self, msg, expected_rcode):
|
|
|
'''Perform common checks on update resposne message.'''
|
|
|
self.assertTrue(msg.get_header_flag(Message.HEADERFLAG_QR))
|
|
@@ -86,6 +89,9 @@ class SessionTest(SesseionTestBase):
|
|
|
self.assertEqual(0, msg.get_rr_count(SECTION_UPDATE))
|
|
|
self.assertEqual(0, msg.get_rr_count(Message.SECTION_ADDITIONAL))
|
|
|
|
|
|
+class SessionTest(SesseionTestBase):
|
|
|
+ '''Basic session tests'''
|
|
|
+
|
|
|
def test_handle(self):
|
|
|
'''Basic update case'''
|
|
|
result, zname, zclass = self._session.handle()
|
|
@@ -99,8 +105,8 @@ class SessionTest(SesseionTestBase):
|
|
|
|
|
|
def test_broken_request(self):
|
|
|
# Zone section is empty
|
|
|
- msg_data, msg = create_update_msg(zones=[])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT6, None)
|
|
|
+ msg = create_update_msg(zones=[])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT6, None)
|
|
|
result, zname, zclass = session.handle()
|
|
|
self.assertEqual(UPDATE_ERROR, result)
|
|
|
self.assertEqual(None, zname)
|
|
@@ -108,17 +114,15 @@ class SessionTest(SesseionTestBase):
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
|
# Zone section contains multiple records
|
|
|
- msg_data, msg = create_update_msg(zones=[TEST_ZONE_RECORD,
|
|
|
- TEST_ZONE_RECORD])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
|
|
|
+ msg = create_update_msg(zones=[TEST_ZONE_RECORD, TEST_ZONE_RECORD])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, None)
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
|
# Zone section's type is not SOA
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
|
|
|
- TEST_RRCLASS,
|
|
|
- RRType.A())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
|
|
|
+ msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
+ RRType.A())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4, None)
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.FORMERR())
|
|
|
|
|
@@ -126,24 +130,20 @@ class SessionTest(SesseionTestBase):
|
|
|
# specified zone is configured as a secondary. Since this
|
|
|
# implementation doesn't support update forwarding, the result
|
|
|
# should be NOTIMP.
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
|
|
|
- TEST_RRCLASS,
|
|
|
- RRType.SOA())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4,
|
|
|
+ msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
+ RRType.SOA())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
|
|
|
- TEST_RRCLASS,
|
|
|
- self._datasrc_client))
|
|
|
+ TEST_RRCLASS, self._datasrc_client))
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.NOTIMP())
|
|
|
|
|
|
def check_notauth(self, zname, zclass=TEST_RRCLASS):
|
|
|
'''Common test sequence for the 'notauth' test'''
|
|
|
- msg_data, msg = create_update_msg(zones=[Question(zname, zclass,
|
|
|
- RRType.SOA())])
|
|
|
- session = UpdateSession(msg, msg_data, TEST_CLIENT4,
|
|
|
+ msg = create_update_msg(zones=[Question(zname, zclass, RRType.SOA())])
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
|
|
|
- TEST_RRCLASS,
|
|
|
- self._datasrc_client))
|
|
|
+ TEST_RRCLASS, self._datasrc_client))
|
|
|
self.assertEqual(UPDATE_ERROR, session.handle()[0])
|
|
|
self.check_response(session.get_message(), Rcode.NOTAUTH())
|
|
|
|
|
@@ -166,7 +166,7 @@ class SessionACLTest(SesseionTestBase):
|
|
|
|
|
|
'''
|
|
|
# create a separate session, with default (empty) ACL map.
|
|
|
- session = UpdateSession(self._update_msg, self._update_msgdata,
|
|
|
+ session = UpdateSession(self._update_msg,
|
|
|
TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
self._datasrc_client))
|
|
|
# then the request should be rejected.
|
|
@@ -174,16 +174,43 @@ class SessionACLTest(SesseionTestBase):
|
|
|
|
|
|
# recreate the request message, and test with an ACL that would result
|
|
|
# in 'DROP'. get_message() should return None.
|
|
|
- msgdata, msg = create_update_msg()
|
|
|
+ msg = create_update_msg()
|
|
|
acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
REQUEST_LOADER.load([{"action": "DROP", "from":
|
|
|
TEST_CLIENT4[0]}])}
|
|
|
- session = UpdateSession(msg, msgdata, TEST_CLIENT4,
|
|
|
+ session = UpdateSession(msg, TEST_CLIENT4,
|
|
|
ZoneConfig([], TEST_RRCLASS,
|
|
|
self._datasrc_client, acl_map))
|
|
|
self.assertEqual((UPDATE_DROP, None, None), session.handle())
|
|
|
self.assertEqual(None, session.get_message())
|
|
|
|
|
|
+ def test_update_tsigacl_check(self):
|
|
|
+ '''Test for various ACL checks using TSIG.'''
|
|
|
+ # This ACL will accept requests from TEST_CLIENT4 (any port) *and*
|
|
|
+ # has TSIG signed by TEST_ZONE_NAME; all others will be rejected.
|
|
|
+ acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
|
|
|
+ REQUEST_LOADER.load([{"action": "ACCEPT",
|
|
|
+ "from": TEST_CLIENT4[0],
|
|
|
+ "key": TEST_ZONE_NAME.to_text()}])}
|
|
|
+
|
|
|
+ # If the message doesn't contain TSIG, it doesn't match the ACCEPT
|
|
|
+ # ACL entry, and the request should be rejected.
|
|
|
+ session = UpdateSession(self._update_msg,
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client,
|
|
|
+ acl_map))
|
|
|
+ self.assertEqual((UPDATE_ERROR, None, None), session.handle())
|
|
|
+ self.check_response(session.get_message(), Rcode.REFUSED())
|
|
|
+
|
|
|
+ # If the message contains TSIG, it should match the ACCEPT
|
|
|
+ # ACL entry, and the request should be granted.
|
|
|
+ session = UpdateSession(create_update_msg(tsig_key=TEST_TSIG_KEY),
|
|
|
+ TEST_CLIENT4, ZoneConfig([], TEST_RRCLASS,
|
|
|
+ self._datasrc_client,
|
|
|
+ acl_map))
|
|
|
+ self.assertEqual((UPDATE_SUCCESS, TEST_ZONE_NAME, TEST_RRCLASS),
|
|
|
+ session.handle())
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
isc.log.init("bind10")
|
|
|
isc.log.resetUnitTestRootLogger()
|