|
@@ -104,15 +104,15 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
def message_has_tsig(self, msg):
|
|
|
return msg.get_tsig_record() is not None
|
|
|
|
|
|
- def create_request_data(self, with_tsig=False):
|
|
|
+ def create_request_data(self, with_question=True, with_tsig=False):
|
|
|
msg = Message(Message.RENDER)
|
|
|
query_id = 0x1035
|
|
|
msg.set_qid(query_id)
|
|
|
msg.set_opcode(Opcode.QUERY())
|
|
|
msg.set_rcode(Rcode.NOERROR())
|
|
|
- query_question = Question(Name("example.com"), RRClass.IN(),
|
|
|
- RRType.AXFR())
|
|
|
- msg.add_question(query_question)
|
|
|
+ if with_question:
|
|
|
+ msg.add_question(Question(Name("example.com"), RRClass.IN(),
|
|
|
+ RRType.AXFR()))
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
if with_tsig:
|
|
@@ -131,7 +131,7 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
isc.acl.dns.REQUEST_LOADER.load(
|
|
|
[{"action": "ACCEPT"}]),
|
|
|
{})
|
|
|
- self.mdata = self.create_request_data(False)
|
|
|
+ self.mdata = self.create_request_data()
|
|
|
self.soa_rrset = RRset(Name('example.com'), RRClass.IN(), RRType.SOA(),
|
|
|
RRTTL(3600))
|
|
|
self.soa_rrset.add_rdata(Rdata(RRType.SOA(), RRClass.IN(),
|
|
@@ -143,8 +143,13 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
[get_rcode, get_msg] = self.xfrsess._parse_query_message(self.mdata)
|
|
|
self.assertEqual(get_rcode.to_text(), "NOERROR")
|
|
|
|
|
|
+ # Broken request: no question
|
|
|
+ request_data = self.create_request_data(with_question=False)
|
|
|
+ rcode, msg = self.xfrsess._parse_query_message(request_data)
|
|
|
+ self.assertEqual(Rcode.FORMERR(), rcode)
|
|
|
+
|
|
|
# tsig signed query message
|
|
|
- request_data = self.create_request_data(True)
|
|
|
+ request_data = self.create_request_data(with_tsig=True)
|
|
|
# BADKEY
|
|
|
[rcode, msg] = self.xfrsess._parse_query_message(request_data)
|
|
|
self.assertEqual(rcode.to_text(), "NOTAUTH")
|
|
@@ -181,7 +186,7 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(rcode.to_text(), "REFUSED")
|
|
|
|
|
|
# TSIG signed request
|
|
|
- request_data = self.create_request_data(True)
|
|
|
+ request_data = self.create_request_data(with_tsig=True)
|
|
|
|
|
|
# If the TSIG check fails, it should not check ACL
|
|
|
# (If it checked ACL as well, it would just drop the request)
|