|
@@ -18,11 +18,14 @@
|
|
|
|
|
|
import unittest
|
|
|
import os
|
|
|
+from isc.testutils.tsigctx_mock import MockTSIGContext
|
|
|
from isc.cc.session import *
|
|
|
from pydnspp import *
|
|
|
from xfrout import *
|
|
|
import xfrout
|
|
|
|
|
|
+TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
+
|
|
|
# our fake socket, where we can read and insert messages
|
|
|
class MySocket():
|
|
|
def __init__(self, family, type):
|
|
@@ -85,10 +88,36 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
msg.from_wire(self.mdata)
|
|
|
return msg
|
|
|
|
|
|
+ def create_mock_tsig_ctx(self, error):
|
|
|
+ # This helper function creates a MockTSIGContext for a given key
|
|
|
+ # and TSIG error to be used as a result of verify (normally faked
|
|
|
+ # one)
|
|
|
+ mock_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ mock_ctx.error = error
|
|
|
+ return mock_ctx
|
|
|
+
|
|
|
+ def message_has_tsig(self, msg):
|
|
|
+ return msg.get_tsig_record() is not None
|
|
|
+
|
|
|
+ def create_request_data_with_tsig(self):
|
|
|
+ msg = Message(Message.RENDER)
|
|
|
+ query_id = 0x1035
|
|
|
+ msg.set_qid(query_id)
|
|
|
+ msg.set_opcode(Opcode.QUERY())
|
|
|
+ msg.set_rcode(Rcode.NOERROR())
|
|
|
+ query_question = Question(Name("example.com."), RRClass.IN(), RRType.AXFR())
|
|
|
+ msg.add_question(query_question)
|
|
|
+
|
|
|
+ renderer = MessageRenderer()
|
|
|
+ tsig_ctx = MockTSIGContext(TSIG_KEY)
|
|
|
+ msg.to_wire(renderer, tsig_ctx)
|
|
|
+ reply_data = renderer.get_data()
|
|
|
+ return reply_data
|
|
|
+
|
|
|
def setUp(self):
|
|
|
self.sock = MySocket(socket.AF_INET,socket.SOCK_STREAM)
|
|
|
self.log = isc.log.NSLogger('xfrout', '', severity = 'critical', log_to_console = False )
|
|
|
- self.xfrsess = MyXfroutSession(self.sock, None, Dbserver(), self.log)
|
|
|
+ self.xfrsess = MyXfroutSession(self.sock, None, Dbserver(), self.log, TSIGKeyRing())
|
|
|
self.mdata = bytes(b'\xd6=\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
|
|
|
self.soa_record = (4, 3, 'example.com.', 'com.example.', 3600, 'SOA', None, 'master.example.com. admin.example.com. 1234 3600 1800 2419200 7200')
|
|
|
|
|
@@ -96,6 +125,18 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
[get_rcode, get_msg] = self.xfrsess._parse_query_message(self.mdata)
|
|
|
self.assertEqual(get_rcode.to_text(), "NOERROR")
|
|
|
|
|
|
+ # tsig signed query message
|
|
|
+ request_data = self.create_request_data_with_tsig()
|
|
|
+ # BADKEY
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(request_data)
|
|
|
+ self.assertEqual(rcode.to_text(), "NOTAUTH")
|
|
|
+ self.assertTrue(self.xfrsess._tsig_ctx is not None)
|
|
|
+ # NOERROR
|
|
|
+ self.xfrsess._tsig_key_ring.add(TSIG_KEY)
|
|
|
+ [rcode, msg] = self.xfrsess._parse_query_message(request_data)
|
|
|
+ self.assertEqual(rcode.to_text(), "NOERROR")
|
|
|
+ self.assertTrue(self.xfrsess._tsig_ctx is not None)
|
|
|
+
|
|
|
def test_get_query_zone_name(self):
|
|
|
msg = self.getmsg()
|
|
|
self.assertEqual(self.xfrsess._get_query_zone_name(msg), "example.com.")
|
|
@@ -111,6 +152,14 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
get_msg = self.sock.read_msg()
|
|
|
self.assertEqual(get_msg.get_rcode().to_text(), "NXDOMAIN")
|
|
|
|
|
|
+ # tsig signed message
|
|
|
+ msg = self.getmsg()
|
|
|
+ self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|
|
|
+ self.xfrsess._reply_query_with_error_rcode(msg, self.sock, Rcode(3))
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertEqual(get_msg.get_rcode().to_text(), "NXDOMAIN")
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+
|
|
|
def test_send_message(self):
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
@@ -152,6 +201,14 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
get_msg = self.sock.read_msg()
|
|
|
self.assertEqual(get_msg.get_rcode().to_text(), "FORMERR")
|
|
|
|
|
|
+ # tsig signed message
|
|
|
+ msg = self.getmsg()
|
|
|
+ self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|
|
|
+ self.xfrsess._reply_query_with_format_error(msg, self.sock)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertEqual(get_msg.get_rcode().to_text(), "FORMERR")
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+
|
|
|
def test_create_rrset_from_db_record(self):
|
|
|
rrset = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
self.assertEqual(rrset.get_name().to_text(), "example.com.")
|
|
@@ -162,11 +219,16 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
|
|
|
def test_send_message_with_last_soa(self):
|
|
|
rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
-
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, 0)
|
|
|
+
|
|
|
+ # packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
+ packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
+ 0, packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
+ # tsig context is not exist
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_QUESTION), 1)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
@@ -180,6 +242,42 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
rdata = answer.get_rdata()
|
|
|
self.assertEqual(rdata[0].to_text(), self.soa_record[7])
|
|
|
|
|
|
+ # msg is the TSIG_SIGN_EVERY_NTH one
|
|
|
+ # sending the message with last soa together
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
+ 0, TSIG_SIGN_EVERY_NTH)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ # tsig context is not exist
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
+
|
|
|
+ def test_send_message_with_last_soa_with_tsig(self):
|
|
|
+ # create tsig context
|
|
|
+ self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|
|
|
+
|
|
|
+ rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
+ msg = self.getmsg()
|
|
|
+ msg.make_response()
|
|
|
+
|
|
|
+ # packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
+ packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
+ # msg is not the TSIG_SIGN_EVERY_NTH one
|
|
|
+ # sending the message with last soa together
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
+ 0, packet_neet_not_sign)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+
|
|
|
+ self.assertEqual(get_msg.get_rr_count(Message.SECTION_QUESTION), 1)
|
|
|
+ self.assertEqual(get_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
|
+ self.assertEqual(get_msg.get_rr_count(Message.SECTION_AUTHORITY), 0)
|
|
|
+
|
|
|
+ # msg is the TSIG_SIGN_EVERY_NTH one
|
|
|
+ # sending the message with last soa together
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa,
|
|
|
+ 0, TSIG_SIGN_EVERY_NTH)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+
|
|
|
def test_trigger_send_message_with_last_soa(self):
|
|
|
rrset_a = RRset(Name("example.com"), RRClass.IN(), RRType.A(), RRTTL(3600))
|
|
|
rrset_a.add_rdata(Rdata(RRType.A(), RRClass.IN(), "192.0.2.1"))
|
|
@@ -187,15 +285,21 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
|
|
|
msg = self.getmsg()
|
|
|
msg.make_response()
|
|
|
-
|
|
|
msg.add_rrset(Message.SECTION_ANSWER, rrset_a)
|
|
|
- # give the function a value that is larger than MAX-len(rrset)
|
|
|
- self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, 65520)
|
|
|
|
|
|
+ # length larger than MAX-len(rrset)
|
|
|
+ length_need_split = xfrout.XFROUT_MAX_MESSAGE_SIZE - get_rrset_len(rrset_soa) + 1
|
|
|
+ # packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
+ packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
+
|
|
|
+ # give the function a value that is larger than MAX-len(rrset)
|
|
|
# this should have triggered the sending of two messages
|
|
|
# (1 with the rrset we added manually, and 1 that triggered
|
|
|
# the sending in _with_last_soa)
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ packet_neet_not_sign)
|
|
|
get_msg = self.sock.read_msg()
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_QUESTION), 1)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_AUTHORITY), 0)
|
|
@@ -208,6 +312,7 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
self.assertEqual(rdata[0].to_text(), "192.0.2.1")
|
|
|
|
|
|
get_msg = self.sock.read_msg()
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_QUESTION), 0)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
|
self.assertEqual(get_msg.get_rr_count(Message.SECTION_AUTHORITY), 0)
|
|
@@ -223,6 +328,45 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
# and it should not have sent anything else
|
|
|
self.assertEqual(0, len(self.sock.sendqueue))
|
|
|
|
|
|
+ def test_trigger_send_message_with_last_soa_with_tsig(self):
|
|
|
+ self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|
|
|
+ rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
+ msg = self.getmsg()
|
|
|
+ msg.make_response()
|
|
|
+ msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
|
|
|
+
|
|
|
+ # length larger than MAX-len(rrset)
|
|
|
+ length_need_split = xfrout.XFROUT_MAX_MESSAGE_SIZE - get_rrset_len(rrset_soa) + 1
|
|
|
+ # packet number less than TSIG_SIGN_EVERY_NTH
|
|
|
+ packet_neet_not_sign = xfrout.TSIG_SIGN_EVERY_NTH - 1
|
|
|
+
|
|
|
+ # give the function a value that is larger than MAX-len(rrset)
|
|
|
+ # this should have triggered the sending of two messages
|
|
|
+ # (1 with the rrset we added manually, and 1 that triggered
|
|
|
+ # the sending in _with_last_soa)
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ packet_neet_not_sign)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ # msg is not the TSIG_SIGN_EVERY_NTH one, it shouldn't be tsig signed
|
|
|
+ self.assertFalse(self.message_has_tsig(get_msg))
|
|
|
+ # the last packet should be tsig signed
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+ # and it should not have sent anything else
|
|
|
+ self.assertEqual(0, len(self.sock.sendqueue))
|
|
|
+
|
|
|
+
|
|
|
+ # msg is the TSIG_SIGN_EVERY_NTH one, it should be tsig signed
|
|
|
+ self.xfrsess._send_message_with_last_soa(msg, self.sock, rrset_soa, length_need_split,
|
|
|
+ xfrout.TSIG_SIGN_EVERY_NTH)
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+ # the last packet should be tsig signed
|
|
|
+ get_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(get_msg))
|
|
|
+ # and it should not have sent anything else
|
|
|
+ self.assertEqual(0, len(self.sock.sendqueue))
|
|
|
+
|
|
|
def test_get_rrset_len(self):
|
|
|
rrset_soa = self.xfrsess._create_rrset_from_db_record(self.soa_record)
|
|
|
self.assertEqual(82, get_rrset_len(rrset_soa))
|
|
@@ -313,6 +457,51 @@ class TestXfroutSession(unittest.TestCase):
|
|
|
reply_msg = self.sock.read_msg()
|
|
|
self.assertEqual(reply_msg.get_rr_count(Message.SECTION_ANSWER), 2)
|
|
|
|
|
|
+ def test_reply_xfrout_query_noerror_with_tsig(self):
|
|
|
+ rrset_data = (4, 3, 'a.example.com.', 'com.example.', 3600, 'A', None, '192.168.1.1')
|
|
|
+ global sqlite3_ds
|
|
|
+ global xfrout
|
|
|
+ def get_zone_soa(zonename, file):
|
|
|
+ return self.soa_record
|
|
|
+
|
|
|
+ def get_zone_datas(zone, file):
|
|
|
+ zone_rrsets = []
|
|
|
+ for i in range(0, 100):
|
|
|
+ zone_rrsets.insert(i, rrset_data)
|
|
|
+ return zone_rrsets
|
|
|
+
|
|
|
+ def get_rrset_len(rrset):
|
|
|
+ return 65520
|
|
|
+
|
|
|
+ sqlite3_ds.get_zone_soa = get_zone_soa
|
|
|
+ sqlite3_ds.get_zone_datas = get_zone_datas
|
|
|
+ xfrout.get_rrset_len = get_rrset_len
|
|
|
+
|
|
|
+ self.xfrsess._tsig_ctx = self.create_mock_tsig_ctx(TSIGError.NOERROR)
|
|
|
+ self.xfrsess._reply_xfrout_query(self.getmsg(), self.sock, "example.com.")
|
|
|
+
|
|
|
+ # tsig signed first package
|
|
|
+ reply_msg = self.sock.read_msg()
|
|
|
+ self.assertEqual(reply_msg.get_rr_count(Message.SECTION_ANSWER), 1)
|
|
|
+ self.assertTrue(self.message_has_tsig(reply_msg))
|
|
|
+ # (TSIG_SIGN_EVERY_NTH - 1) packets have no tsig
|
|
|
+ for i in range(0, xfrout.TSIG_SIGN_EVERY_NTH - 1):
|
|
|
+ reply_msg = self.sock.read_msg()
|
|
|
+ self.assertFalse(self.message_has_tsig(reply_msg))
|
|
|
+ # TSIG_SIGN_EVERY_NTH packet has tsig
|
|
|
+ reply_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(reply_msg))
|
|
|
+
|
|
|
+ for i in range(0, 100 - TSIG_SIGN_EVERY_NTH):
|
|
|
+ reply_msg = self.sock.read_msg()
|
|
|
+ self.assertFalse(self.message_has_tsig(reply_msg))
|
|
|
+ # tsig signed last package
|
|
|
+ reply_msg = self.sock.read_msg()
|
|
|
+ self.assertTrue(self.message_has_tsig(reply_msg))
|
|
|
+
|
|
|
+ # and it should not have sent anything else
|
|
|
+ self.assertEqual(0, len(self.sock.sendqueue))
|
|
|
+
|
|
|
class MyCCSession():
|
|
|
def __init__(self):
|
|
|
pass
|
|
@@ -347,8 +536,23 @@ class TestUnixSockServer(unittest.TestCase):
|
|
|
self.assertEqual(recv_msg, send_msg)
|
|
|
|
|
|
def test_updata_config_data(self):
|
|
|
+ tsig_key_str = 'example.com:SFuWd/q99SzF8Yzd1QbB9g=='
|
|
|
+ tsig_key_list = [tsig_key_str]
|
|
|
+ bad_key_list = ['bad..example.com:SFuWd/q99SzF8Yzd1QbB9g==']
|
|
|
self.unix.update_config_data({'transfers_out':10 })
|
|
|
self.assertEqual(self.unix._max_transfers_out, 10)
|
|
|
+ self.assertTrue(self.unix.tsig_key_ring is not None)
|
|
|
+
|
|
|
+ self.unix.update_config_data({'transfers_out':9, 'tsig_key_ring':tsig_key_list})
|
|
|
+ self.assertEqual(self.unix._max_transfers_out, 9)
|
|
|
+ self.assertEqual(self.unix.tsig_key_ring.size(), 1)
|
|
|
+ self.unix.tsig_key_ring.remove(Name("example.com."))
|
|
|
+ self.assertEqual(self.unix.tsig_key_ring.size(), 0)
|
|
|
+
|
|
|
+ # bad tsig key
|
|
|
+ config_data = {'transfers_out':9, 'tsig_key_ring': bad_key_list}
|
|
|
+ self.assertRaises(None, self.unix.update_config_data(config_data))
|
|
|
+ self.assertEqual(self.unix.tsig_key_ring.size(), 0)
|
|
|
|
|
|
def test_get_db_file(self):
|
|
|
self.assertEqual(self.unix.get_db_file(), "initdb.file")
|