|
@@ -172,6 +172,55 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(rcode.to_text(), "NOTAUTH")
|
|
|
self.assertTrue(self.xfrsess._tsig_ctx is not None)
|
|
|
|
|
|
+ # ACL using TSIG: successful case
|
|
|
+ self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
|
|
|
+ {"key": "example.com", "action": "ACCEPT"}, {"action": "REJECT"}
|
|
|
+ ])
|
|
|
+ self.xfrsess._tsig_key_ring.add(TSIG_KEY)
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(request_data)
|
|
|
+ self.assertEqual(rcode.to_text(), "NOERROR")
|
|
|
+
|
|
|
+ # ACL using TSIG: key name doesn't match; should be rejected
|
|
|
+ self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
|
|
|
+ {"key": "example.org", "action": "ACCEPT"}, {"action": "REJECT"}
|
|
|
+ ])
|
|
|
+ self.xfrsess._tsig_key_ring.add(TSIG_KEY)
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(request_data)
|
|
|
+ self.assertEqual(rcode.to_text(), "REFUSED")
|
|
|
+
|
|
|
+ # ACL using TSIG: no TSIG; should be rejected
|
|
|
+ self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
|
|
|
+ {"key": "example.org", "action": "ACCEPT"}, {"action": "REJECT"}
|
|
|
+ ])
|
|
|
+ self.xfrsess._tsig_key_ring.add(TSIG_KEY)
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(self.mdata)
|
|
|
+ self.assertEqual(rcode.to_text(), "REFUSED")
|
|
|
+
|
|
|
+ #
|
|
|
+ # ACL using IP + TSIG: both should match
|
|
|
+ #
|
|
|
+ self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
|
|
|
+ {"ALL": [{"key": "example.com"}, {"from": "192.0.2.1"}],
|
|
|
+ "action": "ACCEPT"},
|
|
|
+ {"action": "REJECT"}
|
|
|
+ ])
|
|
|
+ # both matches
|
|
|
+ self.xfrsess._remote = ('192.0.2.1', 12345)
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(request_data)
|
|
|
+ self.assertEqual(rcode.to_text(), "NOERROR")
|
|
|
+ # TSIG matches, but address doesn't
|
|
|
+ self.xfrsess._remote = ('192.0.2.2', 12345)
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(request_data)
|
|
|
+ self.assertEqual(rcode.to_text(), "REFUSED")
|
|
|
+ # Address matches, but TSIG doesn't (not included)
|
|
|
+ self.xfrsess._remote = ('192.0.2.1', 12345)
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(self.mdata)
|
|
|
+ self.assertEqual(rcode.to_text(), "REFUSED")
|
|
|
+ # Neither address nor TSIG matches
|
|
|
+ self.xfrsess._remote = ('192.0.2.2', 12345)
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(self.mdata)
|
|
|
+ self.assertEqual(rcode.to_text(), "REFUSED")
|
|
|
+
|
|
|
def test_get_query_zone_name(self):
|
|
|
msg = self.getmsg()
|
|
|
self.assertEqual(self.xfrsess._get_query_zone_name(msg), "example.com.")
|