|
@@ -109,7 +109,7 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
msg.add_question(query_question)
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
|
- tsig_ctx = MockTSIGContext(TSIG_KEY)#self.create_mock_tsig_ctx(TSIGError.NOERROR)MockTSIGContext(TSIG_KEY)
|
|
|
+ tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
msg.to_wire(renderer, tsig_ctx)
|
|
|
reply_data = renderer.get_data()
|
|
|
return reply_data
|
|
@@ -219,11 +219,16 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
|
|
|
def test_send_message_with_last_soa(self):
|
|
|
rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
-
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, 0)
|
|
|
+
|
|
|
+ # packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
+ packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
+ 0, packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
+ # tsig context is not exist
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_QUESTION), 1)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
@@ -237,12 +242,39 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
rdata = answer.get_rdata()
|
|
|
self.assertEqual(rdata[0].to_text(), self.soa_record[7])
|
|
|
|
|
|
+ # msg is the TSIG_SIGN_EVERY_NTH one
|
|
|
+ # sending the message with last soa together
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
+ 0, TSIG_SIGN_EVERY_NTH)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ # tsig context is not exist
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
+
|
|
|
def test_send_message_with_last_soa_with_tsig(self):
|
|
|
+ # create tsig context
|
|
|
self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|
|
|
+
|
|
|
rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, 0)
|
|
|
+
|
|
|
+ # packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
+ packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
+ # msg is not the TSIG_SIGN_EVERY_NTH one
|
|
|
+ # sending the message with last soa together
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
+ 0, packet_neet_not_sign)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+
|
|
|
+ self.assertEqual(get_msg.get_rr_count(Message.SECTION_QUESTION), 1)
|
|
|
+ self.assertEqual(get_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
|
+ self.assertEqual(get_msg.get_rr_count(Message.SECTION_AUTHORITY), 0)
|
|
|
+
|
|
|
+ # msg is the TSIG_SIGN_EVERY_NTH one
|
|
|
+ # sending the message with last soa together
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
+ 0, TSIG_SIGN_EVERY_NTH)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
|
|
@@ -253,15 +285,21 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
-
|
|
|
msg.add_rrset(Message.SECTION_ANSWER, rrset_a)
|
|
|
- # give the function a value that is larger than MAX-len(rrset)
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, 65520)
|
|
|
|
|
|
+ # length larger than MAX-len(rrset)
|
|
|
+ length_need_split = xfrout.XFROUT_MAX_MESSAGE_SIZE - get_rrset_len(rrset_soa) + 1
|
|
|
+ # packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
+ packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
+
|
|
|
+ # give the function a value that is larger than MAX-len(rrset)
|
|
|
# this should have triggered the sending of two messages
|
|
|
# (1 with the rrset we added manually, and 1 that triggered
|
|
|
# the sending in _with_last_soa)
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_QUESTION), 1)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_AUTHORITY), 0)
|
|
@@ -274,6 +312,7 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(rdata[0].to_text(), "192.0.2.1")
|
|
|
|
|
|
get_msg = self.sock.read_msg()
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_QUESTION), 0)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_AUTHORITY), 0)
|
|
@@ -295,12 +334,36 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, 65520)
|
|
|
+
|
|
|
+ # length larger than MAX-len(rrset)
|
|
|
+ length_need_split = xfrout.XFROUT_MAX_MESSAGE_SIZE - get_rrset_len(rrset_soa) + 1
|
|
|
+ # packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
+ packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
+
|
|
|
+ # give the function a value that is larger than MAX-len(rrset)
|
|
|
+ # this should have triggered the sending of two messages
|
|
|
+ # (1 with the rrset we added manually, and 1 that triggered
|
|
|
+ # the sending in _with_last_soa)
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
- # the last packet
|
|
|
+ # msg is not the TSIG_SIGN_EVERY_NTH one, it shouldn't be tsig signed
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
+ # the last packet should be tsig signed
|
|
|
get_msg = self.sock.read_msg()
|
|
|
self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+ # and it should not have sent anything else
|
|
|
+ self.assertEqual(0, len(self.sock.sendqueue))
|
|
|
|
|
|
+
|
|
|
+ # msg is the TSIG_SIGN_EVERY_NTH one, it should be tsig signed
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ xfrout.TSIG_SIGN_EVERY_NTH)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+ # the last packet should be tsig signed
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
# and it should not have sent anything else
|
|
|
self.assertEqual(0, len(self.sock.sendqueue))
|
|
|
|