|
@@ -32,7 +32,7 @@ else:
|
|
def factoryFromFile(message, file):
|
|
def factoryFromFile(message, file):
|
|
data = read_wire_data(file)
|
|
data = read_wire_data(file)
|
|
message.from_wire(data)
|
|
message.from_wire(data)
|
|
- pass
|
|
|
|
|
|
+ return data
|
|
|
|
|
|
# we don't have direct comparison for rrsets right now (should we?
|
|
# we don't have direct comparison for rrsets right now (should we?
|
|
# should go in the cpp version first then), so also no direct list
|
|
# should go in the cpp version first then), so also no direct list
|
|
@@ -45,6 +45,15 @@ def compare_rrset_list(list1, list2):
|
|
return False
|
|
return False
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
+# These are used for TSIG + TC tests
|
|
|
|
+LONG_TXT1 = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcde";
|
|
|
|
+
|
|
|
|
+LONG_TXT2 = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456";
|
|
|
|
+
|
|
|
|
+LONG_TXT3 = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef01";
|
|
|
|
+
|
|
|
|
+LONG_TXT4 = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0";
|
|
|
|
+
|
|
# a complete message taken from cpp tests, for testing towire and totext
|
|
# a complete message taken from cpp tests, for testing towire and totext
|
|
def create_message():
|
|
def create_message():
|
|
message_render = Message(Message.RENDER)
|
|
message_render = Message(Message.RENDER)
|
|
@@ -286,12 +295,18 @@ class MessageTest(unittest.TestCase):
|
|
self.assertRaises(InvalidMessageOperation, self.r.to_wire,
|
|
self.assertRaises(InvalidMessageOperation, self.r.to_wire,
|
|
MessageRenderer())
|
|
MessageRenderer())
|
|
|
|
|
|
- def __common_tsigquery_setup(self, flags=[Message.HEADERFLAG_RD],
|
|
|
|
- rrtype=RRType("A")):
|
|
|
|
|
|
+ def __common_tsigmessage_setup(self, flags=[Message.HEADERFLAG_RD],
|
|
|
|
+ rrtype=RRType("A"), answer_data=None):
|
|
self.r.set_opcode(Opcode.QUERY())
|
|
self.r.set_opcode(Opcode.QUERY())
|
|
self.r.set_rcode(Rcode.NOERROR())
|
|
self.r.set_rcode(Rcode.NOERROR())
|
|
for flag in flags:
|
|
for flag in flags:
|
|
self.r.set_header_flag(flag)
|
|
self.r.set_header_flag(flag)
|
|
|
|
+ if answer_data is not None:
|
|
|
|
+ rrset = RRset(Name("www.example.com"), RRClass("IN"),
|
|
|
|
+ rrtype, RRTTL(86400))
|
|
|
|
+ for rdata in answer_data:
|
|
|
|
+ rrset.add_rdata(Rdata(rrtype, RRClass("IN"), rdata))
|
|
|
|
+ self.r.add_rrset(Message.SECTION_ANSWER, rrset)
|
|
self.r.add_question(Question(Name("www.example.com"),
|
|
self.r.add_question(Question(Name("www.example.com"),
|
|
RRClass("IN"), rrtype))
|
|
RRClass("IN"), rrtype))
|
|
|
|
|
|
@@ -303,18 +318,75 @@ class MessageTest(unittest.TestCase):
|
|
def test_to_wire_with_tsig(self):
|
|
def test_to_wire_with_tsig(self):
|
|
fix_current_time(0x4da8877a)
|
|
fix_current_time(0x4da8877a)
|
|
self.r.set_qid(0x2d65)
|
|
self.r.set_qid(0x2d65)
|
|
- self.__common_tsigquery_setup()
|
|
|
|
|
|
+ self.__common_tsigmessage_setup()
|
|
self.__common_tsig_checks("message_toWire2.wire")
|
|
self.__common_tsig_checks("message_toWire2.wire")
|
|
|
|
|
|
def test_to_wire_with_edns_tsig(self):
|
|
def test_to_wire_with_edns_tsig(self):
|
|
fix_current_time(0x4db60d1f)
|
|
fix_current_time(0x4db60d1f)
|
|
self.r.set_qid(0x6cd)
|
|
self.r.set_qid(0x6cd)
|
|
- self.__common_tsigquery_setup()
|
|
|
|
|
|
+ self.__common_tsigmessage_setup()
|
|
edns = EDNS()
|
|
edns = EDNS()
|
|
edns.set_udp_size(4096)
|
|
edns.set_udp_size(4096)
|
|
self.r.set_edns(edns)
|
|
self.r.set_edns(edns)
|
|
self.__common_tsig_checks("message_toWire3.wire")
|
|
self.__common_tsig_checks("message_toWire3.wire")
|
|
|
|
|
|
|
|
+ def test_to_wire_tsig_truncation(self):
|
|
|
|
+ fix_current_time(0x4e179212)
|
|
|
|
+ data = factoryFromFile(self.p, "message_fromWire17.wire")
|
|
|
|
+ self.assertEqual(TSIGError.NOERROR,
|
|
|
|
+ self.tsig_ctx.verify(self.p.get_tsig_record(), data))
|
|
|
|
+ self.r.set_qid(0x22c2)
|
|
|
|
+ self.__common_tsigmessage_setup([Message.HEADERFLAG_QR,
|
|
|
|
+ Message.HEADERFLAG_AA,
|
|
|
|
+ Message.HEADERFLAG_RD],
|
|
|
|
+ RRType("TXT"),
|
|
|
|
+ [LONG_TXT1, LONG_TXT2])
|
|
|
|
+ self.__common_tsig_checks("message_toWire4.wire")
|
|
|
|
+
|
|
|
|
+ def test_to_wire_tsig_truncation2(self):
|
|
|
|
+ fix_current_time(0x4e179212)
|
|
|
|
+ data = factoryFromFile(self.p, "message_fromWire17.wire")
|
|
|
|
+ self.assertEqual(TSIGError.NOERROR,
|
|
|
|
+ self.tsig_ctx.verify(self.p.get_tsig_record(), data))
|
|
|
|
+ self.r.set_qid(0x22c2)
|
|
|
|
+ self.__common_tsigmessage_setup([Message.HEADERFLAG_QR,
|
|
|
|
+ Message.HEADERFLAG_AA,
|
|
|
|
+ Message.HEADERFLAG_RD],
|
|
|
|
+ RRType("TXT"),
|
|
|
|
+ [LONG_TXT1, LONG_TXT3])
|
|
|
|
+ self.__common_tsig_checks("message_toWire4.wire")
|
|
|
|
+
|
|
|
|
+ def test_to_wire_tsig_no_truncation(self):
|
|
|
|
+ fix_current_time(0x4e17b38d)
|
|
|
|
+ data = factoryFromFile(self.p, "message_fromWire18.wire")
|
|
|
|
+ self.assertEqual(TSIGError.NOERROR,
|
|
|
|
+ self.tsig_ctx.verify(self.p.get_tsig_record(), data))
|
|
|
|
+ self.r.set_qid(0xd6e2)
|
|
|
|
+ self.__common_tsigmessage_setup([Message.HEADERFLAG_QR,
|
|
|
|
+ Message.HEADERFLAG_AA,
|
|
|
|
+ Message.HEADERFLAG_RD],
|
|
|
|
+ RRType("TXT"),
|
|
|
|
+ [LONG_TXT1, LONG_TXT4])
|
|
|
|
+ self.__common_tsig_checks("message_toWire5.wire")
|
|
|
|
+
|
|
|
|
+ def test_to_wire_tsig_length_errors(self):
|
|
|
|
+ renderer = MessageRenderer()
|
|
|
|
+ renderer.set_length_limit(84) # 84 = expected TSIG length - 1
|
|
|
|
+ self.__common_tsigmessage_setup()
|
|
|
|
+ self.assertRaises(TSIGContextError,
|
|
|
|
+ self.r.to_wire, renderer, self.tsig_ctx)
|
|
|
|
+
|
|
|
|
+ renderer.clear()
|
|
|
|
+ self.r.clear(Message.RENDER)
|
|
|
|
+ renderer.set_length_limit(86) # 86 = expected TSIG length + 1
|
|
|
|
+ self.__common_tsigmessage_setup()
|
|
|
|
+ self.assertRaises(TSIGContextError,
|
|
|
|
+ self.r.to_wire, renderer, self.tsig_ctx)
|
|
|
|
+
|
|
|
|
+ # skip the last test of the corresponding C++ test: it requires
|
|
|
|
+ # subclassing MessageRenderer, which is (currently) not possible
|
|
|
|
+ # for python. In any case, it's very unlikely to happen in practice.
|
|
|
|
+
|
|
def test_to_text(self):
|
|
def test_to_text(self):
|
|
message_render = create_message()
|
|
message_render = create_message()
|
|
|
|
|