|
@@ -21,6 +21,251 @@ import unittest
|
|
import os
|
|
import os
|
|
from libdns_python import *
|
|
from libdns_python import *
|
|
|
|
|
|
|
|
+
|
|
|
|
+class MessageFlagTest(unittest.TestCase):
|
|
|
|
+ def test_init(self):
|
|
|
|
+ self.assertRaises(NotImplementedError, MessageFlag)
|
|
|
|
+
|
|
|
|
+ def test_get_bit(self):
|
|
|
|
+ self.assertEqual(0x8000, MessageFlag.QR().get_bit())
|
|
|
|
+ self.assertEqual(0x0400, MessageFlag.AA().get_bit())
|
|
|
|
+ self.assertEqual(0x0200, MessageFlag.TC().get_bit())
|
|
|
|
+ self.assertEqual(0x0100, MessageFlag.RD().get_bit())
|
|
|
|
+ self.assertEqual(0x0080, MessageFlag.RA().get_bit())
|
|
|
|
+ self.assertEqual(0x0020, MessageFlag.AD().get_bit())
|
|
|
|
+ self.assertEqual(0x0010, MessageFlag.CD().get_bit())
|
|
|
|
+
|
|
|
|
+class OpcodeTest(unittest.TestCase):
|
|
|
|
+ def test_init(self):
|
|
|
|
+ self.assertRaises(NotImplementedError, Opcode)
|
|
|
|
+
|
|
|
|
+ def test_get_code(self):
|
|
|
|
+ self.assertEqual(0, Opcode.QUERY().get_code())
|
|
|
|
+ self.assertEqual(1, Opcode.IQUERY().get_code())
|
|
|
|
+ self.assertEqual(2, Opcode.STATUS().get_code())
|
|
|
|
+ self.assertEqual(3, Opcode.RESERVED3().get_code())
|
|
|
|
+ self.assertEqual(4, Opcode.NOTIFY().get_code())
|
|
|
|
+ self.assertEqual(5, Opcode.UPDATE().get_code())
|
|
|
|
+ self.assertEqual(6, Opcode.RESERVED6().get_code())
|
|
|
|
+ self.assertEqual(7, Opcode.RESERVED7().get_code())
|
|
|
|
+ self.assertEqual(8, Opcode.RESERVED8().get_code())
|
|
|
|
+ self.assertEqual(9, Opcode.RESERVED9().get_code())
|
|
|
|
+ self.assertEqual(10, Opcode.RESERVED10().get_code())
|
|
|
|
+ self.assertEqual(11, Opcode.RESERVED11().get_code())
|
|
|
|
+ self.assertEqual(12, Opcode.RESERVED12().get_code())
|
|
|
|
+ self.assertEqual(13, Opcode.RESERVED13().get_code())
|
|
|
|
+ self.assertEqual(14, Opcode.RESERVED14().get_code())
|
|
|
|
+ self.assertEqual(15, Opcode.RESERVED15().get_code())
|
|
|
|
+
|
|
|
|
+ def test_to_text(self):
|
|
|
|
+ self.assertEqual("QUERY", Opcode.QUERY().to_text())
|
|
|
|
+ self.assertEqual("QUERY", Opcode.QUERY().__str__())
|
|
|
|
+ self.assertEqual("IQUERY", Opcode.IQUERY().to_text())
|
|
|
|
+ self.assertEqual("STATUS", Opcode.STATUS().to_text())
|
|
|
|
+ self.assertEqual("RESERVED3", Opcode.RESERVED3().to_text())
|
|
|
|
+ self.assertEqual("NOTIFY", Opcode.NOTIFY().to_text())
|
|
|
|
+ self.assertEqual("UPDATE", Opcode.UPDATE().to_text())
|
|
|
|
+ self.assertEqual("RESERVED6", Opcode.RESERVED6().to_text())
|
|
|
|
+ self.assertEqual("RESERVED7", Opcode.RESERVED7().to_text())
|
|
|
|
+ self.assertEqual("RESERVED8", Opcode.RESERVED8().to_text())
|
|
|
|
+ self.assertEqual("RESERVED9", Opcode.RESERVED9().to_text())
|
|
|
|
+ self.assertEqual("RESERVED10", Opcode.RESERVED10().to_text())
|
|
|
|
+ self.assertEqual("RESERVED11", Opcode.RESERVED11().to_text())
|
|
|
|
+ self.assertEqual("RESERVED12", Opcode.RESERVED12().to_text())
|
|
|
|
+ self.assertEqual("RESERVED13", Opcode.RESERVED13().to_text())
|
|
|
|
+ self.assertEqual("RESERVED14", Opcode.RESERVED14().to_text())
|
|
|
|
+ self.assertEqual("RESERVED15", Opcode.RESERVED15().to_text())
|
|
|
|
+
|
|
|
|
+ def test_richcmp(self):
|
|
|
|
+ o1 = Opcode.QUERY()
|
|
|
|
+ o2 = Opcode.NOTIFY()
|
|
|
|
+ o3 = Opcode.NOTIFY()
|
|
|
|
+ self.assertTrue(o2 == o3)
|
|
|
|
+ self.assertTrue(o1 != o2)
|
|
|
|
+ # can't use assertRaises here...
|
|
|
|
+ try:
|
|
|
|
+ o1 < o2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ o1 <= o2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ o1 > o2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ o1 >= o2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+
|
|
|
|
+class RcodeTest(unittest.TestCase):
|
|
|
|
+ def test_init(self):
|
|
|
|
+ self.assertRaises(TypeError, Rcode, "wrong")
|
|
|
|
+ self.assertRaises(OverflowError, Rcode, 65536)
|
|
|
|
+
|
|
|
|
+ def test_get_code(self):
|
|
|
|
+ self.assertEqual(0, Rcode.NOERROR().get_code())
|
|
|
|
+ self.assertEqual(1, Rcode.FORMERR().get_code())
|
|
|
|
+ self.assertEqual(2, Rcode.SERVFAIL().get_code())
|
|
|
|
+ self.assertEqual(3, Rcode.NXDOMAIN().get_code())
|
|
|
|
+ self.assertEqual(4, Rcode.NOTIMP().get_code())
|
|
|
|
+ self.assertEqual(5, Rcode.REFUSED().get_code())
|
|
|
|
+ self.assertEqual(6, Rcode.YXDOMAIN().get_code())
|
|
|
|
+ self.assertEqual(7, Rcode.YXRRSET().get_code())
|
|
|
|
+ self.assertEqual(8, Rcode.NXRRSET().get_code())
|
|
|
|
+ self.assertEqual(9, Rcode.NOTAUTH().get_code())
|
|
|
|
+ self.assertEqual(10, Rcode.NOTZONE().get_code())
|
|
|
|
+ self.assertEqual(11, Rcode.RESERVED11().get_code())
|
|
|
|
+ self.assertEqual(12, Rcode.RESERVED12().get_code())
|
|
|
|
+ self.assertEqual(13, Rcode.RESERVED13().get_code())
|
|
|
|
+ self.assertEqual(14, Rcode.RESERVED14().get_code())
|
|
|
|
+ self.assertEqual(15, Rcode.RESERVED15().get_code())
|
|
|
|
+
|
|
|
|
+ def test_to_text(self):
|
|
|
|
+ self.assertEqual("NOERROR", Rcode(0).to_text())
|
|
|
|
+ self.assertEqual("NOERROR", Rcode(0).__str__())
|
|
|
|
+ self.assertEqual("FORMERR", Rcode(1).to_text())
|
|
|
|
+ self.assertEqual("SERVFAIL", Rcode(2).to_text())
|
|
|
|
+ self.assertEqual("NXDOMAIN", Rcode(3).to_text())
|
|
|
|
+ self.assertEqual("NOTIMP", Rcode(4).to_text())
|
|
|
|
+ self.assertEqual("REFUSED", Rcode(5).to_text())
|
|
|
|
+ self.assertEqual("YXDOMAIN", Rcode(6).to_text())
|
|
|
|
+ self.assertEqual("YXRRSET", Rcode(7).to_text())
|
|
|
|
+ self.assertEqual("NXRRSET", Rcode(8).to_text())
|
|
|
|
+ self.assertEqual("NOTAUTH", Rcode(9).to_text())
|
|
|
|
+ self.assertEqual("NOTZONE", Rcode(10).to_text())
|
|
|
|
+ self.assertEqual("RESERVED11", Rcode(11).to_text())
|
|
|
|
+ self.assertEqual("RESERVED12", Rcode(12).to_text())
|
|
|
|
+ self.assertEqual("RESERVED13", Rcode(13).to_text())
|
|
|
|
+ self.assertEqual("RESERVED14", Rcode(14).to_text())
|
|
|
|
+ self.assertEqual("RESERVED15", Rcode(15).to_text())
|
|
|
|
+
|
|
|
|
+ def test_richcmp(self):
|
|
|
|
+ r1 = Rcode.NOERROR()
|
|
|
|
+ r2 = Rcode.FORMERR()
|
|
|
|
+ r3 = Rcode.FORMERR()
|
|
|
|
+ self.assertTrue(r2 == r3)
|
|
|
|
+ self.assertTrue(r1 != r2)
|
|
|
|
+ # can't use assertRaises here...
|
|
|
|
+ try:
|
|
|
|
+ r1 < r2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ r1 <= r2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ r1 > r2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ r1 >= r2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+
|
|
|
|
+class SectionTest(unittest.TestCase):
|
|
|
|
+
|
|
|
|
+ def test_init(self):
|
|
|
|
+ self.assertRaises(NotImplementedError, Section)
|
|
|
|
+
|
|
|
|
+ def test_get_code(self):
|
|
|
|
+ self.assertEqual(0, Section.QUESTION().get_code())
|
|
|
|
+ self.assertEqual(1, Section.ANSWER().get_code())
|
|
|
|
+ self.assertEqual(2, Section.AUTHORITY().get_code())
|
|
|
|
+ self.assertEqual(3, Section.ADDITIONAL().get_code())
|
|
|
|
+
|
|
|
|
+ def test_richcmp(self):
|
|
|
|
+ s1 = Section.QUESTION()
|
|
|
|
+ s2 = Section.ANSWER()
|
|
|
|
+ s3 = Section.ANSWER()
|
|
|
|
+ self.assertTrue(s2 == s3)
|
|
|
|
+ self.assertTrue(s1 != s2)
|
|
|
|
+ # can't use assertRaises here...
|
|
|
|
+ try:
|
|
|
|
+ s1 < s2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ s1 <= s2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ s1 > s2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+ try:
|
|
|
|
+ s1 >= s2
|
|
|
|
+ except Exception as err:
|
|
|
|
+ self.assertEqual(TypeError, type(err))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class MessageTest(unittest.TestCase):
|
|
|
|
+
|
|
|
|
+ def setUp(self):
|
|
|
|
+ self.p = Message(PARSE)
|
|
|
|
+ self.r = Message(RENDER)
|
|
|
|
+
|
|
|
|
+ def test_init(self):
|
|
|
|
+ self.assertRaises(TypeError, Message, 3)
|
|
|
|
+ self.assertRaises(TypeError, Message, "wrong")
|
|
|
|
+
|
|
|
|
+ def test_get_header_flag(self):
|
|
|
|
+ self.assertRaises(TypeError, self.p.get_header_flag, "wrong")
|
|
|
|
+ self.assertFalse(self.p.get_header_flag(MessageFlag.AA()))
|
|
|
|
+
|
|
|
|
+ def test_set_header_flag(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.set_header_flag, "wrong")
|
|
|
|
+ self.assertRaises(TypeError, self.r.clear_header_flag, "wrong")
|
|
|
|
+
|
|
|
|
+ self.assertFalse(self.r.get_header_flag(MessageFlag.AA()))
|
|
|
|
+ self.r.set_header_flag(MessageFlag.AA())
|
|
|
|
+ self.assertTrue(self.r.get_header_flag(MessageFlag.AA()))
|
|
|
|
+ self.r.clear_header_flag(MessageFlag.AA())
|
|
|
|
+ self.assertFalse(self.r.get_header_flag(MessageFlag.AA()))
|
|
|
|
+
|
|
|
|
+ def test_set_DNSSEC_supported(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.set_dnssec_supported, "wrong")
|
|
|
|
+
|
|
|
|
+ self.assertFalse(self.r.is_dnssec_supported())
|
|
|
|
+ self.r.set_dnssec_supported(True)
|
|
|
|
+ self.assertTrue(self.r.is_dnssec_supported())
|
|
|
|
+ self.r.set_dnssec_supported(False)
|
|
|
|
+ self.assertFalse(self.r.is_dnssec_supported())
|
|
|
|
+
|
|
|
|
+ def test_set_udp_size(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.set_udp_size, "wrong")
|
|
|
|
+
|
|
|
|
+ def test_set_qid(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.set_qid, "wrong")
|
|
|
|
+
|
|
|
|
+ def test_set_rcode(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.set_rcode, "wrong")
|
|
|
|
+
|
|
|
|
+ def test_set_opcode(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.set_opcode, "wrong")
|
|
|
|
+
|
|
|
|
+ def test_get_section(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.get_section, "wrong")
|
|
|
|
+
|
|
|
|
+ def test_add_rrset(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.add_rrset, "wrong")
|
|
|
|
+
|
|
|
|
+ def test_clear(self):
|
|
|
|
+ self.assertEqual(None, self.r.clear(PARSE))
|
|
|
|
+ self.assertEqual(None, self.r.clear(RENDER))
|
|
|
|
+ self.assertRaises(TypeError, self.r.clear, "wrong")
|
|
|
|
+ self.assertRaises(TypeError, self.r.clear, 3)
|
|
|
|
+
|
|
|
|
+ def test_to_wire(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.to_wire, 1)
|
|
|
|
+
|
|
|
|
+ def test_from_wire(self):
|
|
|
|
+ self.assertRaises(TypeError, self.r.from_wire, 1)
|
|
|
|
+
|
|
|
|
+# helper functions for tests taken from c++ unittests
|
|
if "TESTDATA_PATH" in os.environ:
|
|
if "TESTDATA_PATH" in os.environ:
|
|
testdata_path = os.environ["TESTDATA_PATH"]
|
|
testdata_path = os.environ["TESTDATA_PATH"]
|
|
else:
|
|
else:
|
|
@@ -44,8 +289,9 @@ def factoryFromFile(message, file):
|
|
message.from_wire(data)
|
|
message.from_wire(data)
|
|
pass
|
|
pass
|
|
|
|
|
|
-class MessageTest(unittest.TestCase):
|
|
|
|
|
|
+class ConvertedUnittests(unittest.TestCase):
|
|
|
|
|
|
|
|
+ # tests below based on c++ unit tests
|
|
def test_RcodeConstruct(self):
|
|
def test_RcodeConstruct(self):
|
|
# normal cases
|
|
# normal cases
|
|
self.assertEqual(0, Rcode(0).get_code())
|
|
self.assertEqual(0, Rcode(0).get_code())
|
|
@@ -220,7 +466,7 @@ class MessageTest(unittest.TestCase):
|
|
message_parse,
|
|
message_parse,
|
|
"message_fromWire9")
|
|
"message_fromWire9")
|
|
|
|
|
|
- def test_toWire(self):
|
|
|
|
|
|
+ def test_to_text_and_wire(self):
|
|
message_render = Message(RENDER)
|
|
message_render = Message(RENDER)
|
|
message_render.set_qid(0x1035)
|
|
message_render.set_qid(0x1035)
|
|
message_render.set_opcode(Opcode.QUERY())
|
|
message_render.set_opcode(Opcode.QUERY())
|
|
@@ -228,25 +474,35 @@ class MessageTest(unittest.TestCase):
|
|
message_render.set_header_flag(MessageFlag.QR())
|
|
message_render.set_header_flag(MessageFlag.QR())
|
|
message_render.set_header_flag(MessageFlag.RD())
|
|
message_render.set_header_flag(MessageFlag.RD())
|
|
message_render.set_header_flag(MessageFlag.AA())
|
|
message_render.set_header_flag(MessageFlag.AA())
|
|
- #message_render.addQuestion(Question(Name("test.example.com"), RRClass.IN(),
|
|
|
|
- #RRType.A()))
|
|
|
|
|
|
+ message_render.add_question(Question(Name("test.example.com"), RRClass("IN"), RRType("A")))
|
|
rrset = RRset(Name("test.example.com"), RRClass("IN"),
|
|
rrset = RRset(Name("test.example.com"), RRClass("IN"),
|
|
RRType("A"), RRTTL(3600))
|
|
RRType("A"), RRTTL(3600))
|
|
- #rrset.add_rdata(in.A("192.0.2.1"))
|
|
|
|
- #rrset.addRdata(in.A("192.0.2.2"))
|
|
|
|
- #message_render.addRRset(Section.ANSWER(), rrset)
|
|
|
|
|
|
+ rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.1"))
|
|
|
|
+ rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.2"))
|
|
|
|
+ message_render.add_rrset(Section.ANSWER(), rrset)
|
|
|
|
|
|
- #self.assertEqual(1, message_render.get_rr_count(Section.QUESTION()))
|
|
|
|
- #self.assertEqual(2, message_render.get_rr_count(Section.ANSWER()))
|
|
|
|
|
|
+ self.assertEqual(1, message_render.get_rr_count(Section.QUESTION()))
|
|
|
|
+ self.assertEqual(2, message_render.get_rr_count(Section.ANSWER()))
|
|
self.assertEqual(0, message_render.get_rr_count(Section.AUTHORITY()))
|
|
self.assertEqual(0, message_render.get_rr_count(Section.AUTHORITY()))
|
|
self.assertEqual(0, message_render.get_rr_count(Section.ADDITIONAL()))
|
|
self.assertEqual(0, message_render.get_rr_count(Section.ADDITIONAL()))
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
renderer = MessageRenderer()
|
|
message_render.to_wire(renderer)
|
|
message_render.to_wire(renderer)
|
|
- #vector<unsigned char> data;
|
|
|
|
- #UnitTestUtil.readWireData("testdata/message_toWire1", data)
|
|
|
|
- #EXPECT_PRED_FORMAT4(UnitTestUtil.matchWireData, obuffer.getData(),
|
|
|
|
- #obuffer.getLength(), &data[0], data.size())
|
|
|
|
|
|
+ self.assertEqual(b'\x105\x85\x00\x00\x01\x00\x02\x00\x00\x00\x00\x04test\x07example\x03com\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\x00\x02\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\x00\x02\x02',
|
|
|
|
+ renderer.get_data())
|
|
|
|
+ msg_str =\
|
|
|
|
+""";; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 4149
|
|
|
|
+;; flags: qr aa rd ; QUESTION: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 0
|
|
|
|
+
|
|
|
|
+;; QUESTION SECTION:
|
|
|
|
+;test.example.com. IN A
|
|
|
|
+
|
|
|
|
+;; ANSWER SECTION:
|
|
|
|
+test.example.com. 3600 IN A 192.0.2.1
|
|
|
|
+test.example.com. 3600 IN A 192.0.2.2
|
|
|
|
+"""
|
|
|
|
+ self.assertEqual(msg_str, message_render.to_text())
|
|
|
|
+ self.assertEqual(msg_str, message_render.__str__())
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
unittest.main()
|