|
@@ -132,7 +132,12 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
[{"action": "ACCEPT"}]),
|
|
|
{})
|
|
|
self.mdata = self.create_request_data(False)
|
|
|
- self.soa_record = (4, 3, 'example.com.', 'com.example.', 3600, 'SOA', None, 'master.example.com. admin.example.com. 1234 3600 1800 2419200 7200')
|
|
|
+ self.soa_rrset = RRset(Name('example.com'), RRClass.IN(), RRType.SOA(),
|
|
|
+ RRTTL(3600))
|
|
|
+ self.soa_rrset.add_rdata(Rdata(RRType.SOA(), RRClass.IN(),
|
|
|
+ 'master.Example.com. ' +
|
|
|
+ 'admin.exAmple.com. ' +
|
|
|
+ '1234 3600 1800 2419200 7200'))
|
|
|
|
|
|
def test_parse_query_message(self):
|
|
|
[get_rcode, get_msg] = self.xfrsess._parse_query_message(self.mdata)
|
|
@@ -313,10 +318,13 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
def test_send_message(self):
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
- # soa record data with different cases
|
|
|
- soa_record = (4, 3, 'Example.com.', 'com.Example.', 3600, 'SOA', None, 'master.Example.com. admin.exAmple.com. 1234 3600 1800 2419200 7200')
|
|
|
- rrset_soa = self.xfrsess._create_rrset_from_db_record(soa_record)
|
|
|
- msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
|
|
|
+ # SOA record data with different cases
|
|
|
+ soa_rrset = RRset(Name('Example.com.'), RRClass.IN(), RRType.SOA(),
|
|
|
+ RRTTL(3600))
|
|
|
+ soa_rrset.add_rdata(Rdata(RRType.SOA(), RRClass.IN(),
|
|
|
+ 'master.Example.com. admin.exAmple.com. ' +
|
|
|
+ '1234 3600 1800 2419200 7200'))
|
|
|
+ msg.add_rrset(Message.SECTION_ANSWER, soa_rrset)
|
|
|
self.xfrsess._send_message(self.sock, msg)
|
|
|
send_out_data = self.sock.readsent()[2:]
|
|
|
|
|
@@ -345,23 +353,15 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(msg.get_rcode(), rcode)
|
|
|
self.assertTrue(msg.get_header_flag(Message.HEADERFLAG_AA))
|
|
|
|
|
|
- def test_create_rrset_from_db_record(self):
|
|
|
- rrset = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
- self.assertEqual(rrset.get_name().to_text(), "example.com.")
|
|
|
- self.assertEqual(rrset.get_class(), RRClass("IN"))
|
|
|
- self.assertEqual(rrset.get_type().to_text(), "SOA")
|
|
|
- rdata = rrset.get_rdata()
|
|
|
- self.assertEqual(rdata[0].to_text(), self.soa_record[7])
|
|
|
-
|
|
|
def test_send_message_with_last_soa(self):
|
|
|
- rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
|
|
|
# packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
- 0, packet_neet_not_sign)
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock,
|
|
|
+ self.soa_rrset, 0,
|
|
|
+ packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
# tsig context is not exist
|
|
|
self.assertFalse(self.message_has_tsig(get_msg))
|
|
@@ -376,12 +376,13 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(answer.get_class(), RRClass("IN"))
|
|
|
self.assertEqual(answer.get_type().to_text(), "SOA")
|
|
|
rdata = answer.get_rdata()
|
|
|
- self.assertEqual(rdata[0].to_text(), self.soa_record[7])
|
|
|
+ self.assertEqual(rdata[0], self.soa_rrset.get_rdata()[0])
|
|
|
|
|
|
# msg is the TSIG_SIGN_EVERY_NTH one
|
|
|
# sending the message with last soa together
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
- 0, TSIG_SIGN_EVERY_NTH)
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock,
|
|
|
+ self.soa_rrset, 0,
|
|
|
+ TSIG_SIGN_EVERY_NTH)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
# tsig context is not exist
|
|
|
self.assertFalse(self.message_has_tsig(get_msg))
|
|
@@ -390,7 +391,6 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
# create tsig context
|
|
|
self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|
|
|
|
|
|
- rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
|
|
@@ -398,8 +398,9 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
# msg is not the TSIG_SIGN_EVERY_NTH one
|
|
|
# sending the message with last soa together
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
- 0, packet_neet_not_sign)
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock,
|
|
|
+ self.soa_rrset, 0,
|
|
|
+ packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
|
|
@@ -409,22 +410,23 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
|
|
|
# msg is the TSIG_SIGN_EVERY_NTH one
|
|
|
# sending the message with last soa together
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
- 0, TSIG_SIGN_EVERY_NTH)
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock,
|
|
|
+ self.soa_rrset, 0,
|
|
|
+ TSIG_SIGN_EVERY_NTH)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
|
|
|
def test_trigger_send_message_with_last_soa(self):
|
|
|
rrset_a = RRset(Name("example.com"), RRClass.IN(), RRType.A(), RRTTL(3600))
|
|
|
rrset_a.add_rdata(Rdata(RRType.A(), RRClass.IN(), "192.0.2.1"))
|
|
|
- rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
msg.add_rrset(Message.SECTION_ANSWER, rrset_a)
|
|
|
|
|
|
# length larger than MAX-len(rrset)
|
|
|
- length_need_split = xfrout.XFROUT_MAX_MESSAGE_SIZE - get_rrset_len(rrset_soa) + 1
|
|
|
+ length_need_split = xfrout.XFROUT_MAX_MESSAGE_SIZE - \
|
|
|
+ get_rrset_len(self.soa_rrset) + 1
|
|
|
# packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
|
|
@@ -432,7 +434,9 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
# this should have triggered the sending of two messages
|
|
|
# (1 with the rrset we added manually, and 1 that triggered
|
|
|
# the sending in _with_last_soa)
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock,
|
|
|
+ self.soa_rrset,
|
|
|
+ length_need_split,
|
|
|
packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
self.assertFalse(self.message_has_tsig(get_msg))
|
|
@@ -459,20 +463,20 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(answer.get_class(), RRClass("IN"))
|
|
|
self.assertEqual(answer.get_type().to_text(), "SOA")
|
|
|
rdata = answer.get_rdata()
|
|
|
- self.assertEqual(rdata[0].to_text(), self.soa_record[7])
|
|
|
+ self.assertEqual(rdata[0], self.soa_rrset.get_rdata()[0])
|
|
|
|
|
|
# and it should not have sent anything else
|
|
|
self.assertEqual(0, len(self.sock.sendqueue))
|
|
|
|
|
|
def test_trigger_send_message_with_last_soa_with_tsig(self):
|
|
|
self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|
|
|
- rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
- msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
|
|
|
+ msg.add_rrset(Message.SECTION_ANSWER, self.soa_rrset)
|
|
|
|
|
|
# length larger than MAX-len(rrset)
|
|
|
- length_need_split = xfrout.XFROUT_MAX_MESSAGE_SIZE - get_rrset_len(rrset_soa) + 1
|
|
|
+ length_need_split = xfrout.XFROUT_MAX_MESSAGE_SIZE - \
|
|
|
+ get_rrset_len(self.soa_rrset) + 1
|
|
|
# packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
|
|
@@ -480,7 +484,9 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
# this should have triggered the sending of two messages
|
|
|
# (1 with the rrset we added manually, and 1 that triggered
|
|
|
# the sending in _with_last_soa)
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock,
|
|
|
+ self.soa_rrset,
|
|
|
+ length_need_split,
|
|
|
packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
# msg is not the TSIG_SIGN_EVERY_NTH one, it shouldn't be tsig signed
|
|
@@ -493,7 +499,9 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
|
|
|
|
|
|
# msg is the TSIG_SIGN_EVERY_NTH one, it should be tsig signed
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock,
|
|
|
+ self.soa_rrset,
|
|
|
+ length_need_split,
|
|
|
xfrout.TSIG_SIGN_EVERY_NTH)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
self.assertTrue(self.message_has_tsig(get_msg))
|
|
@@ -504,8 +512,7 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(0, len(self.sock.sendqueue))
|
|
|
|
|
|
def test_get_rrset_len(self):
|
|
|
- rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
- self.assertEqual(82, get_rrset_len(rrset_soa))
|
|
|
+ self.assertEqual(82, get_rrset_len(self.soa_rrset))
|
|
|
|
|
|
def test_check_xfrout_available(self):
|
|
|
class MockDataSrcClient:
|
|
@@ -569,14 +576,13 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
|
|
|
def test_reply_xfrout_query_noerror(self):
|
|
|
global sqlite3_ds
|
|
|
- def get_zone_soa(zonename, file):
|
|
|
- return self.soa_record
|
|
|
-
|
|
|
def get_zone_datas(zone, file):
|
|
|
- return [self.soa_record]
|
|
|
+ return [(4, 3, 'example.com.', 'com.example.', 3600, 'SOA', None,
|
|
|
+ 'master.example.com. admin.example.com. ' +
|
|
|
+ '1234 3600 1800 2419200 7200')]
|
|
|
|
|
|
- sqlite3_ds.get_zone_soa = get_zone_soa
|
|
|
sqlite3_ds.get_zone_datas = get_zone_datas
|
|
|
+ self.xfrsess._soa = self.soa_rrset
|
|
|
self.xfrsess._reply_xfrout_query(self.getmsg(), self.sock, "example.com.")
|
|
|
reply_msg = self.sock.read_msg()
|
|
|
self.assertEqual(reply_msg.get_rr_count(Message.SECTION_ANSWER), 2)
|
|
@@ -585,8 +591,6 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
rrset_data = (4, 3, 'a.example.com.', 'com.example.', 3600, 'A', None, '192.168.1.1')
|
|
|
global sqlite3_ds
|
|
|
global xfrout
|
|
|
- def get_zone_soa(zonename, file):
|
|
|
- return self.soa_record
|
|
|
|
|
|
def get_zone_datas(zone, file):
|
|
|
zone_rrsets = []
|
|
@@ -597,8 +601,8 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
def get_rrset_len(rrset):
|
|
|
return 65520
|
|
|
|
|
|
- sqlite3_ds.get_zone_soa = get_zone_soa
|
|
|
sqlite3_ds.get_zone_datas = get_zone_datas
|
|
|
+ self.xfrsess._soa = self.soa_rrset
|
|
|
xfrout.get_rrset_len = get_rrset_len
|
|
|
|
|
|
self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|