|
@@ -141,6 +141,29 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(rcode.to_text(), "NOERROR")
|
|
|
self.assertTrue(self.xfrsess._tsig_ctx is not None)
|
|
|
|
|
|
+ # ACL checks, put some ACL inside
|
|
|
+ self.xfrsess._acl = isc.acl.dns.REQUEST_LOADER.load([
|
|
|
+ {
|
|
|
+ "from": "127.0.0.1",
|
|
|
+ "action": "ACCEPT"
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "from": "192.0.2.1",
|
|
|
+ "action": "DROP"
|
|
|
+ }
|
|
|
+ ])
|
|
|
+ # Localhost (the default in this test) is accepted
|
|
|
+ rcode, msg = self.xfrsess._parse_query_message(self.mdata)
|
|
|
+ self.assertEqual(rcode.to_text(), "NOERROR")
|
|
|
+ # This should be dropped completely, therefore returning None
|
|
|
+ self.xfrsess._remote = ('192.0.2.1', 12345)
|
|
|
+ rcode, msg = self.xfrsess._parse_query_message(self.mdata)
|
|
|
+ self.assertTrue(rcode is None)
|
|
|
+ # This should be rejected, therefore NOTAUTH
|
|
|
+ self.xfrsess._remote = ('192.0.2.2', 12345)
|
|
|
+ rcode, msg = self.xfrsess._parse_query_message(self.mdata)
|
|
|
+ self.assertEqual(rcode.to_text(), "REFUSED")
|
|
|
+
|
|
|
def test_get_query_zone_name(self):
|
|
|
msg = self.getmsg()
|
|
|
self.assertEqual(self.xfrsess._get_query_zone_name(msg), "example.com.")
|