|
@@ -1,4 +1,4 @@
|
|
|
-# Copyright (C) 2009 Internet Systems Consortium.
|
|
|
+# Copyright (C) 2010 Internet Systems Consortium.
|
|
|
#
|
|
|
# Permission to use, copy, modify, and distribute this software for any
|
|
|
# purpose with or without fee is hereby granted, provided that the above
|
|
@@ -14,7 +14,7 @@
|
|
|
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
|
|
#
|
|
|
-# Tests for the rrtype part of the libdns_python module
|
|
|
+# Tests for the message part of the libdns_python module
|
|
|
#
|
|
|
|
|
|
import unittest
|
|
@@ -59,7 +59,7 @@ class OpcodeTest(unittest.TestCase):
|
|
|
|
|
|
def test_to_text(self):
|
|
|
self.assertEqual("QUERY", Opcode.QUERY().to_text())
|
|
|
- self.assertEqual("QUERY", Opcode.QUERY().__str__())
|
|
|
+ self.assertEqual("QUERY", str(Opcode.QUERY()))
|
|
|
self.assertEqual("IQUERY", Opcode.IQUERY().to_text())
|
|
|
self.assertEqual("STATUS", Opcode.STATUS().to_text())
|
|
|
self.assertEqual("RESERVED3", Opcode.RESERVED3().to_text())
|
|
@@ -81,24 +81,29 @@ class OpcodeTest(unittest.TestCase):
|
|
|
o2 = Opcode.NOTIFY()
|
|
|
o3 = Opcode.NOTIFY()
|
|
|
self.assertTrue(o2 == o3)
|
|
|
+ self.assertFalse(o2 != o3)
|
|
|
self.assertTrue(o1 != o2)
|
|
|
self.assertFalse(o1 == 1)
|
|
|
self.assertFalse(o1 == o2)
|
|
|
# can't use assertRaises here...
|
|
|
try:
|
|
|
o1 < o2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
o1 <= o2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
o1 > o2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
o1 >= o2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
|
|
@@ -106,6 +111,14 @@ class RcodeTest(unittest.TestCase):
|
|
|
def test_init(self):
|
|
|
self.assertRaises(TypeError, Rcode, "wrong")
|
|
|
self.assertRaises(OverflowError, Rcode, 65536)
|
|
|
+ self.assertEqual(Rcode(0).get_code(), 0)
|
|
|
+
|
|
|
+ self.assertEqual(0, Rcode(0).get_code())
|
|
|
+ self.assertEqual(0xfff, Rcode(0xfff).get_code()) # possible max code
|
|
|
+
|
|
|
+ # should fail on attempt of construction with an out of range code
|
|
|
+ self.assertRaises(OverflowError, Rcode, 0x1000)
|
|
|
+ self.assertRaises(OverflowError, Rcode, 0xffff)
|
|
|
|
|
|
def test_get_code(self):
|
|
|
self.assertEqual(0, Rcode.NOERROR().get_code())
|
|
@@ -124,6 +137,7 @@ class RcodeTest(unittest.TestCase):
|
|
|
self.assertEqual(13, Rcode.RESERVED13().get_code())
|
|
|
self.assertEqual(14, Rcode.RESERVED14().get_code())
|
|
|
self.assertEqual(15, Rcode.RESERVED15().get_code())
|
|
|
+ self.assertEqual(16, Rcode.BADVERS().get_code())
|
|
|
|
|
|
def test_to_text(self):
|
|
|
self.assertEqual("NOERROR", Rcode(0).to_text())
|
|
@@ -143,7 +157,11 @@ class RcodeTest(unittest.TestCase):
|
|
|
self.assertEqual("RESERVED13", Rcode(13).to_text())
|
|
|
self.assertEqual("RESERVED14", Rcode(14).to_text())
|
|
|
self.assertEqual("RESERVED15", Rcode(15).to_text())
|
|
|
+ self.assertEqual("BADVERS", Rcode(16).to_text())
|
|
|
|
|
|
+ self.assertEqual("17", Rcode(Rcode.BADVERS().get_code() + 1).to_text())
|
|
|
+ self.assertEqual("4095", Rcode(0xfff).to_text())
|
|
|
+
|
|
|
def test_richcmp(self):
|
|
|
r1 = Rcode.NOERROR()
|
|
|
r2 = Rcode.FORMERR()
|
|
@@ -155,18 +173,22 @@ class RcodeTest(unittest.TestCase):
|
|
|
# can't use assertRaises here...
|
|
|
try:
|
|
|
r1 < r2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
r1 <= r2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
r1 > r2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
r1 >= r2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
|
|
@@ -192,22 +214,79 @@ class SectionTest(unittest.TestCase):
|
|
|
# can't use assertRaises here...
|
|
|
try:
|
|
|
s1 < s2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
s1 <= s2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
s1 > s2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
try:
|
|
|
s1 >= s2
|
|
|
+ self.fail("operation that should have raised an error unexpectedly succeeded")
|
|
|
except Exception as err:
|
|
|
self.assertEqual(TypeError, type(err))
|
|
|
|
|
|
|
|
|
+# helper functions for tests taken from c++ unittests
|
|
|
+if "TESTDATA_PATH" in os.environ:
|
|
|
+ testdata_path = os.environ["TESTDATA_PATH"]
|
|
|
+else:
|
|
|
+ testdata_path = "../tests/testdata"
|
|
|
+
|
|
|
+def read_wire_data(filename):
|
|
|
+ data = bytes()
|
|
|
+ file = open(testdata_path + os.sep + filename, "r")
|
|
|
+ for line in file:
|
|
|
+ line = line.strip()
|
|
|
+ if line == "" or line.startswith("#"):
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ cur_data = bytes.fromhex(line)
|
|
|
+ data += cur_data
|
|
|
+
|
|
|
+ return data
|
|
|
+
|
|
|
+def factoryFromFile(message, file):
|
|
|
+ data = read_wire_data(file)
|
|
|
+ message.from_wire(data)
|
|
|
+ pass
|
|
|
+
|
|
|
+# we don't have direct comparison for rrsets right now (should we?
|
|
|
+# should go in the cpp version first then), so also no direct list
|
|
|
+# comparison. Created a helper function
|
|
|
+def compare_rrset_list(list1, list2):
|
|
|
+ if len(list1) != len(list2):
|
|
|
+ return False
|
|
|
+ for i in range(0, len(list1)):
|
|
|
+ if str(list1[i]) != str(list2[i]):
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
+# a complete message taken from cpp tests, for testing towire and totext
|
|
|
+def create_message():
|
|
|
+ message_render = Message(Message.RENDER)
|
|
|
+ message_render.set_qid(0x1035)
|
|
|
+ message_render.set_opcode(Opcode.QUERY())
|
|
|
+ message_render.set_rcode(Rcode.NOERROR())
|
|
|
+ message_render.set_header_flag(MessageFlag.QR())
|
|
|
+ message_render.set_header_flag(MessageFlag.RD())
|
|
|
+ message_render.set_header_flag(MessageFlag.AA())
|
|
|
+ message_render.add_question(Question(Name("test.example.com"), RRClass("IN"), RRType("A")))
|
|
|
+ rrset = RRset(Name("test.example.com"), RRClass("IN"),
|
|
|
+ RRType("A"), RRTTL(3600))
|
|
|
+ rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.1"))
|
|
|
+ rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.2"))
|
|
|
+ message_render.add_rrset(Section.ANSWER(), rrset)
|
|
|
+ return message_render
|
|
|
+
|
|
|
+
|
|
|
class MessageTest(unittest.TestCase):
|
|
|
|
|
|
def setUp(self):
|
|
@@ -222,7 +301,7 @@ class MessageTest(unittest.TestCase):
|
|
|
self.assertRaises(TypeError, self.p.get_header_flag, "wrong")
|
|
|
self.assertFalse(self.p.get_header_flag(MessageFlag.AA()))
|
|
|
|
|
|
- def test_set_header_flag(self):
|
|
|
+ def test_set_clear_header_flag(self):
|
|
|
self.assertRaises(TypeError, self.r.set_header_flag, "wrong")
|
|
|
self.assertRaises(TypeError, self.r.clear_header_flag, "wrong")
|
|
|
|
|
@@ -246,31 +325,93 @@ class MessageTest(unittest.TestCase):
|
|
|
self.r.set_dnssec_supported(False)
|
|
|
self.assertFalse(self.r.is_dnssec_supported())
|
|
|
|
|
|
+ self.assertRaises(InvalidMessageOperation,
|
|
|
+ self.p.set_dnssec_supported, True)
|
|
|
+ self.assertRaises(InvalidMessageOperation,
|
|
|
+ self.p.set_dnssec_supported, False)
|
|
|
+
|
|
|
def test_set_udp_size(self):
|
|
|
self.assertRaises(TypeError, self.r.set_udp_size, "wrong")
|
|
|
+ self.assertRaises(InvalidMessageUDPSize, self.r.set_udp_size, 0)
|
|
|
+ self.assertRaises(InvalidMessageUDPSize, self.r.set_udp_size, 65536)
|
|
|
+ self.assertRaises(InvalidMessageOperation, self.p.set_udp_size, 1024)
|
|
|
+ self.r.set_udp_size(2048)
|
|
|
+ self.assertEqual(2048, self.r.get_udp_size())
|
|
|
|
|
|
def test_set_qid(self):
|
|
|
self.assertRaises(TypeError, self.r.set_qid, "wrong")
|
|
|
self.assertRaises(InvalidMessageOperation,
|
|
|
self.p.set_qid, 123)
|
|
|
+ self.r.set_qid(1234)
|
|
|
+ self.assertEqual(1234, self.r.get_qid())
|
|
|
|
|
|
def test_set_rcode(self):
|
|
|
self.assertRaises(TypeError, self.r.set_rcode, "wrong")
|
|
|
|
|
|
+ rcode = Rcode.BADVERS()
|
|
|
+ self.r.set_rcode(rcode)
|
|
|
+ self.assertEqual(rcode, self.r.get_rcode())
|
|
|
+
|
|
|
+ self.assertRaises(InvalidMessageOperation,
|
|
|
+ self.p.set_rcode, rcode)
|
|
|
+
|
|
|
+
|
|
|
def test_set_opcode(self):
|
|
|
self.assertRaises(TypeError, self.r.set_opcode, "wrong")
|
|
|
|
|
|
+ opcode = Opcode.IQUERY()
|
|
|
+ self.r.set_opcode(opcode)
|
|
|
+ self.assertEqual(opcode, self.r.get_opcode())
|
|
|
+
|
|
|
+ self.assertRaises(InvalidMessageOperation,
|
|
|
+ self.p.set_opcode, opcode)
|
|
|
+
|
|
|
def test_get_section(self):
|
|
|
self.assertRaises(TypeError, self.r.get_section, "wrong")
|
|
|
|
|
|
+ rrset = RRset(Name("example.com"), RRClass("IN"), RRType("A"), RRTTL(3600))
|
|
|
+ rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.1"))
|
|
|
+ rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.2"))
|
|
|
+ section_rrset = [rrset]
|
|
|
+
|
|
|
+ self.assertRaises(InvalidMessageOperation, self.p.add_rrset,
|
|
|
+ Section.ANSWER(), rrset)
|
|
|
+
|
|
|
+ self.assertFalse(compare_rrset_list(section_rrset, self.r.get_section(Section.ANSWER())))
|
|
|
+ self.assertEqual(0, self.r.get_rr_count(Section.ANSWER()))
|
|
|
+ self.r.add_rrset(Section.ANSWER(), rrset)
|
|
|
+ self.assertTrue(compare_rrset_list(section_rrset, self.r.get_section(Section.ANSWER())))
|
|
|
+ self.assertEqual(2, self.r.get_rr_count(Section.ANSWER()))
|
|
|
+
|
|
|
+ self.assertFalse(compare_rrset_list(section_rrset, self.r.get_section(Section.AUTHORITY())))
|
|
|
+ self.assertEqual(0, self.r.get_rr_count(Section.AUTHORITY()))
|
|
|
+ self.r.add_rrset(Section.AUTHORITY(), rrset)
|
|
|
+ self.assertTrue(compare_rrset_list(section_rrset, self.r.get_section(Section.AUTHORITY())))
|
|
|
+ self.assertEqual(2, self.r.get_rr_count(Section.AUTHORITY()))
|
|
|
+
|
|
|
+ self.assertFalse(compare_rrset_list(section_rrset, self.r.get_section(Section.ADDITIONAL())))
|
|
|
+ self.assertEqual(0, self.r.get_rr_count(Section.ADDITIONAL()))
|
|
|
+ self.r.add_rrset(Section.ADDITIONAL(), rrset)
|
|
|
+ self.assertTrue(compare_rrset_list(section_rrset, self.r.get_section(Section.ADDITIONAL())))
|
|
|
+ self.assertEqual(2, self.r.get_rr_count(Section.ADDITIONAL()))
|
|
|
+
|
|
|
def test_get_rr_count(self):
|
|
|
self.assertRaises(TypeError, self.r.get_rr_count, "wrong")
|
|
|
+ # counts also tested in add_section
|
|
|
|
|
|
def test_add_question(self):
|
|
|
self.assertRaises(TypeError, self.r.add_question, "wrong", "wrong")
|
|
|
+ q = Question(Name("example.com"), RRClass("IN"), RRType("A"))
|
|
|
+ qs = [q]
|
|
|
+ self.assertFalse(compare_rrset_list(qs, self.r.get_question()))
|
|
|
+ self.assertEqual(0, self.r.get_rr_count(Section.QUESTION()))
|
|
|
+ self.r.add_question(q)
|
|
|
+ self.assertTrue(compare_rrset_list(qs, self.r.get_question()))
|
|
|
+ self.assertEqual(1, self.r.get_rr_count(Section.QUESTION()))
|
|
|
|
|
|
def test_add_rrset(self):
|
|
|
self.assertRaises(TypeError, self.r.add_rrset, "wrong")
|
|
|
+ # actual addition already tested in get_section
|
|
|
|
|
|
def test_clear(self):
|
|
|
self.assertEqual(None, self.r.clear(Message.PARSE))
|
|
@@ -283,6 +424,29 @@ class MessageTest(unittest.TestCase):
|
|
|
self.assertRaises(InvalidMessageOperation,
|
|
|
self.p.to_wire, MessageRenderer())
|
|
|
|
|
|
+ message_render = create_message()
|
|
|
+ renderer = MessageRenderer()
|
|
|
+ message_render.to_wire(renderer)
|
|
|
+ self.assertEqual(b'\x105\x85\x00\x00\x01\x00\x02\x00\x00\x00\x00\x04test\x07example\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\x00\x02\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\x00\x02\x02',
|
|
|
+ renderer.get_data())
|
|
|
+
|
|
|
+ def test_to_text(self):
|
|
|
+ message_render = create_message()
|
|
|
+
|
|
|
+ msg_str =\
|
|
|
+""";; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 4149
|
|
|
+;; flags: qr aa rd ; QUESTION: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 0
|
|
|
+
|
|
|
+;; QUESTION SECTION:
|
|
|
+;test.example.com. IN A
|
|
|
+
|
|
|
+;; ANSWER SECTION:
|
|
|
+test.example.com. 3600 IN A 192.0.2.1
|
|
|
+test.example.com. 3600 IN A 192.0.2.2
|
|
|
+"""
|
|
|
+ self.assertEqual(msg_str, message_render.to_text())
|
|
|
+ self.assertEqual(msg_str, str(message_render))
|
|
|
+
|
|
|
def test_from_wire(self):
|
|
|
self.assertRaises(TypeError, self.r.from_wire, 1)
|
|
|
self.assertRaises(InvalidMessageOperation,
|
|
@@ -290,50 +454,6 @@ class MessageTest(unittest.TestCase):
|
|
|
self.assertRaises(MessageTooShort,
|
|
|
Message.from_wire, self.p, bytes())
|
|
|
|
|
|
-# helper functions for tests taken from c++ unittests
|
|
|
-if "TESTDATA_PATH" in os.environ:
|
|
|
- testdata_path = os.environ["TESTDATA_PATH"]
|
|
|
-else:
|
|
|
- testdata_path = "../tests/testdata"
|
|
|
-
|
|
|
-def read_wire_data(filename):
|
|
|
- data = bytes()
|
|
|
- file = open(testdata_path + os.sep + filename, "r")
|
|
|
- for line in file:
|
|
|
- line = line.strip()
|
|
|
- if line == "" or line.startswith("#"):
|
|
|
- pass
|
|
|
- else:
|
|
|
- cur_data = bytes.fromhex(line)
|
|
|
- data += cur_data
|
|
|
-
|
|
|
- return data
|
|
|
-
|
|
|
-def factoryFromFile(message, file):
|
|
|
- data = read_wire_data(file)
|
|
|
- message.from_wire(data)
|
|
|
- pass
|
|
|
-
|
|
|
-class ConvertedUnittests(unittest.TestCase):
|
|
|
-
|
|
|
- # tests below based on c++ unit tests
|
|
|
- def test_RcodeConstruct(self):
|
|
|
- # normal cases
|
|
|
- self.assertEqual(0, Rcode(0).get_code())
|
|
|
- self.assertEqual(0xfff, Rcode(0xfff).get_code()) # possible max code
|
|
|
-
|
|
|
- # should fail on attempt of construction with an out of range code
|
|
|
- self.assertRaises(OverflowError, Rcode, 0x1000)
|
|
|
- self.assertRaises(OverflowError, Rcode, 0xffff)
|
|
|
-
|
|
|
- def test_RcodeToText(self):
|
|
|
- self.assertEqual("NOERROR", Rcode.NOERROR().to_text())
|
|
|
- self.assertEqual("BADVERS", Rcode.BADVERS().to_text())
|
|
|
- self.assertEqual("17", Rcode(Rcode.BADVERS().get_code() + 1).to_text())
|
|
|
- self.assertEqual("4095", Rcode(0xfff).to_text())
|
|
|
-
|
|
|
-
|
|
|
- def test_fromWire(self):
|
|
|
test_name = Name("test.example.com");
|
|
|
|
|
|
message_parse = Message(0)
|
|
@@ -366,7 +486,7 @@ class ConvertedUnittests(unittest.TestCase):
|
|
|
self.assertEqual("192.0.2.1", rdata[0].to_text())
|
|
|
self.assertEqual("192.0.2.2", rdata[1].to_text())
|
|
|
self.assertEqual(2, len(rdata))
|
|
|
-
|
|
|
+
|
|
|
def test_GetEDNS0DOBit(self):
|
|
|
message_parse = Message(Message.PARSE)
|
|
|
## Without EDNS0, DNSSEC is considered to be unsupported.
|
|
@@ -491,43 +611,6 @@ class ConvertedUnittests(unittest.TestCase):
|
|
|
message_parse,
|
|
|
"message_fromWire9")
|
|
|
|
|
|
- def test_to_text_and_wire(self):
|
|
|
- message_render = Message(Message.RENDER)
|
|
|
- message_render.set_qid(0x1035)
|
|
|
- message_render.set_opcode(Opcode.QUERY())
|
|
|
- message_render.set_rcode(Rcode.NOERROR())
|
|
|
- message_render.set_header_flag(MessageFlag.QR())
|
|
|
- message_render.set_header_flag(MessageFlag.RD())
|
|
|
- message_render.set_header_flag(MessageFlag.AA())
|
|
|
- message_render.add_question(Question(Name("test.example.com"), RRClass("IN"), RRType("A")))
|
|
|
- rrset = RRset(Name("test.example.com"), RRClass("IN"),
|
|
|
- RRType("A"), RRTTL(3600))
|
|
|
- rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.1"))
|
|
|
- rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.2"))
|
|
|
- message_render.add_rrset(Section.ANSWER(), rrset)
|
|
|
-
|
|
|
- self.assertEqual(1, message_render.get_rr_count(Section.QUESTION()))
|
|
|
- self.assertEqual(2, message_render.get_rr_count(Section.ANSWER()))
|
|
|
- self.assertEqual(0, message_render.get_rr_count(Section.AUTHORITY()))
|
|
|
- self.assertEqual(0, message_render.get_rr_count(Section.ADDITIONAL()))
|
|
|
-
|
|
|
- renderer = MessageRenderer()
|
|
|
- message_render.to_wire(renderer)
|
|
|
- self.assertEqual(b'\x105\x85\x00\x00\x01\x00\x02\x00\x00\x00\x00\x04test\x07example\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\x00\x02\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\x00\x02\x02',
|
|
|
- renderer.get_data())
|
|
|
- msg_str =\
|
|
|
-""";; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 4149
|
|
|
-;; flags: qr aa rd ; QUESTION: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 0
|
|
|
-
|
|
|
-;; QUESTION SECTION:
|
|
|
-;test.example.com. IN A
|
|
|
-
|
|
|
-;; ANSWER SECTION:
|
|
|
-test.example.com. 3600 IN A 192.0.2.1
|
|
|
-test.example.com. 3600 IN A 192.0.2.2
|
|
|
-"""
|
|
|
- self.assertEqual(msg_str, message_render.to_text())
|
|
|
- self.assertEqual(msg_str, message_render.__str__())
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
unittest.main()
|