|
@@ -40,7 +40,7 @@ import sqlite3
|
|
|
#
|
|
|
TEST_ZONE_NAME_STR = "example.com."
|
|
|
TEST_ZONE_NAME = Name(TEST_ZONE_NAME_STR)
|
|
|
-TEST_RRCLASS = RRClass.IN()
|
|
|
+TEST_RRCLASS = RRClass.IN
|
|
|
TEST_RRCLASS_STR = 'IN'
|
|
|
TEST_DB_FILE = 'db_file'
|
|
|
TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
|
|
@@ -59,21 +59,21 @@ TEST_MASTER_PORT = '53535'
|
|
|
TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
|
|
|
|
|
|
# SOA intended to be used for the new SOA as a result of transfer.
|
|
|
-soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
+soa_rdata = Rdata(RRType.SOA, TEST_RRCLASS,
|
|
|
'master.example.com. admin.example.com. ' +
|
|
|
'1234 3600 1800 2419200 7200')
|
|
|
-soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(), RRTTL(3600))
|
|
|
+soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA, RRTTL(3600))
|
|
|
soa_rrset.add_rdata(soa_rdata)
|
|
|
|
|
|
# SOA intended to be used for the current SOA at the secondary side.
|
|
|
# Note that its serial is smaller than that of soa_rdata.
|
|
|
-begin_soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
+begin_soa_rdata = Rdata(RRType.SOA, TEST_RRCLASS,
|
|
|
'master.example.com. admin.example.com. ' +
|
|
|
'1230 3600 1800 2419200 7200')
|
|
|
-begin_soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(), RRTTL(3600))
|
|
|
+begin_soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA, RRTTL(3600))
|
|
|
begin_soa_rrset.add_rdata(begin_soa_rdata)
|
|
|
-example_axfr_question = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.AXFR())
|
|
|
-example_soa_question = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA())
|
|
|
+example_axfr_question = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.AXFR)
|
|
|
+example_soa_question = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA)
|
|
|
default_questions = [example_axfr_question]
|
|
|
default_answers = [soa_rrset]
|
|
|
|
|
@@ -208,12 +208,12 @@ class MockDataSourceClient():
|
|
|
zone names.
|
|
|
|
|
|
'''
|
|
|
- if name == TEST_ZONE_NAME and rrtype == RRType.SOA():
|
|
|
+ if name == TEST_ZONE_NAME and rrtype == RRType.SOA:
|
|
|
return (ZoneFinder.SUCCESS, begin_soa_rrset, 0)
|
|
|
if name == Name('no-soa.example'):
|
|
|
return (ZoneFinder.NXDOMAIN, None, 0)
|
|
|
if name == Name('dup-soa.example'):
|
|
|
- dup_soa_rrset = RRset(name, TEST_RRCLASS, RRType.SOA(), RRTTL(0))
|
|
|
+ dup_soa_rrset = RRset(name, TEST_RRCLASS, RRType.SOA, RRTTL(0))
|
|
|
dup_soa_rrset.add_rdata(begin_soa_rdata)
|
|
|
dup_soa_rrset.add_rdata(soa_rdata)
|
|
|
return (ZoneFinder.SUCCESS, dup_soa_rrset, 0)
|
|
@@ -329,7 +329,7 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
return len(data)
|
|
|
|
|
|
def create_response_data(self, response=True, auth=True, bad_qid=False,
|
|
|
- rcode=Rcode.NOERROR(),
|
|
|
+ rcode=Rcode.NOERROR,
|
|
|
questions=default_questions,
|
|
|
answers=default_answers,
|
|
|
authorities=[],
|
|
@@ -339,7 +339,7 @@ class MockXfrinConnection(XfrinConnection):
|
|
|
if bad_qid:
|
|
|
qid += 1
|
|
|
resp.set_qid(qid)
|
|
|
- resp.set_opcode(Opcode.QUERY())
|
|
|
+ resp.set_opcode(Opcode.QUERY)
|
|
|
resp.set_rcode(rcode)
|
|
|
if response:
|
|
|
resp.set_header_flag(Message.HEADERFLAG_QR)
|
|
@@ -366,17 +366,17 @@ class TestXfrinState(unittest.TestCase):
|
|
|
TEST_RRCLASS, None, threading.Event(),
|
|
|
TEST_MASTER_IPV4_ADDRINFO)
|
|
|
self.conn.init_socket()
|
|
|
- self.begin_soa = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(),
|
|
|
+ self.begin_soa = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA,
|
|
|
RRTTL(3600))
|
|
|
- self.begin_soa.add_rdata(Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
+ self.begin_soa.add_rdata(Rdata(RRType.SOA, TEST_RRCLASS,
|
|
|
'm. r. 1230 0 0 0 0'))
|
|
|
- self.ns_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS(),
|
|
|
+ self.ns_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS,
|
|
|
RRTTL(3600))
|
|
|
- self.ns_rrset.add_rdata(Rdata(RRType.NS(), TEST_RRCLASS,
|
|
|
+ self.ns_rrset.add_rdata(Rdata(RRType.NS, TEST_RRCLASS,
|
|
|
'ns.example.com.'))
|
|
|
- self.a_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.A(),
|
|
|
+ self.a_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.A,
|
|
|
RRTTL(3600))
|
|
|
- self.a_rrset.add_rdata(Rdata(RRType.A(), TEST_RRCLASS, '192.0.2.1'))
|
|
|
+ self.a_rrset.add_rdata(Rdata(RRType.A, TEST_RRCLASS, '192.0.2.1'))
|
|
|
|
|
|
self.conn._datasrc_client = MockDataSourceClient()
|
|
|
self.conn._diff = Diff(self.conn._datasrc_client, TEST_ZONE_NAME)
|
|
@@ -408,14 +408,14 @@ class TestXfrinInitialSOA(TestXfrinState):
|
|
|
self.ns_rrset)
|
|
|
|
|
|
def test_handle_ixfr_uptodate(self):
|
|
|
- self.conn._request_type = RRType.IXFR()
|
|
|
+ self.conn._request_type = RRType.IXFR
|
|
|
self.conn._request_serial = isc.dns.Serial(1234) # same as soa_rrset
|
|
|
self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
|
|
|
self.assertEqual(type(XfrinIXFRUptodate()),
|
|
|
type(self.conn.get_xfrstate()))
|
|
|
|
|
|
def test_handle_ixfr_uptodate2(self):
|
|
|
- self.conn._request_type = RRType.IXFR()
|
|
|
+ self.conn._request_type = RRType.IXFR
|
|
|
self.conn._request_serial = isc.dns.Serial(1235) # > soa_rrset
|
|
|
self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
|
|
|
self.assertEqual(type(XfrinIXFRUptodate()),
|
|
@@ -424,7 +424,7 @@ class TestXfrinInitialSOA(TestXfrinState):
|
|
|
def test_handle_ixfr_uptodate3(self):
|
|
|
# Similar to the previous case, but checking serial number arithmetic
|
|
|
# comparison
|
|
|
- self.conn._request_type = RRType.IXFR()
|
|
|
+ self.conn._request_type = RRType.IXFR
|
|
|
self.conn._request_serial = isc.dns.Serial(0xffffffff)
|
|
|
self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
|
|
|
self.assertEqual(type(XfrinFirstData()),
|
|
@@ -432,7 +432,7 @@ class TestXfrinInitialSOA(TestXfrinState):
|
|
|
|
|
|
def test_handle_axfr_uptodate(self):
|
|
|
# "request serial" should matter only for IXFR
|
|
|
- self.conn._request_type = RRType.AXFR()
|
|
|
+ self.conn._request_type = RRType.AXFR
|
|
|
self.conn._request_serial = isc.dns.Serial(1234) # same as soa_rrset
|
|
|
self.assertTrue(self.state.handle_rr(self.conn, soa_rrset))
|
|
|
self.assertEqual(type(XfrinFirstData()),
|
|
@@ -445,13 +445,13 @@ class TestXfrinFirstData(TestXfrinState):
|
|
|
def setUp(self):
|
|
|
super().setUp()
|
|
|
self.state = XfrinFirstData()
|
|
|
- self.conn._request_type = RRType.IXFR()
|
|
|
+ self.conn._request_type = RRType.IXFR
|
|
|
# arbitrary chosen serial < 1234:
|
|
|
self.conn._request_serial = isc.dns.Serial(1230)
|
|
|
self.conn._diff = None # should be replaced in the AXFR case
|
|
|
|
|
|
def test_handle_ixfr_begin_soa(self):
|
|
|
- self.conn._request_type = RRType.IXFR()
|
|
|
+ self.conn._request_type = RRType.IXFR
|
|
|
self.assertFalse(self.state.handle_rr(self.conn, self.begin_soa))
|
|
|
self.assertEqual(type(XfrinIXFRDeleteSOA()),
|
|
|
type(self.conn.get_xfrstate()))
|
|
@@ -459,7 +459,7 @@ class TestXfrinFirstData(TestXfrinState):
|
|
|
def test_handle_axfr(self):
|
|
|
# If the original type is AXFR, other conditions aren't considered,
|
|
|
# and AXFR processing will continue
|
|
|
- self.conn._request_type = RRType.AXFR()
|
|
|
+ self.conn._request_type = RRType.AXFR
|
|
|
self.assertFalse(self.state.handle_rr(self.conn, self.begin_soa))
|
|
|
self.assertEqual(type(XfrinAXFR()), type(self.conn.get_xfrstate()))
|
|
|
|
|
@@ -598,9 +598,9 @@ class TestXfrinIXFRAdd(TestXfrinState):
|
|
|
# First, push a starting SOA inside. This should be OK, nothing checked
|
|
|
# yet.
|
|
|
self.state.handle_rr(self.conn, self.begin_soa)
|
|
|
- end_soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
|
|
|
+ end_soa_rdata = Rdata(RRType.SOA, TEST_RRCLASS,
|
|
|
'm. r. 1234 0 0 0 0')
|
|
|
- end_soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(),
|
|
|
+ end_soa_rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA,
|
|
|
RRTTL(3600))
|
|
|
end_soa_rrset.add_rdata(end_soa_rdata)
|
|
|
# This would try to finish up. But the TSIG pretends not everything is
|
|
@@ -724,7 +724,7 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
'bad_qid': False,
|
|
|
'response': True,
|
|
|
'auth': True,
|
|
|
- 'rcode': Rcode.NOERROR(),
|
|
|
+ 'rcode': Rcode.NOERROR,
|
|
|
'answers': default_answers,
|
|
|
'authorities': [],
|
|
|
'tsig': False,
|
|
@@ -817,21 +817,21 @@ class TestXfrinConnection(unittest.TestCase):
|
|
|
self.conn.reply_data += bogus_data
|
|
|
|
|
|
def _create_a(self, address):
|
|
|
- rrset = RRset(Name('a.example.com'), TEST_RRCLASS, RRType.A(),
|
|
|
+ rrset = RRset(Name('a.example.com'), TEST_RRCLASS, RRType.A,
|
|
|
RRTTL(3600))
|
|
|
- rrset.add_rdata(Rdata(RRType.A(), TEST_RRCLASS, address))
|
|
|
+ rrset.add_rdata(Rdata(RRType.A, TEST_RRCLASS, address))
|
|
|
return rrset
|
|
|
|
|
|
def _create_soa(self, serial):
|
|
|
- rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(),
|
|
|
+ rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA,
|
|
|
RRTTL(3600))
|
|
|
rdata_str = 'm. r. ' + serial + ' 3600 1800 2419200 7200'
|
|
|
- rrset.add_rdata(Rdata(RRType.SOA(), TEST_RRCLASS, rdata_str))
|
|
|
+ rrset.add_rdata(Rdata(RRType.SOA, TEST_RRCLASS, rdata_str))
|
|
|
return rrset
|
|
|
|
|
|
def _create_ns(self, nsname='ns.'+TEST_ZONE_NAME_STR):
|
|
|
- rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS(), RRTTL(3600))
|
|
|
- rrset.add_rdata(Rdata(RRType.NS(), TEST_RRCLASS, nsname))
|
|
|
+ rrset = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.NS, RRTTL(3600))
|
|
|
+ rrset.add_rdata(Rdata(RRType.NS, TEST_RRCLASS, nsname))
|
|
|
return rrset
|
|
|
|
|
|
def _set_test_zone(self, zone_name):
|
|
@@ -899,19 +899,19 @@ class TestAXFR(TestXfrinConnection):
|
|
|
c.close()
|
|
|
|
|
|
def test_init_chclass(self):
|
|
|
- c = MockXfrinConnection({}, TEST_ZONE_NAME, RRClass.CH(), None,
|
|
|
+ c = MockXfrinConnection({}, TEST_ZONE_NAME, RRClass.CH, None,
|
|
|
threading.Event(), TEST_MASTER_IPV4_ADDRINFO)
|
|
|
c.init_socket()
|
|
|
- axfrmsg = c._create_query(RRType.AXFR())
|
|
|
+ axfrmsg = c._create_query(RRType.AXFR)
|
|
|
self.assertEqual(axfrmsg.get_question()[0].get_class(),
|
|
|
- RRClass.CH())
|
|
|
+ RRClass.CH)
|
|
|
c.close()
|
|
|
|
|
|
def test_create_query(self):
|
|
|
def check_query(expected_qtype, expected_auth):
|
|
|
'''Helper method to repeat the same pattern of tests'''
|
|
|
- self.assertEqual(Opcode.QUERY(), msg.get_opcode())
|
|
|
- self.assertEqual(Rcode.NOERROR(), msg.get_rcode())
|
|
|
+ self.assertEqual(Opcode.QUERY, msg.get_opcode())
|
|
|
+ self.assertEqual(Rcode.NOERROR, msg.get_rcode())
|
|
|
self.assertEqual(1, msg.get_rr_count(Message.SECTION_QUESTION))
|
|
|
self.assertEqual(TEST_ZONE_NAME, msg.get_question()[0].get_name())
|
|
|
self.assertEqual(expected_qtype, msg.get_question()[0].get_type())
|
|
@@ -936,16 +936,16 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
# Actual tests start here
|
|
|
# SOA query
|
|
|
- msg = self.conn._create_query(RRType.SOA())
|
|
|
- check_query(RRType.SOA(), None)
|
|
|
+ msg = self.conn._create_query(RRType.SOA)
|
|
|
+ check_query(RRType.SOA, None)
|
|
|
|
|
|
# AXFR query
|
|
|
- msg = self.conn._create_query(RRType.AXFR())
|
|
|
- check_query(RRType.AXFR(), None)
|
|
|
+ msg = self.conn._create_query(RRType.AXFR)
|
|
|
+ check_query(RRType.AXFR, None)
|
|
|
|
|
|
# IXFR query
|
|
|
- msg = self.conn._create_query(RRType.IXFR())
|
|
|
- check_query(RRType.IXFR(), begin_soa_rrset)
|
|
|
+ msg = self.conn._create_query(RRType.IXFR)
|
|
|
+ check_query(RRType.IXFR, begin_soa_rrset)
|
|
|
self.assertEqual(1230, self.conn._request_serial.get_value())
|
|
|
|
|
|
def test_create_ixfr_query_fail(self):
|
|
@@ -954,20 +954,20 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
self._set_test_zone(Name('no-such-zone.example'))
|
|
|
self.assertRaises(XfrinException, self.conn._create_query,
|
|
|
- RRType.IXFR())
|
|
|
+ RRType.IXFR)
|
|
|
|
|
|
self._set_test_zone(Name('partial-match-zone.example'))
|
|
|
self.assertRaises(XfrinException, self.conn._create_query,
|
|
|
- RRType.IXFR())
|
|
|
+ RRType.IXFR)
|
|
|
|
|
|
self._set_test_zone(Name('no-soa.example'))
|
|
|
self.assertRaises(XfrinException, self.conn._create_query,
|
|
|
- RRType.IXFR())
|
|
|
+ RRType.IXFR)
|
|
|
|
|
|
self._set_test_zone(Name('dup-soa.example'))
|
|
|
self.conn._zone_soa = self.conn._get_zone_soa()
|
|
|
self.assertRaises(XfrinException, self.conn._create_query,
|
|
|
- RRType.IXFR())
|
|
|
+ RRType.IXFR)
|
|
|
|
|
|
def test_send_query(self):
|
|
|
def message_has_tsig(data):
|
|
@@ -980,11 +980,11 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
# soa request with tsig
|
|
|
self.conn._tsig_key = TSIG_KEY
|
|
|
- self.conn._send_query(RRType.SOA())
|
|
|
+ self.conn._send_query(RRType.SOA)
|
|
|
self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
|
|
|
|
|
|
# axfr request with tsig
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.assertTrue(message_has_tsig(self.conn.query_data[2:]))
|
|
|
|
|
|
def test_response_with_invalid_msg(self):
|
|
@@ -995,14 +995,14 @@ class TestAXFR(TestXfrinConnection):
|
|
|
def test_response_with_tsigfail(self):
|
|
|
self.conn._tsig_key = TSIG_KEY
|
|
|
# server tsig check fail, return with RCODE 9 (NOTAUTH)
|
|
|
- self.conn._send_query(RRType.SOA())
|
|
|
+ self.conn._send_query(RRType.SOA)
|
|
|
self.conn.reply_data = \
|
|
|
- self.conn.create_response_data(rcode=Rcode.NOTAUTH())
|
|
|
+ self.conn.create_response_data(rcode=Rcode.NOTAUTH)
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
|
self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_without_end_soa(self):
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data()
|
|
|
# This should result in timeout in the asyncore loop. We emulate
|
|
|
# that situation in recv() by emptying the reply data buffer.
|
|
@@ -1010,7 +1010,7 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_bad_qid(self):
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(bad_qid=True)
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
|
self.conn._handle_xfrin_responses)
|
|
@@ -1019,9 +1019,9 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.conn._tsig_key = TSIG_KEY
|
|
|
self.conn._tsig_ctx_creator = \
|
|
|
lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- rcode=Rcode.SERVFAIL())
|
|
|
+ rcode=Rcode.SERVFAIL)
|
|
|
# xfrin should check TSIG before other part of incoming message
|
|
|
# validate log message for XfrinException
|
|
|
self.__match_exception(XfrinProtocolError,
|
|
@@ -1032,7 +1032,7 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.conn._tsig_key = TSIG_KEY
|
|
|
self.conn._tsig_ctx_creator = \
|
|
|
lambda key: self.__create_mock_tsig(key, TSIGError.BAD_KEY)
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(bad_qid=True)
|
|
|
# xfrin should check TSIG before other part of incoming message
|
|
|
# validate log message for XfrinException
|
|
@@ -1041,26 +1041,26 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_non_response(self):
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(response=False)
|
|
|
self.assertRaises(XfrinException, self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_error_code(self):
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- rcode=Rcode.SERVFAIL())
|
|
|
+ rcode=Rcode.SERVFAIL)
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
|
self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_multi_question(self):
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[example_axfr_question, example_axfr_question])
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
|
self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_non_response(self):
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(response = False)
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
|
self.conn._handle_xfrin_responses)
|
|
@@ -1098,7 +1098,7 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_error_code(self):
|
|
|
- self.soa_response_params['rcode'] = Rcode.SERVFAIL()
|
|
|
+ self.soa_response_params['rcode'] = Rcode.SERVFAIL
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
@@ -1146,21 +1146,21 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.soa_response_params['questions'] = [Question(Name('example.org'),
|
|
|
TEST_RRCLASS,
|
|
|
- RRType.SOA())]
|
|
|
+ RRType.SOA)]
|
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_question_class_mismatch(self):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.soa_response_params['questions'] = [Question(TEST_ZONE_NAME,
|
|
|
- RRClass.CH(),
|
|
|
- RRType.SOA())]
|
|
|
+ RRClass.CH,
|
|
|
+ RRType.SOA)]
|
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_question_type_mismatch(self):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
self.soa_response_params['questions'] = [Question(TEST_ZONE_NAME,
|
|
|
TEST_RRCLASS,
|
|
|
- RRType.AAAA())]
|
|
|
+ RRType.AAAA)]
|
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
|
def test_soacheck_no_soa(self):
|
|
@@ -1178,8 +1178,8 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
def test_soacheck_soa_class_mismatch(self):
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
- soa = RRset(TEST_ZONE_NAME, RRClass.CH(), RRType.SOA(), RRTTL(0))
|
|
|
- soa.add_rdata(Rdata(RRType.SOA(), RRClass.CH(), 'm. r. 1234 0 0 0 0'))
|
|
|
+ soa = RRset(TEST_ZONE_NAME, RRClass.CH, RRType.SOA, RRTTL(0))
|
|
|
+ soa.add_rdata(Rdata(RRType.SOA, RRClass.CH, 'm. r. 1234 0 0 0 0'))
|
|
|
self.soa_response_params['answers'] = [soa]
|
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
|
|
|
@@ -1220,7 +1220,7 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self.conn._tsig_key = TSIG_KEY
|
|
|
self.conn._tsig_ctx_creator = \
|
|
|
lambda key: self.__create_mock_tsig(key, TSIGError.BAD_SIG)
|
|
|
- self.soa_response_params['rcode'] = Rcode.NOTAUTH()
|
|
|
+ self.soa_response_params['rcode'] = Rcode.NOTAUTH
|
|
|
self.conn.response_generator = self._create_soa_response_data
|
|
|
|
|
|
self.assertRaises(XfrinProtocolError, self.conn._check_soa_serial)
|
|
@@ -1257,7 +1257,7 @@ class TestAXFR(TestXfrinConnection):
|
|
|
def test_response_shutdown(self):
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
self.conn._shutdown_event.set()
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.assertRaises(XfrinException, self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_response_timeout(self):
|
|
@@ -1272,13 +1272,13 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
def test_response_bad_message(self):
|
|
|
self.conn.response_generator = self._create_broken_response_data
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.assertRaises(Exception, self.conn._handle_xfrin_responses)
|
|
|
|
|
|
def test_axfr_response(self):
|
|
|
# A simple normal case: AXFR consists of SOA, NS, then trailing SOA.
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
check_diffs(self.assertEqual,
|
|
@@ -1299,7 +1299,7 @@ class TestAXFR(TestXfrinConnection):
|
|
|
self._create_ns(),
|
|
|
soa_rrset]
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
check_diffs(self.assertEqual,
|
|
@@ -1314,10 +1314,10 @@ class TestAXFR(TestXfrinConnection):
|
|
|
'''
|
|
|
ns_rr = self._create_ns()
|
|
|
a_rr = self._create_a('192.0.2.1')
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
- RRType.AXFR())],
|
|
|
+ RRType.AXFR)],
|
|
|
# begin serial=1230, end serial=1234. end will be used.
|
|
|
answers=[begin_soa_rrset, ns_rr, a_rr, soa_rrset])
|
|
|
self.conn._handle_xfrin_responses()
|
|
@@ -1361,10 +1361,10 @@ class TestAXFR(TestXfrinConnection):
|
|
|
'''
|
|
|
ns_rr = self._create_ns()
|
|
|
a_rr = self._create_a('192.0.2.1')
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
- RRType.AXFR())],
|
|
|
+ RRType.AXFR)],
|
|
|
answers=[soa_rrset, ns_rr, a_rr, soa_rrset, a_rr])
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
|
self.conn._handle_xfrin_responses)
|
|
@@ -1378,9 +1378,9 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
'''
|
|
|
self.axfr_response_params['question_1st'] = \
|
|
|
- [Question(Name('mismatch.example'), TEST_RRCLASS, RRType.AXFR())]
|
|
|
+ [Question(Name('mismatch.example'), TEST_RRCLASS, RRType.AXFR)]
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
check_diffs(self.assertEqual,
|
|
@@ -1394,9 +1394,9 @@ class TestAXFR(TestXfrinConnection):
|
|
|
|
|
|
'''
|
|
|
self.axfr_response_params['question_1st'] = \
|
|
|
- [Question(TEST_ZONE_NAME, RRClass.CH(), RRType.AXFR())]
|
|
|
+ [Question(TEST_ZONE_NAME, RRClass.CH, RRType.AXFR)]
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
check_diffs(self.assertEqual,
|
|
@@ -1411,9 +1411,9 @@ class TestAXFR(TestXfrinConnection):
|
|
|
'''
|
|
|
# returning IXFR in question to AXFR query
|
|
|
self.axfr_response_params['question_1st'] = \
|
|
|
- [Question(TEST_ZONE_NAME, RRClass.CH(), RRType.IXFR())]
|
|
|
+ [Question(TEST_ZONE_NAME, RRClass.CH, RRType.IXFR)]
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
check_diffs(self.assertEqual,
|
|
@@ -1428,7 +1428,7 @@ class TestAXFR(TestXfrinConnection):
|
|
|
'''
|
|
|
self.axfr_response_params['question_1st'] = []
|
|
|
self.conn.response_generator = self._create_normal_response_data
|
|
|
- self.conn._send_query(RRType.AXFR())
|
|
|
+ self.conn._send_query(RRType.AXFR)
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
|
check_diffs(self.assertEqual,
|
|
@@ -1617,7 +1617,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
super().setUp()
|
|
|
self.conn._query_id = self.conn.qid = 1035
|
|
|
self.conn._request_serial = isc.dns.Serial(1230)
|
|
|
- self.conn._request_type = RRType.IXFR()
|
|
|
+ self.conn._request_type = RRType.IXFR
|
|
|
self.conn._datasrc_client = MockDataSourceClient()
|
|
|
XfrinInitialSOA().set_xfrstate(self.conn, XfrinInitialSOA())
|
|
|
|
|
@@ -1631,7 +1631,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
|
|
|
'''
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset, begin_soa_rrset, soa_rrset, soa_rrset])
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinIXFREnd()), type(self.conn.get_xfrstate()))
|
|
@@ -1666,7 +1666,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
|
|
|
'''
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset,
|
|
|
# removing one A in serial 1230
|
|
|
begin_soa_rrset, self._create_a('192.0.2.1'),
|
|
@@ -1706,10 +1706,10 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
|
|
|
'''
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset, begin_soa_rrset, soa_rrset])
|
|
|
self.conn.reply_data += self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset])
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinIXFREnd()), type(self.conn.get_xfrstate()))
|
|
@@ -1720,7 +1720,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
def test_ixfr_response_uptodate(self):
|
|
|
'''IXFR response indicates the zone is new enough'''
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[begin_soa_rrset])
|
|
|
self.assertRaises(XfrinZoneUptodate, self.conn._handle_xfrin_responses)
|
|
|
# no diffs should have been committed
|
|
@@ -1733,7 +1733,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
'''
|
|
|
# SOA sequence is out-of-sync
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset, begin_soa_rrset, soa_rrset,
|
|
|
self._create_soa('1235')])
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
@@ -1750,7 +1750,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
specification, but it is how BIND 9 works and we do the same.
|
|
|
'''
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset, begin_soa_rrset, soa_rrset, soa_rrset,
|
|
|
self._create_a('192.0.2.1')])
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
@@ -1767,7 +1767,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
|
|
|
'''
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[begin_soa_rrset, soa_rrset])
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
|
self.conn._handle_xfrin_responses)
|
|
@@ -1784,7 +1784,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
ns_rr = self._create_ns()
|
|
|
a_rr = self._create_a('192.0.2.1')
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset, ns_rr, a_rr, soa_rrset])
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
@@ -1806,7 +1806,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
ns_rr = self._create_ns()
|
|
|
a_rr = self._create_a('192.0.2.1')
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset, ns_rr, a_rr, begin_soa_rrset])
|
|
|
self.conn._handle_xfrin_responses()
|
|
|
self.assertEqual(type(XfrinAXFREnd()), type(self.conn.get_xfrstate()))
|
|
@@ -1825,7 +1825,7 @@ class TestIXFRResponse(TestXfrinConnection):
|
|
|
ns_rr = self._create_ns()
|
|
|
a_rr = self._create_a('192.0.2.1')
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
- questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR())],
|
|
|
+ questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.IXFR)],
|
|
|
answers=[soa_rrset, ns_rr, a_rr, soa_rrset, a_rr])
|
|
|
self.assertRaises(XfrinProtocolError,
|
|
|
self.conn._handle_xfrin_responses)
|
|
@@ -1852,10 +1852,10 @@ class TestIXFRSession(TestXfrinConnection):
|
|
|
def create_ixfr_response():
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
- RRType.IXFR())],
|
|
|
+ RRType.IXFR)],
|
|
|
answers=[soa_rrset, begin_soa_rrset, soa_rrset, soa_rrset])
|
|
|
self.conn.response_generator = create_ixfr_response
|
|
|
- self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.IXFR))
|
|
|
|
|
|
# Check some details of the IXFR protocol processing
|
|
|
self.assertEqual(type(XfrinIXFREnd()), type(self.conn.get_xfrstate()))
|
|
@@ -1869,7 +1869,7 @@ class TestIXFRSession(TestXfrinConnection):
|
|
|
qmsg.from_wire(qdata, len(qdata))
|
|
|
self.assertEqual(1, qmsg.get_rr_count(Message.SECTION_QUESTION))
|
|
|
self.assertEqual(TEST_ZONE_NAME, qmsg.get_question()[0].get_name())
|
|
|
- self.assertEqual(RRType.IXFR(), qmsg.get_question()[0].get_type())
|
|
|
+ self.assertEqual(RRType.IXFR, qmsg.get_question()[0].get_type())
|
|
|
|
|
|
self.assertEqual(1, self.conn._transfer_stats.message_count)
|
|
|
self.assertEqual(0, self.conn._transfer_stats.axfr_rr_count)
|
|
@@ -1886,18 +1886,18 @@ class TestIXFRSession(TestXfrinConnection):
|
|
|
def create_ixfr_response():
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
- RRType.IXFR())],
|
|
|
+ RRType.IXFR)],
|
|
|
answers=[soa_rrset, begin_soa_rrset, soa_rrset,
|
|
|
self._create_soa('1235')])
|
|
|
self.conn.response_generator = create_ixfr_response
|
|
|
- self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR))
|
|
|
|
|
|
def test_do_xfrin_fail2(self):
|
|
|
'''IXFR fails due to a bogus DNS message.
|
|
|
|
|
|
'''
|
|
|
self._create_broken_response_data()
|
|
|
- self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR))
|
|
|
|
|
|
def test_do_xfrin_uptodate(self):
|
|
|
'''IXFR is (gracefully) aborted because serial is not new
|
|
@@ -1906,10 +1906,10 @@ class TestIXFRSession(TestXfrinConnection):
|
|
|
def create_response():
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
- RRType.IXFR())],
|
|
|
+ RRType.IXFR)],
|
|
|
answers=[begin_soa_rrset])
|
|
|
self.conn.response_generator = create_response
|
|
|
- self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.IXFR))
|
|
|
|
|
|
self.assertEqual(1, self.conn._transfer_stats.message_count)
|
|
|
self.assertEqual(0, self.conn._transfer_stats.axfr_rr_count)
|
|
@@ -1956,7 +1956,7 @@ class TestXFRSessionWithSQLite3(TestXfrinConnection):
|
|
|
def get_zone_serial(self):
|
|
|
result, finder = self.conn._datasrc_client.find_zone(TEST_ZONE_NAME)
|
|
|
self.assertEqual(DataSourceClient.SUCCESS, result)
|
|
|
- result, soa, _ = finder.find(TEST_ZONE_NAME, RRType.SOA())
|
|
|
+ result, soa, _ = finder.find(TEST_ZONE_NAME, RRType.SOA)
|
|
|
self.assertEqual(ZoneFinder.SUCCESS, result)
|
|
|
self.assertEqual(1, soa.get_rdata_count())
|
|
|
return get_soa_serial(soa.get_rdata()[0])
|
|
@@ -1971,13 +1971,13 @@ class TestXFRSessionWithSQLite3(TestXfrinConnection):
|
|
|
def create_ixfr_response():
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
- RRType.IXFR())],
|
|
|
+ RRType.IXFR)],
|
|
|
answers=[soa_rrset, begin_soa_rrset, soa_rrset, soa_rrset])
|
|
|
self.conn.response_generator = create_ixfr_response
|
|
|
|
|
|
# Confirm xfrin succeeds and SOA is updated
|
|
|
self.assertEqual(1230, self.get_zone_serial().get_value())
|
|
|
- self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.IXFR))
|
|
|
self.assertEqual(1234, self.get_zone_serial().get_value())
|
|
|
|
|
|
# Also confirm the corresponding diffs are stored in the diffs table
|
|
@@ -2002,18 +2002,18 @@ class TestXFRSessionWithSQLite3(TestXfrinConnection):
|
|
|
def create_ixfr_response():
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
- RRType.IXFR())],
|
|
|
+ RRType.IXFR)],
|
|
|
answers=[soa_rrset, begin_soa_rrset, soa_rrset,
|
|
|
self._create_soa('1235')])
|
|
|
self.conn.response_generator = create_ixfr_response
|
|
|
|
|
|
self.assertEqual(1230, self.get_zone_serial().get_value())
|
|
|
- self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR))
|
|
|
self.assertEqual(1230, self.get_zone_serial().get_value())
|
|
|
|
|
|
def test_do_ixfrin_nozone_sqlite3(self):
|
|
|
self._set_test_zone(Name('nosuchzone.example'))
|
|
|
- self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR()))
|
|
|
+ self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, RRType.IXFR))
|
|
|
# This should fail even before starting state transition
|
|
|
self.assertEqual(None, self.conn.get_xfrstate())
|
|
|
|
|
@@ -2030,23 +2030,23 @@ class TestXFRSessionWithSQLite3(TestXfrinConnection):
|
|
|
# Confirm xfrin succeeds and SOA is updated, A RR is deleted.
|
|
|
self.assertEqual(1230, self.get_zone_serial().get_value())
|
|
|
self.assertTrue(self.record_exist(Name('dns01.example.com'),
|
|
|
- RRType.A()))
|
|
|
+ RRType.A))
|
|
|
self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, type))
|
|
|
self.assertEqual(1234, self.get_zone_serial().get_value())
|
|
|
self.assertFalse(self.record_exist(Name('dns01.example.com'),
|
|
|
- RRType.A()))
|
|
|
+ RRType.A))
|
|
|
|
|
|
def test_do_ixfrin_axfr_sqlite3(self):
|
|
|
'''AXFR-style IXFR.
|
|
|
|
|
|
'''
|
|
|
- self.axfr_check(RRType.IXFR())
|
|
|
+ self.axfr_check(RRType.IXFR)
|
|
|
|
|
|
def test_do_axfrin_sqlite3(self):
|
|
|
'''AXFR.
|
|
|
|
|
|
'''
|
|
|
- self.axfr_check(RRType.AXFR())
|
|
|
+ self.axfr_check(RRType.AXFR)
|
|
|
|
|
|
def axfr_failure_check(self, type):
|
|
|
'''Similar to the previous two tests, but xfrin fails due to error.
|
|
@@ -2062,23 +2062,23 @@ class TestXFRSessionWithSQLite3(TestXfrinConnection):
|
|
|
|
|
|
self.assertEqual(1230, self.get_zone_serial().get_value())
|
|
|
self.assertTrue(self.record_exist(Name('dns01.example.com'),
|
|
|
- RRType.A()))
|
|
|
+ RRType.A))
|
|
|
self.assertEqual(XFRIN_FAIL, self.conn.do_xfrin(False, type))
|
|
|
self.assertEqual(1230, self.get_zone_serial().get_value())
|
|
|
self.assertTrue(self.record_exist(Name('dns01.example.com'),
|
|
|
- RRType.A()))
|
|
|
+ RRType.A))
|
|
|
|
|
|
def test_do_xfrin_axfr_sqlite3_fail(self):
|
|
|
'''Failure case for AXFR-style IXFR.
|
|
|
|
|
|
'''
|
|
|
- self.axfr_failure_check(RRType.IXFR())
|
|
|
+ self.axfr_failure_check(RRType.IXFR)
|
|
|
|
|
|
def test_do_axfrin_sqlite3_fail(self):
|
|
|
'''Failure case for AXFR.
|
|
|
|
|
|
'''
|
|
|
- self.axfr_failure_check(RRType.AXFR())
|
|
|
+ self.axfr_failure_check(RRType.AXFR)
|
|
|
|
|
|
def test_do_axfrin_nozone_sqlite3(self):
|
|
|
'''AXFR test with an empty SQLite3 DB file, thus no target zone there.
|
|
@@ -2095,16 +2095,16 @@ class TestXFRSessionWithSQLite3(TestXfrinConnection):
|
|
|
def create_response():
|
|
|
self.conn.reply_data = self.conn.create_response_data(
|
|
|
questions=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
|
|
|
- RRType.AXFR())],
|
|
|
+ RRType.AXFR)],
|
|
|
answers=[soa_rrset, self._create_ns(), soa_rrset])
|
|
|
self.conn.response_generator = create_response
|
|
|
self._set_test_zone(Name('example.com'))
|
|
|
- self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.AXFR()))
|
|
|
+ self.assertEqual(XFRIN_OK, self.conn.do_xfrin(False, RRType.AXFR))
|
|
|
self.assertEqual(type(XfrinAXFREnd()),
|
|
|
type(self.conn.get_xfrstate()))
|
|
|
self.assertEqual(1234, self.get_zone_serial().get_value())
|
|
|
self.assertFalse(self.record_exist(Name('dns01.example.com'),
|
|
|
- RRType.A()))
|
|
|
+ RRType.A))
|
|
|
|
|
|
class TestXfrinRecorder(unittest.TestCase):
|
|
|
def setUp(self):
|
|
@@ -2213,7 +2213,7 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
# Normal, successful case. We only check that things are cleaned up
|
|
|
# at the tearDown time.
|
|
|
process_xfrin(self, self, TEST_ZONE_NAME, TEST_RRCLASS, None, None,
|
|
|
- self.master, False, None, RRType.AXFR(),
|
|
|
+ self.master, False, None, RRType.AXFR,
|
|
|
self.create_xfrinconn)
|
|
|
|
|
|
def test_process_xfrin_exception_on_connect(self):
|
|
@@ -2221,7 +2221,7 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
# cleaned up.
|
|
|
self.do_raise_on_connect = True
|
|
|
process_xfrin(self, self, TEST_ZONE_NAME, TEST_RRCLASS, None, None,
|
|
|
- self.master, False, None, RRType.AXFR(),
|
|
|
+ self.master, False, None, RRType.AXFR,
|
|
|
self.create_xfrinconn)
|
|
|
|
|
|
def test_process_xfrin_exception_on_close(self):
|
|
@@ -2231,7 +2231,7 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
self.do_raise_on_connect = True
|
|
|
self.do_raise_on_close = True
|
|
|
process_xfrin(self, self, TEST_ZONE_NAME, TEST_RRCLASS, None, None,
|
|
|
- self.master, False, None, RRType.AXFR(),
|
|
|
+ self.master, False, None, RRType.AXFR,
|
|
|
self.create_xfrinconn)
|
|
|
|
|
|
def test_process_xfrin_exception_on_publish(self):
|
|
@@ -2239,7 +2239,7 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
# everything must still be cleaned up.
|
|
|
self.do_raise_on_publish = True
|
|
|
process_xfrin(self, self, TEST_ZONE_NAME, TEST_RRCLASS, None, None,
|
|
|
- self.master, False, None, RRType.AXFR(),
|
|
|
+ self.master, False, None, RRType.AXFR,
|
|
|
self.create_xfrinconn)
|
|
|
|
|
|
class TestXfrin(unittest.TestCase):
|
|
@@ -2292,7 +2292,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
|
|
|
def test_parse_cmd_params_chclass(self):
|
|
|
self.args['zone_class'] = 'CH'
|
|
|
- self.assertEqual(self._do_parse_zone_name_class()[1], RRClass.CH())
|
|
|
+ self.assertEqual(self._do_parse_zone_name_class()[1], RRClass.CH)
|
|
|
|
|
|
def test_parse_cmd_params_bogusclass(self):
|
|
|
self.args['zone_class'] = 'XXX'
|
|
@@ -2339,7 +2339,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.assertEqual(self.args['master'], self.xfr.xfrin_started_master_addr)
|
|
|
self.assertEqual(int(self.args['port']), self.xfr.xfrin_started_master_port)
|
|
|
# By default we use AXFR (for now)
|
|
|
- self.assertEqual(RRType.AXFR(), self.xfr.xfrin_started_request_type)
|
|
|
+ self.assertEqual(RRType.AXFR, self.xfr.xfrin_started_request_type)
|
|
|
|
|
|
def test_command_handler_retransfer_short_command1(self):
|
|
|
# try it when only specifying the zone name (of unknown zone)
|
|
@@ -2453,7 +2453,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.assertEqual(int(TEST_MASTER_PORT),
|
|
|
self.xfr.xfrin_started_master_port)
|
|
|
# By default we use AXFR (for now)
|
|
|
- self.assertEqual(RRType.AXFR(), self.xfr.xfrin_started_request_type)
|
|
|
+ self.assertEqual(RRType.AXFR, self.xfr.xfrin_started_request_type)
|
|
|
|
|
|
def test_command_handler_notify(self):
|
|
|
# at this level, refresh is no different than retransfer.
|
|
@@ -2520,7 +2520,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
self.xfr._max_transfers_in)
|
|
|
for zone_config in config_given['zones']:
|
|
|
zone_name = zone_config['name']
|
|
|
- zone_info = self.xfr._get_zone_info(Name(zone_name), RRClass.IN())
|
|
|
+ zone_info = self.xfr._get_zone_info(Name(zone_name), RRClass.IN)
|
|
|
self.assertEqual(str(zone_info.master_addr), zone_config['master_addr'])
|
|
|
self.assertEqual(zone_info.master_port, zone_config['master_port'])
|
|
|
if 'tsig_key' in zone_config:
|
|
@@ -2695,16 +2695,16 @@ class TestXfrin(unittest.TestCase):
|
|
|
def test_command_handler_retransfer_ixfr_enabled(self):
|
|
|
# If IXFR is explicitly enabled in config, IXFR will be used
|
|
|
self.common_ixfr_setup('retransfer', True)
|
|
|
- self.assertEqual(RRType.IXFR(), self.xfr.xfrin_started_request_type)
|
|
|
+ self.assertEqual(RRType.IXFR, self.xfr.xfrin_started_request_type)
|
|
|
|
|
|
def test_command_handler_refresh_ixfr_enabled(self):
|
|
|
# Same for refresh
|
|
|
self.common_ixfr_setup('refresh', True)
|
|
|
- self.assertEqual(RRType.IXFR(), self.xfr.xfrin_started_request_type)
|
|
|
+ self.assertEqual(RRType.IXFR, self.xfr.xfrin_started_request_type)
|
|
|
|
|
|
def test_command_handler_retransfer_with_tsig(self):
|
|
|
self.common_ixfr_setup('retransfer', False, 'example.com.key')
|
|
|
- self.assertEqual(RRType.AXFR(), self.xfr.xfrin_started_request_type)
|
|
|
+ self.assertEqual(RRType.AXFR, self.xfr.xfrin_started_request_type)
|
|
|
|
|
|
def test_command_handler_retransfer_with_tsig_bad_key(self):
|
|
|
# bad keys should not reach xfrin, but should they somehow,
|
|
@@ -2718,7 +2718,7 @@ class TestXfrin(unittest.TestCase):
|
|
|
|
|
|
def test_command_handler_refresh_with_tsig(self):
|
|
|
self.common_ixfr_setup('refresh', False, 'example.com.key')
|
|
|
- self.assertEqual(RRType.AXFR(), self.xfr.xfrin_started_request_type)
|
|
|
+ self.assertEqual(RRType.AXFR, self.xfr.xfrin_started_request_type)
|
|
|
|
|
|
def test_command_handler_refresh_with_tsig_bad_key(self):
|
|
|
# bad keys should not reach xfrin, but should they somehow,
|
|
@@ -2734,12 +2734,12 @@ class TestXfrin(unittest.TestCase):
|
|
|
# Similar to the previous case, but explicitly disabled. AXFR should
|
|
|
# be used.
|
|
|
self.common_ixfr_setup('retransfer', False)
|
|
|
- self.assertEqual(RRType.AXFR(), self.xfr.xfrin_started_request_type)
|
|
|
+ self.assertEqual(RRType.AXFR, self.xfr.xfrin_started_request_type)
|
|
|
|
|
|
def test_command_handler_refresh_ixfr_disabled(self):
|
|
|
# Same for refresh
|
|
|
self.common_ixfr_setup('refresh', False)
|
|
|
- self.assertEqual(RRType.AXFR(), self.xfr.xfrin_started_request_type)
|
|
|
+ self.assertEqual(RRType.AXFR, self.xfr.xfrin_started_request_type)
|
|
|
|
|
|
class TestXfrinMemoryZones(unittest.TestCase):
|
|
|
def setUp(self):
|
|
@@ -3015,7 +3015,7 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
self.__rets = rets
|
|
|
published = rets[-1]
|
|
|
xfrin.process_xfrin(self, XfrinRecorder(), Name("example.org."),
|
|
|
- RRClass.IN(), None, None, None, True, None,
|
|
|
+ RRClass.IN, None, None, None, True, None,
|
|
|
request_type, self.__get_connection)
|
|
|
self.assertEqual([], self.__rets)
|
|
|
self.assertEqual(transfers, self.__transfers)
|
|
@@ -3027,7 +3027,7 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
"""
|
|
|
Everything OK the first time, over IXFR.
|
|
|
"""
|
|
|
- self.__do_test([XFRIN_OK], [RRType.IXFR()], RRType.IXFR())
|
|
|
+ self.__do_test([XFRIN_OK], [RRType.IXFR], RRType.IXFR)
|
|
|
# Check there was loadzone command
|
|
|
self.assertTrue(self._send_cc_session.send_called)
|
|
|
self.assertTrue(self._send_cc_session.send_called_correctly)
|
|
@@ -3038,7 +3038,7 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
"""
|
|
|
Everything OK the first time, over AXFR.
|
|
|
"""
|
|
|
- self.__do_test([XFRIN_OK], [RRType.AXFR()], RRType.AXFR())
|
|
|
+ self.__do_test([XFRIN_OK], [RRType.AXFR], RRType.AXFR)
|
|
|
|
|
|
def test_axfr_fail(self):
|
|
|
"""
|
|
@@ -3046,15 +3046,15 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
to fail on AXFR, but succeed on IXFR and we didn't use IXFR in the first
|
|
|
place for some reason.
|
|
|
"""
|
|
|
- self.__do_test([XFRIN_FAIL], [RRType.AXFR()], RRType.AXFR())
|
|
|
+ self.__do_test([XFRIN_FAIL], [RRType.AXFR], RRType.AXFR)
|
|
|
|
|
|
def test_ixfr_fallback(self):
|
|
|
"""
|
|
|
The transfer fails over IXFR, but suceeds over AXFR. It should fall back
|
|
|
to it and say everything is OK.
|
|
|
"""
|
|
|
- self.__do_test([XFRIN_FAIL, XFRIN_OK], [RRType.IXFR(), RRType.AXFR()],
|
|
|
- RRType.IXFR())
|
|
|
+ self.__do_test([XFRIN_FAIL, XFRIN_OK], [RRType.IXFR, RRType.AXFR],
|
|
|
+ RRType.IXFR)
|
|
|
|
|
|
def test_ixfr_fail(self):
|
|
|
"""
|
|
@@ -3062,13 +3062,13 @@ class TestXfrinProcess(unittest.TestCase):
|
|
|
(only once) and should try both before giving up.
|
|
|
"""
|
|
|
self.__do_test([XFRIN_FAIL, XFRIN_FAIL],
|
|
|
- [RRType.IXFR(), RRType.AXFR()], RRType.IXFR())
|
|
|
+ [RRType.IXFR, RRType.AXFR], RRType.IXFR)
|
|
|
|
|
|
def test_send_loadzone(self):
|
|
|
"""
|
|
|
Check the loadzone command is sent after successful transfer.
|
|
|
"""
|
|
|
- self.__do_test([XFRIN_OK], [RRType.IXFR()], RRType.IXFR())
|
|
|
+ self.__do_test([XFRIN_OK], [RRType.IXFR], RRType.IXFR)
|
|
|
self.assertTrue(self._send_cc_session.send_called)
|
|
|
self.assertTrue(self._send_cc_session.send_called_correctly)
|
|
|
self.assertTrue(self._send_cc_session.recv_called)
|