123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212 |
- # Copyright (C) 2012 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import os
- import shutil
- import isc.log
- import unittest
- from isc.dns import *
- from isc.datasrc import DataSourceClient
- from isc.ddns.session import *
- from isc.ddns.zone_config import *
- # Some common test parameters
- TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
- READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
- TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
- WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
- WRITE_ZONE_DB_CONFIG = "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}"
- TEST_ZONE_NAME = Name('example.org')
- UPDATE_RRTYPE = RRType.SOA()
- TEST_RRCLASS = RRClass.IN()
- TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
- TEST_CLIENT6 = ('2001:db8::1', 53, 0, 0)
- TEST_CLIENT4 = ('192.0.2.1', 53)
- def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[],
- updates=[]):
- msg = Message(Message.RENDER)
- msg.set_qid(5353) # arbitrary chosen
- msg.set_opcode(Opcode.UPDATE())
- msg.set_rcode(Rcode.NOERROR())
- for z in zones:
- msg.add_question(z)
- for p in prerequisites:
- msg.add_rrset(SECTION_PREREQUISITE, p)
- for u in updates:
- msg.add_rrset(SECTION_UPDATE, u)
- renderer = MessageRenderer()
- msg.to_wire(renderer)
- # re-read the created data in the parse mode
- msg.clear(Message.PARSE)
- msg.from_wire(renderer.get_data(), Message.PRESERVE_ORDER)
- return renderer.get_data(), msg
- def add_rdata(rrset, rdata):
- '''
- Helper function for easily adding Rdata fields to RRsets.
- This function assumes the given rdata is of type string or bytes,
- and corresponds to the given rrset
- '''
- rrset.add_rdata(isc.dns.Rdata(rrset.get_type(),
- rrset.get_class(),
- rdata))
- def create_rrset(name, rrclass, rrtype, ttl, rdatas = []):
- '''
- Helper method to easily create RRsets, auto-converts
- name, rrclass, rrtype, and ttl (if possibly through their
- respective constructors)
- rdatas is a list of rr data strings, or bytestrings, which
- should match the RRType of the rrset to create
- '''
- if type(name) != Name:
- name = Name(name)
- if type(rrclass) != RRClass:
- rrclass = RRClass(rrclass)
- if type(rrtype) != RRType:
- rrtype = RRType(rrtype)
- if type(ttl) != RRTTL:
- ttl = RRTTL(ttl)
- rrset = isc.dns.RRset(name, rrclass, rrtype, ttl)
- for rdata in rdatas:
- add_rdata(rrset, rdata)
- return rrset
- class SessionTest(unittest.TestCase):
- '''Session tests'''
- def setUp(self):
- shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
- self.__datasrc_client = DataSourceClient("sqlite3",
- WRITE_ZONE_DB_CONFIG)
- self.__update_msgdata, self.__update_msg = create_update_msg()
- self.__session = UpdateSession(self.__update_msg,
- self.__update_msgdata, TEST_CLIENT4,
- ZoneConfig([], TEST_RRCLASS,
- self.__datasrc_client))
- self.__session._UpdateSession__get_update_zone()
- def check_response(self, msg, expected_rcode):
- '''Perform common checks on update resposne message.'''
- self.assertTrue(msg.get_header_flag(Message.HEADERFLAG_QR))
- # note: we convert opcode to text it'd be more helpful on failure.
- self.assertEqual(Opcode.UPDATE().to_text(), msg.get_opcode().to_text())
- self.assertEqual(expected_rcode.to_text(), msg.get_rcode().to_text())
- # All sections should be cleared
- self.assertEqual(0, msg.get_rr_count(SECTION_ZONE))
- self.assertEqual(0, msg.get_rr_count(SECTION_PREREQUISITE))
- self.assertEqual(0, msg.get_rr_count(SECTION_UPDATE))
- self.assertEqual(0, msg.get_rr_count(Message.SECTION_ADDITIONAL))
- def test_handle(self):
- '''Basic update case'''
- result, zname, zclass = self.__session.handle()
- self.assertEqual(UPDATE_SUCCESS, result)
- self.assertEqual(TEST_ZONE_NAME, zname)
- self.assertEqual(TEST_RRCLASS, zclass)
- # Just checking these are different from the success code.
- self.assertNotEqual(UPDATE_ERROR, result)
- self.assertNotEqual(UPDATE_DROP, result)
- def test_broken_request(self):
- # Zone section is empty
- msg_data, msg = create_update_msg(zones=[])
- session = UpdateSession(msg, msg_data, TEST_CLIENT6, None)
- result, zname, zclass = session.handle()
- self.assertEqual(UPDATE_ERROR, result)
- self.assertEqual(None, zname)
- self.assertEqual(None, zclass)
- self.check_response(session.get_message(), Rcode.FORMERR())
- # Zone section contains multiple records
- msg_data, msg = create_update_msg(zones=[TEST_ZONE_RECORD,
- TEST_ZONE_RECORD])
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
- self.assertEqual(UPDATE_ERROR, session.handle()[0])
- self.check_response(session.get_message(), Rcode.FORMERR())
- # Zone section's type is not SOA
- msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
- TEST_RRCLASS,
- RRType.A())])
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
- self.assertEqual(UPDATE_ERROR, session.handle()[0])
- self.check_response(session.get_message(), Rcode.FORMERR())
- def test_update_secondary(self):
- # specified zone is configured as a secondary. Since this
- # implementation doesn't support update forwarding, the result
- # should be NOTIMP.
- msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
- TEST_RRCLASS,
- RRType.SOA())])
- session = UpdateSession(msg, msg_data, TEST_CLIENT4,
- ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
- TEST_RRCLASS,
- self.__datasrc_client))
- self.assertEqual(UPDATE_ERROR, session.handle()[0])
- self.check_response(session.get_message(), Rcode.NOTIMP())
- def check_notauth(self, zname, zclass=TEST_RRCLASS):
- '''Common test sequence for the 'notauth' test'''
- msg_data, msg = create_update_msg(zones=[Question(zname, zclass,
- RRType.SOA())])
- session = UpdateSession(msg, msg_data, TEST_CLIENT4,
- ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
- TEST_RRCLASS,
- self.__datasrc_client))
- self.assertEqual(UPDATE_ERROR, session.handle()[0])
- self.check_response(session.get_message(), Rcode.NOTAUTH())
- def test_update_notauth(self):
- '''Update attempt for non authoritative zones'''
- # zone name doesn't match
- self.check_notauth(Name('example.com'))
- # zone name is a subdomain of the actual authoritative zone
- # (match must be exact)
- self.check_notauth(Name('sub.example.org'))
- # zone class doesn't match
- self.check_notauth(Name('example.org'), RRClass.CH())
- def test_foreach_rr_in_rrset(self):
- rrset = create_rrset("www.example.org", TEST_RRCLASS,
- RRType.A(), 3600, [ "192.0.2.1" ])
- l = []
- for rr in foreach_rr(rrset):
- l.append(str(rr))
- self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n"], l)
- add_rdata(rrset, "192.0.2.2")
- add_rdata(rrset, "192.0.2.3")
- # but through the generator, there should be several 1-line entries
- l = []
- for rr in foreach_rr(rrset):
- l.append(str(rr))
- self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n",
- "www.example.org. 3600 IN A 192.0.2.2\n",
- "www.example.org. 3600 IN A 192.0.2.3\n",
- ], l)
- def test_convert_rrset_class(self):
- # Converting an RRSET to a different class should work
- # if the rdata types can be converted
- rrset = create_rrset("www.example.org", RRClass.NONE(), RRType.A(),
- 3600, [ b'\xc0\x00\x02\x01', b'\xc0\x00\x02\x02'])
- rrset2 = convert_rrset_class(rrset, RRClass.IN())
- self.assertEqual("www.example.org. 3600 IN A 192.0.2.1\n" +
- "www.example.org. 3600 IN A 192.0.2.2\n",
- str(rrset2))
- rrset3 = convert_rrset_class(rrset2, RRClass.NONE())
- self.assertEqual("www.example.org. 3600 CLASS254 A \\# 4 " +
- "c0000201\nwww.example.org. 3600 CLASS254 " +
- "A \\# 4 c0000202\n",
- str(rrset3))
- # depending on what type of bad data is given, a number
- # of different exceptions could be raised (TODO: i recall
- # there was a ticket about making a better hierarchy for
- # dns/parsing related exceptions)
- self.assertRaises(InvalidRdataLength, convert_rrset_class,
- rrset, RRClass.CH())
- add_rdata(rrset, b'\xc0\x00')
- self.assertRaises(DNSMessageFORMERR, convert_rrset_class,
- rrset, RRClass.IN())
- def test_collect_rrsets(self):
- '''
- Tests the 'rrset collector' method, which collects rrsets
- with the same name and type
- '''
- collected = []
- collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN(),
- RRType.A(), 0, [ "192.0.2.1" ]))
- # Same name and class, different type
- collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN(),
- RRType.TXT(), 0, [ "one" ]))
- collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN(),
- RRType.A(), 0, [ "192.0.2.2" ]))
- collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN(),
- RRType.TXT(), 0, [ "two" ]))
- # Same class and type as an existing one, different name
- collect_rrsets(collected, create_rrset("b.example.org", RRClass.IN(),
- RRType.A(), 0, [ "192.0.2.3" ]))
- # Same name and type as an existing one, different class
- collect_rrsets(collected, create_rrset("a.example.org", RRClass.CH(),
- RRType.TXT(), 0, [ "one" ]))
- collect_rrsets(collected, create_rrset("b.example.org", RRClass.IN(),
- RRType.A(), 0, [ "192.0.2.4" ]))
- collect_rrsets(collected, create_rrset("a.example.org", RRClass.CH(),
- RRType.TXT(), 0, [ "two" ]))
- strings = [ rrset.to_text() for rrset in collected ]
- # note + vs , in this list
- expected = ['a.example.org. 0 IN A 192.0.2.1\n' +
- 'a.example.org. 0 IN A 192.0.2.2\n',
- 'a.example.org. 0 IN TXT "one"\n' +
- 'a.example.org. 0 IN TXT "two"\n',
- 'b.example.org. 0 IN A 192.0.2.3\n' +
- 'b.example.org. 0 IN A 192.0.2.4\n',
- 'a.example.org. 0 CH TXT "one"\n' +
- 'a.example.org. 0 CH TXT "two"\n']
- self.assertEqual(expected, strings)
- def __prereq_helper(self, method, expected, rrset):
- '''Calls the given method with self.__datasrc_client
- and the given rrset, and compares the return value.
- Function does not do much but makes the code look nicer'''
- self.assertEqual(expected, method(rrset))
- def __check_prerequisite_exists_combined(self, method, rrclass, expected):
- '''shared code for the checks for the very similar (but reversed
- in behaviour) methods __prereq_rrset_exists and
- __prereq_rrset_does_not_exist.
- For rrset_exists, rrclass should be ANY, for rrset_does_not_exist,
- it should be NONE.
- '''
- # Basic existence checks
- # www.example.org should have an A, but not an MX
- rrset = create_rrset("www.example.org", rrclass, RRType.A(), 0)
- self.__prereq_helper(method, expected, rrset)
- rrset = create_rrset("www.example.org", rrclass, RRType.MX(), 0)
- self.__prereq_helper(method, not expected, rrset)
- # example.org should have an MX, but not an A
- rrset = create_rrset("example.org", rrclass, RRType.MX(), 0)
- self.__prereq_helper(method, expected, rrset)
- rrset = create_rrset("example.org", rrclass, RRType.A(), 0)
- self.__prereq_helper(method, not expected, rrset)
- # Also check the case where the name does not even exist
- rrset = create_rrset("doesnotexist.example.org", rrclass, RRType.A(), 0)
- self.__prereq_helper(method, not expected, rrset)
- # Wildcard expansion should not be applied, but literal matches
- # should work
- rrset = create_rrset("foo.wildcard.example.org", rrclass, RRType.A(), 0)
- self.__prereq_helper(method, not expected, rrset)
- rrset = create_rrset("*.wildcard.example.org", rrclass, RRType.A(), 0)
- self.__prereq_helper(method, expected, rrset)
- # Likewise, CNAME directly should match, but what it points to should
- # not
- rrset = create_rrset("cname.example.org", rrclass, RRType.A(), 0)
- self.__prereq_helper(method, not expected, rrset)
- rrset = create_rrset("cname.example.org", rrclass, RRType.CNAME(), 0)
- self.__prereq_helper(method, expected, rrset)
- # And also make sure a delegation (itself) is not treated as existing
- # data
- rrset = create_rrset("foo.sub.example.org", rrclass, RRType.A(), 0)
- self.__prereq_helper(method, not expected, rrset)
- # But the delegation data itself should match
- rrset = create_rrset("sub.example.org", rrclass, RRType.NS(), 0)
- self.__prereq_helper(method, expected, rrset)
- # As should glue
- rrset = create_rrset("ns.sub.example.org", rrclass, RRType.A(), 0)
- self.__prereq_helper(method, expected, rrset)
- def test_check_prerequisite_exists(self):
- method = self.__session._UpdateSession__prereq_rrset_exists
- self.__check_prerequisite_exists_combined(method,
- RRClass.ANY(),
- True)
- def test_check_prerequisite_does_not_exist(self):
- method = self.__session._UpdateSession__prereq_rrset_does_not_exist
- self.__check_prerequisite_exists_combined(method,
- RRClass.NONE(),
- False)
- def test_check_prerequisite_exists_value(self):
- method = self.__session._UpdateSession__prereq_rrset_exists_value
- rrset = create_rrset("www.example.org", RRClass.IN(), RRType.A(), 0)
- # empty one should not match
- self.__prereq_helper(method, False, rrset)
- # When the rdata is added, it should match
- add_rdata(rrset, "192.0.2.1")
- self.__prereq_helper(method, True, rrset)
- # But adding more should not
- add_rdata(rrset, "192.0.2.2")
- self.__prereq_helper(method, False, rrset)
- # Also test one with more than one RR
- rrset = create_rrset("example.org", RRClass.IN(), RRType.NS(), 0)
- self.__prereq_helper(method, False, rrset)
- add_rdata(rrset, "ns1.example.org.")
- self.__prereq_helper(method, False, rrset)
- add_rdata(rrset, "ns2.example.org")
- self.__prereq_helper(method, False, rrset)
- add_rdata(rrset, "ns3.example.org.")
- self.__prereq_helper(method, True, rrset)
- add_rdata(rrset, "ns4.example.org.")
- self.__prereq_helper(method, False, rrset)
- # Repeat that, but try a different order of Rdata addition
- rrset = create_rrset("example.org", RRClass.IN(), RRType.NS(), 0)
- self.__prereq_helper(method, False, rrset)
- add_rdata(rrset, "ns3.example.org.")
- self.__prereq_helper(method, False, rrset)
- add_rdata(rrset, "ns2.example.org.")
- self.__prereq_helper(method, False, rrset)
- add_rdata(rrset, "ns1.example.org.")
- self.__prereq_helper(method, True, rrset)
- add_rdata(rrset, "ns4.example.org.")
- self.__prereq_helper(method, False, rrset)
- # and test one where the name does not even exist
- rrset = create_rrset("doesnotexist.example.org", RRClass.IN(),
- RRType.A(), 0, [ "192.0.2.1" ])
- self.__prereq_helper(method, False, rrset)
- def __check_prerequisite_name_in_use_combined(self, method, rrclass,
- expected):
- '''shared code for the checks for the very similar (but reversed
- in behaviour) methods __prereq_name_in_use and
- __prereq_name_not_in_use
- '''
- rrset = create_rrset("example.org", rrclass, RRType.ANY(), 0)
- self.__prereq_helper(method, expected, rrset)
- rrset = create_rrset("www.example.org", rrclass, RRType.ANY(), 0)
- self.__prereq_helper(method, expected, rrset)
- rrset = create_rrset("doesnotexist.example.org", rrclass,
- RRType.ANY(), 0)
- self.__prereq_helper(method, not expected, rrset)
- rrset = create_rrset("belowdelegation.sub.example.org", rrclass,
- RRType.ANY(), 0)
- self.__prereq_helper(method, not expected, rrset)
- rrset = create_rrset("foo.wildcard.example.org", rrclass,
- RRType.ANY(), 0)
- self.__prereq_helper(method, not expected, rrset)
- # empty nonterminal should not match
- rrset = create_rrset("nonterminal.example.org", rrclass,
- RRType.ANY(), 0)
- self.__prereq_helper(method, not expected, rrset)
- rrset = create_rrset("empty.nonterminal.example.org", rrclass,
- RRType.ANY(), 0)
- self.__prereq_helper(method, expected, rrset)
- def test_check_prerequisite_name_in_use(self):
- method = self.__session._UpdateSession__prereq_name_in_use
- self.__check_prerequisite_name_in_use_combined(method,
- RRClass.ANY(),
- True)
- def test_check_prerequisite_name_not_in_use(self):
- method = self.__session._UpdateSession__prereq_name_not_in_use
- self.__check_prerequisite_name_in_use_combined(method,
- RRClass.NONE(),
- False)
- def check_prerequisite_result(self, expected, prerequisites):
- '''Helper method for checking the result of a prerequisite check;
- creates an update session, and fills it with the list of rrsets
- from 'prerequisites'. Then checks if __check_prerequisites()
- returns the Rcode specified in 'expected'.'''
- msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
- prerequisites)
- zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
- session._UpdateSession__get_update_zone()
- # compare the to_text output of the rcodes (nicer error messages)
- # This call itself should also be done by handle(),
- # but just for better failures, it is first called on its own
- self.assertEqual(expected.to_text(),
- session._UpdateSession__check_prerequisites().to_text())
- # Now see if handle finds the same result
- (result, _, _) = session.handle()
- self.assertEqual(expected,
- session._UpdateSession__message.get_rcode())
- # And that the result looks right
- if expected == Rcode.NOERROR():
- self.assertEqual(UPDATE_SUCCESS, result)
- else:
- self.assertEqual(UPDATE_ERROR, result)
- def check_prescan_result(self, expected, updates, expected_soa = None):
- '''Helper method for checking the result of a prerequisite check;
- creates an update session, and fills it with the list of rrsets
- from 'updates'. Then checks if __do_prescan()
- returns the Rcode specified in 'expected'.'''
- msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
- [], updates)
- zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
- session._UpdateSession__get_update_zone()
- # compare the to_text output of the rcodes (nicer error messages)
- # This call itself should also be done by handle(),
- # but just for better failures, it is first called on its own
- self.assertEqual(expected.to_text(),
- session._UpdateSession__do_prescan().to_text())
- # If there is an expected soa, check it
- self.assertEqual(str(expected_soa),
- str(session._UpdateSession__added_soa))
- def check_full_handle_result(self, expected, updates):
- '''Helper method for checking the result of a full handle;
- creates an update session, and fills it with the list of rrsets
- from 'updates'. Then checks if __handle()
- results in a response with rcode 'expected'.'''
- msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
- [], updates)
- zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
- session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
- # Now see if handle finds the same result
- (result, _, _) = session.handle()
- self.assertEqual(expected.to_text(),
- session._UpdateSession__message.get_rcode().to_text())
- # And that the result looks right
- if expected == Rcode.NOERROR():
- self.assertEqual(UPDATE_SUCCESS, result)
- else:
- self.assertEqual(UPDATE_ERROR, result)
- def test_check_prerequisites(self):
- # This test checks if the actual prerequisite-type-specific
- # methods are called.
- # It does test all types of prerequisites, but it does not test
- # every possible result for those types (those are tested above,
- # in the specific prerequisite type tests)
- # Let's first define a number of prereq's that should succeed
- rrset_exists_yes = create_rrset("example.org", RRClass.ANY(),
- RRType.SOA(), 0)
- rrset_exists_value_yes = create_rrset("www.example.org", RRClass.IN(),
- RRType.A(), 0, [ "192.0.2.1" ])
- rrset_does_not_exist_yes = create_rrset("foo.example.org",
- RRClass.NONE(), RRType.SOA(),
- 0)
- name_in_use_yes = create_rrset("www.example.org", RRClass.ANY(),
- RRType.ANY(), 0)
- name_not_in_use_yes = create_rrset("foo.example.org", RRClass.NONE(),
- RRType.ANY(), 0)
- rrset_exists_value_1 = create_rrset("example.org", RRClass.IN(),
- RRType.NS(), 0,
- [ "ns1.example.org" ])
- rrset_exists_value_2 = create_rrset("example.org", RRClass.IN(),
- RRType.NS(), 0,
- [ "ns2.example.org" ])
- rrset_exists_value_3 = create_rrset("example.org", RRClass.IN(),
- RRType.NS(), 0,
- [ "ns3.example.org" ])
- # and a number that should not
- rrset_exists_no = create_rrset("foo.example.org", RRClass.ANY(),
- RRType.SOA(), 0)
- rrset_exists_value_no = create_rrset("www.example.org", RRClass.IN(),
- RRType.A(), 0, [ "192.0.2.2" ])
- rrset_does_not_exist_no = create_rrset("example.org", RRClass.NONE(),
- RRType.SOA(), 0)
- name_in_use_no = create_rrset("foo.example.org", RRClass.ANY(),
- RRType.ANY(), 0)
- name_not_in_use_no = create_rrset("www.example.org", RRClass.NONE(),
- RRType.ANY(), 0)
- # check 'no' result codes
- self.check_prerequisite_result(Rcode.NXRRSET(),
- [ rrset_exists_no ])
- self.check_prerequisite_result(Rcode.NXRRSET(),
- [ rrset_exists_value_no ])
- self.check_prerequisite_result(Rcode.YXRRSET(),
- [ rrset_does_not_exist_no ])
- self.check_prerequisite_result(Rcode.NXDOMAIN(),
- [ name_in_use_no ])
- self.check_prerequisite_result(Rcode.YXDOMAIN(),
- [ name_not_in_use_no ])
- # the 'yes' codes should result in ok
- # individually
- self.check_prerequisite_result(Rcode.NOERROR(),
- [ rrset_exists_yes ] )
- self.check_prerequisite_result(Rcode.NOERROR(),
- [ rrset_exists_value_yes ])
- self.check_prerequisite_result(Rcode.NOERROR(),
- [ rrset_does_not_exist_yes ])
- self.check_prerequisite_result(Rcode.NOERROR(),
- [ name_in_use_yes ])
- self.check_prerequisite_result(Rcode.NOERROR(),
- [ name_not_in_use_yes ])
- self.check_prerequisite_result(Rcode.NOERROR(),
- [ rrset_exists_value_1,
- rrset_exists_value_2,
- rrset_exists_value_3])
- # and together
- self.check_prerequisite_result(Rcode.NOERROR(),
- [ rrset_exists_yes,
- rrset_exists_value_yes,
- rrset_does_not_exist_yes,
- name_in_use_yes,
- name_not_in_use_yes,
- rrset_exists_value_1,
- rrset_exists_value_2,
- rrset_exists_value_3])
- # try out a permutation, note that one rrset is split up,
- # and the order of the RRs should not matter
- self.check_prerequisite_result(Rcode.NOERROR(),
- [ rrset_exists_value_3,
- rrset_exists_yes,
- rrset_exists_value_2,
- name_in_use_yes,
- rrset_exists_value_1])
- # Should fail on the first error, even if most of the
- # prerequisites are ok
- self.check_prerequisite_result(Rcode.NXDOMAIN(),
- [ rrset_exists_value_3,
- rrset_exists_yes,
- rrset_exists_value_2,
- name_in_use_yes,
- name_in_use_no,
- rrset_exists_value_1])
- def test_prerequisite_notzone(self):
- rrset = create_rrset("some.other.zone.", RRClass.ANY(), RRType.SOA(), 0)
- self.check_prerequisite_result(Rcode.NOTZONE(), [ rrset ])
- def test_prerequisites_formerr(self):
- # test for form errors in the prerequisite section
- # Class ANY, non-zero TTL
- rrset = create_rrset("example.org", RRClass.ANY(), RRType.SOA(), 1)
- self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
- # Class ANY, but with rdata
- rrset = create_rrset("example.org", RRClass.ANY(), RRType.A(), 0,
- [ b'\x00\x00\x00\x00' ])
- self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
- # Class NONE, non-zero TTL
- rrset = create_rrset("example.org", RRClass.NONE(), RRType.SOA(), 1)
- self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
- # Class NONE, but with rdata
- rrset = create_rrset("example.org", RRClass.NONE(), RRType.A(), 0,
- [ b'\x00\x00\x00\x00' ])
- self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
- # Matching class and type, but non-zero TTL
- rrset = create_rrset("www.example.org", RRClass.IN(), RRType.A(), 1,
- [ "192.0.2.1" ])
- self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
- # Completely different class
- rrset = create_rrset("example.org", RRClass.CH(), RRType.TXT(), 0,
- [ "foo" ])
- self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
- def __prereq_helper(self, method, expected, rrset):
- '''Calls the given method with self.__datasrc_client
- and the given rrset, and compares the return value.
- Function does not do much but makes the code look nicer'''
- self.assertEqual(expected, method(rrset))
- def __initialize_update_rrsets(self):
- '''Prepare a number of RRsets to be used in several update tests
- The rrsets are stored in self'''
- orig_a_rrset = create_rrset("www.example.org", TEST_RRCLASS,
- RRType.A(), 3600, [ "192.0.2.1" ])
- self.orig_a_rrset = orig_a_rrset
- rrset_update_a = create_rrset("www.example.org", TEST_RRCLASS,
- RRType.A(), 3600,
- [ "192.0.2.2", "192.0.2.3" ])
- self.rrset_update_a = rrset_update_a
- rrset_update_soa = create_rrset("example.org", TEST_RRCLASS,
- RRType.SOA(), 3600,
- [ "ns1.example.org. " +
- "admin.example.org. " +
- "1233 3600 1800 2419200 7200" ])
- self.rrset_update_soa = rrset_update_soa
- rrset_update_soa_del = create_rrset("example.org", RRClass.NONE(),
- RRType.SOA(), 0,
- [ "ns1.example.org. " +
- "admin.example.org. " +
- "1233 3600 1800 2419200 7200" ])
- self.rrset_update_soa_del = rrset_update_soa_del
- rrset_update_soa2 = create_rrset("example.org", TEST_RRCLASS,
- RRType.SOA(), 3600,
- [ "ns1.example.org. " +
- "admin.example.org. " +
- "4000 3600 1800 2419200 7200" ])
- self.rrset_update_soa2 = rrset_update_soa2
- rrset_update_del_name = create_rrset("www.example.org", RRClass.ANY(),
- RRType.ANY(), 0)
- self.rrset_update_del_name = rrset_update_del_name
- rrset_update_del_name_apex = create_rrset("example.org", RRClass.ANY(),
- RRType.ANY(), 0)
- self.rrset_update_del_name_apex = rrset_update_del_name_apex
- rrset_update_del_rrset = create_rrset("www.example.org", RRClass.ANY(),
- RRType.A(), 0)
- self.rrset_update_del_rrset = rrset_update_del_rrset
- rrset_update_del_mx_apex = create_rrset("example.org", RRClass.ANY(),
- RRType.MX(), 0)
- self.rrset_update_del_mx_apex = rrset_update_del_mx_apex
- rrset_update_del_soa_apex = create_rrset("example.org", RRClass.ANY(),
- RRType.SOA(), 0)
- self.rrset_update_del_soa_apex = rrset_update_del_soa_apex
- rrset_update_del_ns_apex = create_rrset("example.org", RRClass.ANY(),
- RRType.NS(), 0)
- self.rrset_update_del_ns_apex = rrset_update_del_ns_apex
- rrset_update_del_rrset_part = create_rrset("www.example.org",
- RRClass.NONE(), RRType.A(),
- 0,
- [ b'\xc0\x00\x02\x02',
- b'\xc0\x00\x02\x03' ])
- self.rrset_update_del_rrset_part = rrset_update_del_rrset_part
- rrset_update_del_rrset_ns = create_rrset("example.org", RRClass.NONE(),
- RRType.NS(), 0,
- [ b'\x03ns1\x07example\x03org\x00',
- b'\x03ns2\x07example\x03org\x00',
- b'\x03ns3\x07example\x03org\x00' ])
- self.rrset_update_del_rrset_ns = rrset_update_del_rrset_ns
- rrset_update_del_rrset_mx = create_rrset("example.org", RRClass.NONE(),
- RRType.MX(), 0,
- [ b'\x00\x0a\x04mail\x07example\x03org\x00' ])
- self.rrset_update_del_rrset_mx = rrset_update_del_rrset_mx
- def test_prescan(self):
- '''Test whether the prescan succeeds on data that is ok, and whether
- if notices the SOA if present'''
- # prepare a set of correct update statements
- self.__initialize_update_rrsets()
- self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_a ])
- # check if soa is noticed
- self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_soa ],
- self.rrset_update_soa)
- # Other types of succesful prechecks
- self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_soa2 ],
- self.rrset_update_soa2)
- self.check_prescan_result(Rcode.NOERROR(),
- [ self.rrset_update_del_name ])
- self.check_prescan_result(Rcode.NOERROR(),
- [ self.rrset_update_del_name_apex ])
- self.check_prescan_result(Rcode.NOERROR(),
- [ self.rrset_update_del_rrset ])
- self.check_prescan_result(Rcode.NOERROR(),
- [ self.rrset_update_del_mx_apex ])
- self.check_prescan_result(Rcode.NOERROR(),
- [ self.rrset_update_del_rrset_part ])
- # and check a few permutations of the above
- # all of them (with one of the soas)
- self.check_prescan_result(Rcode.NOERROR(),
- [
- self.rrset_update_a,
- self.rrset_update_soa,
- self.rrset_update_del_name,
- self.rrset_update_del_name_apex,
- self.rrset_update_del_rrset,
- self.rrset_update_del_mx_apex,
- self.rrset_update_del_rrset_part
- ],
- self.rrset_update_soa)
- # Two soas. Should we reject or simply use the last?
- # (RFC is not really explicit on this, but between the lines I read
- # use the last)
- self.check_prescan_result(Rcode.NOERROR(),
- [ self.rrset_update_soa,
- self.rrset_update_soa2 ],
- self.rrset_update_soa2)
- self.check_prescan_result(Rcode.NOERROR(),
- [ self.rrset_update_soa2,
- self.rrset_update_soa ],
- self.rrset_update_soa)
- self.check_prescan_result(Rcode.NOERROR(),
- [
- self.rrset_update_del_mx_apex,
- self.rrset_update_del_name,
- self.rrset_update_del_name_apex,
- self.rrset_update_del_rrset_part,
- self.rrset_update_a,
- self.rrset_update_del_rrset,
- self.rrset_update_soa
- ],
- self.rrset_update_soa)
- def test_prescan_failures(self):
- '''Test whether prescan fails on bad data'''
- # out of zone data
- rrset = create_rrset("different.zone", RRClass.ANY(), RRType.TXT(), 0)
- self.check_prescan_result(Rcode.NOTZONE(), [ rrset ])
- # forbidden type, zone class
- rrset = create_rrset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.ANY(), 0,
- [ b'\x00' ])
- self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
- # non-zero TTL, class ANY
- rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY(), RRType.TXT(), 1)
- self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
- # non-zero Rdata, class ANY
- rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY(), RRType.TXT(), 0,
- [ "foo" ])
- self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
- # forbidden type, class ANY
- rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY(), RRType.AXFR(), 0,
- [ b'\x00' ])
- self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
- # non-zero TTL, class NONE
- rrset = create_rrset(TEST_ZONE_NAME, RRClass.NONE(), RRType.TXT(), 1)
- self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
- # forbidden type, class NONE
- rrset = create_rrset(TEST_ZONE_NAME, RRClass.NONE(), RRType.AXFR(), 0,
- [ b'\x00' ])
- self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
- def __check_inzone_data(self, expected_result, name, rrtype,
- expected_rrset = None):
- '''Does a find on TEST_ZONE for the given rrset's name and type,
- then checks if the result matches the expected result.
- If so, and if expected_rrset is given, they are compared as
- well.'''
- _, finder = self.__datasrc_client.find_zone(TEST_ZONE_NAME)
- result, found_rrset, _ = finder.find(name, rrtype,
- finder.NO_WILDCARD |
- finder.FIND_GLUE_OK)
- self.assertEqual(expected_result, result)
- # Sigh. Need rrsets.compare() again.
- # To be sure, compare name, class, type, and ttl
- if expected_rrset is not None:
- self.assertEqual(expected_rrset.get_name(), found_rrset.get_name())
- self.assertEqual(expected_rrset.get_class(), found_rrset.get_class())
- self.assertEqual(expected_rrset.get_type(), found_rrset.get_type())
- self.assertEqual(expected_rrset.get_ttl().to_text(),
- found_rrset.get_ttl().to_text())
- expected_rdata =\
- [ rdata.to_text() for rdata in expected_rrset.get_rdata() ]
- found_rdata =\
- [ rdata.to_text() for rdata in found_rrset.get_rdata() ]
- expected_rdata.sort()
- found_rdata.sort()
- self.assertEqual(expected_rdata, found_rdata)
- def test_update_add_delete_rrset(self):
- '''
- Tests a sequence of related add and delete updates. Some other
- cases are tested by later tests.
- '''
- self.__initialize_update_rrsets()
- # initially, the www should only contain one rr
- # (set to self.orig_a_rrset)
- # during this test, we will extend it at some point
- extended_a_rrset = create_rrset("www.example.org", TEST_RRCLASS,
- RRType.A(), 3600,
- [ "192.0.2.1",
- "192.0.2.2",
- "192.0.2.3" ])
- # Sanity check, make sure original data is really there before updates
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("www.example.org"),
- RRType.A(),
- self.orig_a_rrset)
- # Add two rrs
- self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_a ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("www.example.org"),
- RRType.A(),
- extended_a_rrset)
- # Adding the same RRsets should not make a difference.
- self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_a ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("www.example.org"),
- RRType.A(),
- extended_a_rrset)
- # Now delete those two, and we should end up with the original RRset
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_rrset_part ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("www.example.org"),
- RRType.A(),
- self.orig_a_rrset)
- # 'Deleting' them again should make no difference
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_rrset_part ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("www.example.org"),
- RRType.A(),
- self.orig_a_rrset)
- # But deleting the entire rrset, independent of its contents, should
- # work
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_rrset ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
- isc.dns.Name("www.example.org"),
- RRType.A())
- # Check that if we update the SOA, it is updated to our value
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_soa2 ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.SOA(),
- self.rrset_update_soa2)
- def test_glue_deletions(self):
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("sub.example.org."),
- RRType.NS())
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("ns.sub.example.org."),
- RRType.A())
- # See that we can delete glue
- rrset_delete_glue = create_rrset("ns.sub.example.org.",
- RRClass.ANY(),
- RRType.A(),
- 0)
- self.check_full_handle_result(Rcode.NOERROR(),
- [ rrset_delete_glue ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("sub.example.org."),
- RRType.NS())
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
- isc.dns.Name("ns.sub.example.org."),
- RRType.A())
- # Check that we don't accidentally delete a delegation if we
- # try to delete non-existent glue
- rrset_delete_nonexistent_glue = create_rrset("foo.sub.example.org.",
- RRClass.ANY(),
- RRType.A(),
- 0)
- self.check_full_handle_result(Rcode.NOERROR(),
- [ rrset_delete_nonexistent_glue ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("sub.example.org."),
- RRType.NS())
- def test_update_add_new_data(self):
- '''
- This tests adds data where none is present
- '''
- # Add data at a completely new name
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
- isc.dns.Name("new.example.org"),
- RRType.A())
- rrset = create_rrset("new.example.org", TEST_RRCLASS, RRType.A(),
- 3600, [ "192.0.2.1", "192.0.2.2" ])
- self.check_full_handle_result(Rcode.NOERROR(), [ rrset ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("new.example.org"),
- RRType.A(),
- rrset)
- # Also try a name where data is present, but none of this
- # specific type
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
- isc.dns.Name("new.example.org"),
- RRType.TXT())
- rrset = create_rrset("new.example.org", TEST_RRCLASS, RRType.TXT(),
- 3600, [ "foo" ])
- self.check_full_handle_result(Rcode.NOERROR(), [ rrset ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("new.example.org"),
- RRType.TXT(),
- rrset)
- def test_update_add_new_data_interspersed(self):
- '''
- This tests adds data where none is present, similar to
- test_update_add_new_data, but this time the second RRset
- is put into the record between the two RRs of the first
- RRset.
- '''
- # Add data at a completely new name
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
- isc.dns.Name("new_a.example.org"),
- RRType.A())
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
- isc.dns.Name("new_txt.example.org"),
- RRType.TXT())
- rrset1 = create_rrset("new_a.example.org", TEST_RRCLASS, RRType.A(),
- 3600, [ "192.0.2.1" ])
- rrset2 = create_rrset("new_txt.example.org", TEST_RRCLASS, RRType.TXT(),
- 3600, [ "foo" ])
- rrset3 = create_rrset("new_a.example.org", TEST_RRCLASS, RRType.A(),
- 3600, [ "192.0.2.2" ])
- self.check_full_handle_result(Rcode.NOERROR(),
- [ rrset1, rrset2, rrset3 ])
- # The update should have merged rrset1 and rrset3
- rrset_merged = create_rrset("new_a.example.org", TEST_RRCLASS,
- RRType.A(), 3600,
- [ "192.0.2.1", "192.0.2.2" ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("new_a.example.org"),
- RRType.A(),
- rrset_merged)
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("new_txt.example.org"),
- RRType.TXT(),
- rrset2)
- def test_update_delete_name(self):
- self.__initialize_update_rrsets()
- # First check it is there
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("www.example.org"),
- RRType.A())
- # Delete the entire name
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_name ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
- isc.dns.Name("www.example.org"),
- RRType.A())
- # Should still be gone after pointless second delete
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_name ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
- isc.dns.Name("www.example.org"),
- RRType.A())
- def test_update_apex_special_cases(self):
- '''
- Tests a few special cases when deleting data from the apex
- '''
- self.__initialize_update_rrsets()
- # the original SOA
- orig_soa_rrset = create_rrset("example.org", TEST_RRCLASS,
- RRType.SOA(), 3600,
- [ "ns1.example.org. " +
- "admin.example.org. " +
- "1234 3600 1800 2419200 7200" ])
- # We will delete some of the NS records
- orig_ns_rrset = create_rrset("example.org", TEST_RRCLASS,
- RRType.NS(), 3600,
- [ "ns1.example.org.",
- "ns2.example.org.",
- "ns3.example.org." ])
- # Sanity check, make sure original data is really there before updates
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.NS(),
- orig_ns_rrset)
- # We will delete the MX record later in this test, so let's make
- # sure that it exists (we do not care about its value)
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.MX())
- # Check that we cannot delete the SOA record by direction deletion
- # both by name+type and by full rrset
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_soa_apex,
- self.rrset_update_soa_del ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.SOA(),
- orig_soa_rrset)
- # If we delete everything at the apex, the SOA and NS rrsets should be
- # untouched
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_name_apex ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.SOA(),
- orig_soa_rrset)
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.NS(),
- orig_ns_rrset)
- # but the MX should be gone
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
- isc.dns.Name("example.org"),
- RRType.MX())
- # Deleting the NS rrset by name and type only, it should also be left
- # untouched
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_ns_apex ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.NS(),
- orig_ns_rrset)
- def DISABLED_test_update_apex_special_case_ns_rrset(self):
- # If we delete the NS at the apex specifically, it should still
- # keep one record
- self.__initialize_update_rrsets()
- # When we are done, we should have a reduced NS rrset
- short_ns_rrset = create_rrset("example.org", TEST_RRCLASS,
- RRType.NS(), 3600,
- [ "ns3.example.org." ])
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_rrset_ns ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.NS(),
- short_ns_rrset)
- def test_update_delete_normal_rrset_at_apex(self):
- '''
- Tests a number of 'normal rrset' deletes at the apex
- '''
- # MX should simply be deleted
- self.__initialize_update_rrsets()
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("example.org"),
- RRType.MX())
- self.check_full_handle_result(Rcode.NOERROR(),
- [ self.rrset_update_del_rrset_mx ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
- isc.dns.Name("example.org"),
- RRType.MX())
- def test_update_cname_special_cases(self):
- self.__initialize_update_rrsets()
- # Sanity check
- orig_cname_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
- RRType.CNAME(), 3600,
- [ "www.example.org." ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
- isc.dns.Name("cname.example.org"),
- RRType.A(),
- orig_cname_rrset)
- # If we try to add data where a cname is preset
- rrset = create_rrset("cname.example.org", TEST_RRCLASS, RRType.A(),
- 3600, [ "192.0.2.1" ])
- self.check_full_handle_result(Rcode.NOERROR(), [ rrset ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
- isc.dns.Name("cname.example.org"),
- RRType.A(),
- orig_cname_rrset)
- # But updating the cname itself should work
- new_cname_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
- RRType.CNAME(), 3600,
- [ "mail.example.org." ])
- self.check_full_handle_result(Rcode.NOERROR(), [ new_cname_rrset ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
- isc.dns.Name("cname.example.org"),
- RRType.A(),
- new_cname_rrset)
- self.__initialize_update_rrsets()
- # Likewise, adding a cname where other data is
- # present should do nothing either
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("www.example.org"),
- RRType.A(),
- self.orig_a_rrset)
- new_cname_rrset = create_rrset("www.example.org", TEST_RRCLASS,
- RRType.CNAME(), 3600,
- [ "mail.example.org." ])
- self.check_full_handle_result(Rcode.NOERROR(), [ new_cname_rrset ])
- self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
- isc.dns.Name("www.example.org"),
- RRType.A(),
- self.orig_a_rrset)
- def test_update_bad_class(self):
- rrset = create_rrset("example.org.", RRClass.CH(), RRType.TXT(), 0,
- [ "foo" ])
- self.check_full_handle_result(Rcode.FORMERR(), [ rrset ])
- def test_uncaught_exception(self):
- def my_exc():
- raise Exception("foo")
- self.__session._UpdateSession__update_soa = my_exc
- self.assertEqual(Rcode.SERVFAIL().to_text(),
- self.__session._UpdateSession__do_update().to_text())
- if __name__ == "__main__":
- isc.log.init("bind10")
- isc.log.resetUnitTestRootLogger()
- unittest.main()
|