|
@@ -277,6 +277,7 @@ class MockXfrinConnection(XfrinConnection):
|
|
rcode=Rcode.NOERROR(),
|
|
rcode=Rcode.NOERROR(),
|
|
questions=default_questions,
|
|
questions=default_questions,
|
|
answers=default_answers,
|
|
answers=default_answers,
|
|
|
|
+ authorities=[],
|
|
tsig_ctx=None):
|
|
tsig_ctx=None):
|
|
resp = Message(Message.RENDER)
|
|
resp = Message(Message.RENDER)
|
|
qid = self.qid
|
|
qid = self.qid
|
|
@@ -291,6 +292,7 @@ class MockXfrinConnection(XfrinConnection):
|
|
resp.set_header_flag(Message.HEADERFLAG_AA)
|
|
resp.set_header_flag(Message.HEADERFLAG_AA)
|
|
[resp.add_question(q) for q in questions]
|
|
[resp.add_question(q) for q in questions]
|
|
[resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
|
|
[resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
|
|
|
|
+ [resp.add_rrset(Message.SECTION_AUTHORITY, a) for a in authorities]
|
|
|
|
|
|
renderer = MessageRenderer()
|
|
renderer = MessageRenderer()
|
|
if tsig_ctx is not None:
|
|
if tsig_ctx is not None:
|
|
@@ -603,6 +605,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
'auth': True,
|
|
'auth': True,
|
|
'rcode': Rcode.NOERROR(),
|
|
'rcode': Rcode.NOERROR(),
|
|
'answers': default_answers,
|
|
'answers': default_answers,
|
|
|
|
+ 'authorities': [],
|
|
'tsig': False,
|
|
'tsig': False,
|
|
'axfr_after_soa': self._create_normal_response_data
|
|
'axfr_after_soa': self._create_normal_response_data
|
|
}
|
|
}
|
|
@@ -661,8 +664,9 @@ class TestXfrinConnection(unittest.TestCase):
|
|
response=self.soa_response_params['response'],
|
|
response=self.soa_response_params['response'],
|
|
auth=self.soa_response_params['auth'],
|
|
auth=self.soa_response_params['auth'],
|
|
rcode=self.soa_response_params['rcode'],
|
|
rcode=self.soa_response_params['rcode'],
|
|
- answers=self.soa_response_params['answers'],
|
|
|
|
questions=self.soa_response_params['questions'],
|
|
questions=self.soa_response_params['questions'],
|
|
|
|
+ answers=self.soa_response_params['answers'],
|
|
|
|
+ authorities=self.soa_response_params['authorities'],
|
|
tsig_ctx=verify_ctx)
|
|
tsig_ctx=verify_ctx)
|
|
if self.soa_response_params['axfr_after_soa'] != None:
|
|
if self.soa_response_params['axfr_after_soa'] != None:
|
|
self.conn.response_generator = \
|
|
self.conn.response_generator = \
|
|
@@ -846,8 +850,10 @@ class TestAXFR(TestXfrinConnection):
|
|
self.conn._tsig_key = TSIG_KEY
|
|
self.conn._tsig_key = TSIG_KEY
|
|
# server tsig check fail, return with RCODE 9 (NOTAUTH)
|
|
# server tsig check fail, return with RCODE 9 (NOTAUTH)
|
|
self.conn._send_query(RRType.SOA())
|
|
self.conn._send_query(RRType.SOA())
|
|
- self.conn.reply_data = self.conn.create_response_data(rcode=Rcode.NOTAUTH())
|
|
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_responses)
|
|
|
|
|
|
+ self.conn.reply_data = \
|
|
|
|
+ self.conn.create_response_data(rcode=Rcode.NOTAUTH())
|
|
|
|
+ self.assertRaises(XfrinProtocolError,
|
|
|
|
+ self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_without_end_soa(self):
|
|
def test_response_without_end_soa(self):
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn._send_query(RRType.AXFR())
|
|
@@ -860,7 +866,8 @@ class TestAXFR(TestXfrinConnection):
|
|
def test_response_bad_qid(self):
|
|
def test_response_bad_qid(self):
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn.reply_data = self.conn.create_response_data(bad_qid=True)
|
|
self.conn.reply_data = self.conn.create_response_data(bad_qid=True)
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_responses)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError,
|
|
|
|
+ self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_error_code_bad_sig(self):
|
|
def test_response_error_code_bad_sig(self):
|
|
self.conn._tsig_key = TSIG_KEY
|
|
self.conn._tsig_key = TSIG_KEY
|
|
@@ -871,7 +878,7 @@ class TestAXFR(TestXfrinConnection):
|
|
rcode=Rcode.SERVFAIL())
|
|
rcode=Rcode.SERVFAIL())
|
|
# xfrin should check TSIG before other part of incoming message
|
|
# xfrin should check TSIG before other part of incoming message
|
|
# validate log message for XfrinException
|
|
# validate log message for XfrinException
|
|
- self.__match_exception(XfrinException,
|
|
|
|
|
|
+ self.__match_exception(XfrinProtocolError,
|
|
"TSIG verify fail: BADSIG",
|
|
"TSIG verify fail: BADSIG",
|
|
self.conn._handle_xfrin_responses)
|
|
self.conn._handle_xfrin_responses)
|
|
|
|
|
|
@@ -883,7 +890,7 @@ class TestAXFR(TestXfrinConnection):
|
|
self.conn.reply_data = self.conn.create_response_data(bad_qid=True)
|
|
self.conn.reply_data = self.conn.create_response_data(bad_qid=True)
|
|
# xfrin should check TSIG before other part of incoming message
|
|
# xfrin should check TSIG before other part of incoming message
|
|
# validate log message for XfrinException
|
|
# validate log message for XfrinException
|
|
- self.__match_exception(XfrinException,
|
|
|
|
|
|
+ self.__match_exception(XfrinProtocolError,
|
|
"TSIG verify fail: BADKEY",
|
|
"TSIG verify fail: BADKEY",
|
|
self.conn._handle_xfrin_responses)
|
|
self.conn._handle_xfrin_responses)
|
|
|
|
|
|
@@ -896,18 +903,21 @@ class TestAXFR(TestXfrinConnection):
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
rcode=Rcode.SERVFAIL())
|
|
rcode=Rcode.SERVFAIL())
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_responses)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError,
|
|
|
|
+ self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_multi_question(self):
|
|
def test_response_multi_question(self):
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
questions=[example_axfr_question, example_axfr_question])
|
|
questions=[example_axfr_question, example_axfr_question])
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_responses)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError,
|
|
|
|
+ self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_non_response(self):
|
|
def test_response_non_response(self):
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn._send_query(RRType.AXFR())
|
|
self.conn.reply_data = self.conn.create_response_data(response = False)
|
|
self.conn.reply_data = self.conn.create_response_data(response = False)
|
|
- self.assertRaises(XfrinException, self.conn._handle_xfrin_responses)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError,
|
|
|
|
+ self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_soacheck(self):
|
|
def test_soacheck(self):
|
|
# we need to defer the creation until we know the QID, which is
|
|
# we need to defer the creation until we know the QID, which is
|
|
@@ -922,7 +932,7 @@ class TestAXFR(TestXfrinConnection):
|
|
def test_soacheck_badqid(self):
|
|
def test_soacheck_badqid(self):
|
|
self.soa_response_params['bad_qid'] = True
|
|
self.soa_response_params['bad_qid'] = True
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
- self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_bad_qid_bad_sig(self):
|
|
def test_soacheck_bad_qid_bad_sig(self):
|
|
self.conn._tsig_key = TSIG_KEY
|
|
self.conn._tsig_key = TSIG_KEY
|
|
@@ -932,19 +942,19 @@ class TestAXFR(TestXfrinConnection):
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
# xfrin should check TSIG before other part of incoming message
|
|
# xfrin should check TSIG before other part of incoming message
|
|
# validate log message for XfrinException
|
|
# validate log message for XfrinException
|
|
- self.__match_exception(XfrinException,
|
|
|
|
|
|
+ self.__match_exception(XfrinProtocolError,
|
|
"TSIG verify fail: BADSIG",
|
|
"TSIG verify fail: BADSIG",
|
|
self.conn._check_soa_serial)
|
|
self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_non_response(self):
|
|
def test_soacheck_non_response(self):
|
|
self.soa_response_params['response'] = False
|
|
self.soa_response_params['response'] = False
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
- self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_error_code(self):
|
|
def test_soacheck_error_code(self):
|
|
self.soa_response_params['rcode'] = Rcode.SERVFAIL()
|
|
self.soa_response_params['rcode'] = Rcode.SERVFAIL()
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
- self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_notauth(self):
|
|
def test_soacheck_notauth(self):
|
|
self.soa_response_params['auth'] = False
|
|
self.soa_response_params['auth'] = False
|
|
@@ -996,6 +1006,49 @@ class TestAXFR(TestXfrinConnection):
|
|
RRType.AAAA())]
|
|
RRType.AAAA())]
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
|
|
+ def test_soacheck_no_soa(self):
|
|
|
|
+ # The response just doesn't contain SOA without any other indication
|
|
|
|
+ # of errors.
|
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
|
+ self.soa_response_params['answers'] = []
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
+
|
|
|
|
+ def test_soacheck_soa_name_mismatch(self):
|
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
|
+ self.soa_response_params['answers'] = [create_soa(1234,
|
|
|
|
+ Name('example.org'))]
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
+
|
|
|
|
+ def test_soacheck_soa_class_mismatch(self):
|
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
|
+ soa = RRset(TEST_ZONE_NAME, RRClass.CH(), RRType.SOA(), RRTTL(0))
|
|
|
|
+ soa.add_rdata(Rdata(RRType.SOA(), RRClass.CH(), 'm. r. 1234 0 0 0 0'))
|
|
|
|
+ self.soa_response_params['answers'] = [soa]
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
+
|
|
|
|
+ def test_soacheck_multiple_soa(self):
|
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
|
+ self.soa_response_params['answers'] = [soa_rrset, soa_rrset]
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
+
|
|
|
|
+ def test_soacheck_cname_response(self):
|
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
|
+ # Add SOA to answer, too, to make sure that it that deceives the parser
|
|
|
|
+ self.soa_response_params['answers'] = [soa_rrset, create_cname()]
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
+
|
|
|
|
+ def test_soacheck_referral_response(self):
|
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
|
+ self.soa_response_params['answers'] = []
|
|
|
|
+ self.soa_response_params['authorities'] = [create_ns('ns.example.com')]
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
+
|
|
|
|
+ def test_soacheck_nodata_response(self):
|
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
|
+ self.soa_response_params['answers'] = []
|
|
|
|
+ self.soa_response_params['authorities'] = [soa_rrset]
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
+
|
|
def test_soacheck_with_tsig(self):
|
|
def test_soacheck_with_tsig(self):
|
|
# Use a mock tsig context emulating a validly signed response
|
|
# Use a mock tsig context emulating a validly signed response
|
|
self.conn._tsig_key = TSIG_KEY
|
|
self.conn._tsig_key = TSIG_KEY
|
|
@@ -1013,7 +1066,7 @@ class TestAXFR(TestXfrinConnection):
|
|
self.soa_response_params['rcode'] = Rcode.NOTAUTH()
|
|
self.soa_response_params['rcode'] = Rcode.NOTAUTH()
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
|
|
|
- self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_with_tsig_noerror_badsig(self):
|
|
def test_soacheck_with_tsig_noerror_badsig(self):
|
|
self.conn._tsig_key = TSIG_KEY
|
|
self.conn._tsig_key = TSIG_KEY
|
|
@@ -1026,7 +1079,7 @@ class TestAXFR(TestXfrinConnection):
|
|
# treat this as a final failure (just as BIND 9 does).
|
|
# treat this as a final failure (just as BIND 9 does).
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
|
|
|
- self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_with_tsig_unsigned_response(self):
|
|
def test_soacheck_with_tsig_unsigned_response(self):
|
|
# we can use a real TSIGContext for this. the response doesn't
|
|
# we can use a real TSIGContext for this. the response doesn't
|
|
@@ -1035,14 +1088,14 @@ class TestAXFR(TestXfrinConnection):
|
|
# it as a fatal transaction failure, too.
|
|
# it as a fatal transaction failure, too.
|
|
self.conn._tsig_key = TSIG_KEY
|
|
self.conn._tsig_key = TSIG_KEY
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
- self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_with_unexpected_tsig_response(self):
|
|
def test_soacheck_with_unexpected_tsig_response(self):
|
|
# we reject unexpected TSIG in responses (following BIND 9's
|
|
# we reject unexpected TSIG in responses (following BIND 9's
|
|
# behavior)
|
|
# behavior)
|
|
self.soa_response_params['tsig'] = True
|
|
self.soa_response_params['tsig'] = True
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
- self.assertRaises(XfrinException, self.conn._check_soa_serial)
|
|
|
|
|
|
+ self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_response_shutdown(self):
|
|
def test_response_shutdown(self):
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
@@ -1309,6 +1362,13 @@ class TestAXFR(TestXfrinConnection):
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
|
|
|
|
|
|
|
|
+ def test_do_soacheck_protocol_error(self):
|
|
|
|
+ # There are several cases, but at this level it's sufficient to check
|
|
|
|
+ # only one. We use the case where there's no SOA in the response.
|
|
|
|
+ self.soa_response_params['answers'] = []
|
|
|
|
+ self.conn.response_generator = self._create_soa_response_data
|
|
|
|
+ self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
|
|
|
|
+
|
|
def test_do_soacheck_and_xfrin_with_tsig(self):
|
|
def test_do_soacheck_and_xfrin_with_tsig(self):
|
|
# We are going to have a SOA query/response transaction, followed by
|
|
# We are going to have a SOA query/response transaction, followed by
|
|
# AXFR, all TSIG signed. xfrin should use a new TSIG context for
|
|
# AXFR, all TSIG signed. xfrin should use a new TSIG context for
|