# Copyright (C) 2012  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import os
import shutil
import isc.log
import unittest
from isc.dns import *
from isc.datasrc import DataSourceClient
from isc.ddns.session import *
from isc.ddns.zone_config import *

# Some common test parameters
# Both TESTDATA_PATH and TESTDATA_WRITE_PATH must be set in the environment;
# a missing variable raises KeyError as soon as this module is imported.
TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
# Writable copy of the database; SessionTestBase.setUp() recreates it from
# READ_ZONE_DB_FILE before each test.
WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
# Data source configuration string (hand-built JSON) pointing at the copy.
# NOTE(review): the path is inserted without escaping, so this presumably
# assumes it contains no double quote or backslash characters -- confirm.
WRITE_ZONE_DB_CONFIG = "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}"

# Zone name, RR type and RR class used to build the default zone section
# entry (TEST_ZONE_RECORD) of the test update messages.
TEST_ZONE_NAME = Name('example.org')
UPDATE_RRTYPE = RRType.SOA()
TEST_RRCLASS = RRClass.IN()
TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
# Client addresses handed to UpdateSession in the tests below.
# NOTE(review): these look like IPv6/IPv4 socket address tuples -- confirm.
TEST_CLIENT6 = ('2001:db8::1', 53, 0, 0)
TEST_CLIENT4 = ('192.0.2.1', 53)
# TSIG key for tests when needed.  The key name is TEST_ZONE_NAME.
TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")

def create_update_msg(zones=None, prerequisites=None, updates=None,
                      tsig_key=None):
    '''
    Build a DNS UPDATE message and return it in parsed form.

    The message is first composed in render mode, rendered to wire format
    and then re-read in parse mode, so the returned object looks like a
    message that has been received.

    Parameters:
    zones: iterable of Question objects for the zone section.  If omitted
           (or None), a single TEST_ZONE_RECORD is used; pass an empty
           list to get an empty zone section.
    prerequisites: iterable of RRsets for the prerequisite section
                   (default: none).
    updates: iterable of RRsets for the update section (default: none).
    tsig_key: if not None, the message is rendered with a TSIGContext
              created from this key.

    Returns the re-parsed Message object.
    '''
    # None is used as the "not given" marker so that no mutable list object
    # is shared between calls as a default value.
    if zones is None:
        zones = [TEST_ZONE_RECORD]
    if prerequisites is None:
        prerequisites = []
    if updates is None:
        updates = []

    msg = Message(Message.RENDER)
    msg.set_qid(5353)           # arbitrary chosen
    msg.set_opcode(Opcode.UPDATE())
    msg.set_rcode(Rcode.NOERROR())
    for z in zones:
        msg.add_question(z)
    for p in prerequisites:
        msg.add_rrset(SECTION_PREREQUISITE, p)
    for u in updates:
        msg.add_rrset(SECTION_UPDATE, u)

    renderer = MessageRenderer()
    if tsig_key is not None:
        msg.to_wire(renderer, TSIGContext(tsig_key))
    else:
        msg.to_wire(renderer)

    # re-read the created data in the parse mode
    msg.clear(Message.PARSE)
    msg.from_wire(renderer.get_data(), Message.PRESERVE_ORDER)

    return msg

def add_rdata(rrset, rdata):
    '''
    Append a single Rdata to the given RRset.

    The Rdata object is constructed from 'rdata' using the type and class
    of the RRset itself.  'rdata' is assumed to be a string or bytes value
    that is valid for that RRset.
    '''
    rrtype = rrset.get_type()
    rrclass = rrset.get_class()
    new_rdata = isc.dns.Rdata(rrtype, rrclass, rdata)
    rrset.add_rdata(new_rdata)

def create_rrset(name, rrclass, rrtype, ttl, rdatas=None):
    '''
    Helper method to easily create RRsets, auto-converts
    name, rrclass, rrtype, and ttl (if possible, through their
    respective constructors)

    Parameters:
    name: a Name object, or a value accepted by the Name constructor.
    rrclass: an RRClass object, or a value accepted by RRClass().
    rrtype: an RRType object, or a value accepted by RRType().
    ttl: an RRTTL object, or a value accepted by RRTTL().
    rdatas: an optional iterable of rr data strings, or bytestrings, which
            should match the RRType of the rrset to create.  If omitted
            (or None), the RRset is created without any rdata.

    Returns the newly created isc.dns.RRset.
    '''
    # isinstance() (rather than an exact type comparison) lets instances of
    # derived classes pass through unconverted as well.
    if not isinstance(name, Name):
        name = Name(name)
    if not isinstance(rrclass, RRClass):
        rrclass = RRClass(rrclass)
    if not isinstance(rrtype, RRType):
        rrtype = RRType(rrtype)
    if not isinstance(ttl, RRTTL):
        ttl = RRTTL(ttl)
    rrset = isc.dns.RRset(name, rrclass, rrtype, ttl)
    # None means "no rdata"; a None default avoids sharing one mutable list
    # object between calls.
    if rdatas is not None:
        for rdata in rdatas:
            add_rdata(rrset, rdata)
    return rrset

class SessionTestBase(unittest.TestCase):
    '''Base class for all session related tests.

    Its setUp() prepares a writable copy of the test zone database, a data
    source client on top of it and a default update session; the class also
    defines common utility method(s) for the derived test cases.

    '''
    def setUp(self):
        # Always start from a fresh copy of the original database file.
        shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
        self._datasrc_client = DataSourceClient("sqlite3",
                                                WRITE_ZONE_DB_CONFIG)
        self._update_msg = create_update_msg()

        # ACL for the test zone consisting of a single "ACCEPT" rule.
        accept_acl = REQUEST_LOADER.load([{"action": "ACCEPT"}])
        self._acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS): accept_acl}

        zone_config = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
                                 self._acl_map)
        self._session = UpdateSession(self._update_msg, TEST_CLIENT4,
                                      zone_config)
        # Call the session's private __get_update_zone() (via its mangled
        # name) up front, so the tests can use other private methods of the
        # session directly.
        self._session._UpdateSession__get_update_zone()

    def check_response(self, msg, expected_rcode):
        '''Perform common checks on update response message.

        The message must have the QR flag set, carry the UPDATE opcode and
        the given rcode, and have all four of its sections empty.

        '''
        self.assertTrue(msg.get_header_flag(Message.HEADERFLAG_QR))
        # note: opcode and rcode are compared in their text form, which is
        # more helpful on failure.
        actual_opcode = msg.get_opcode().to_text()
        self.assertEqual(Opcode.UPDATE().to_text(), actual_opcode)
        actual_rcode = msg.get_rcode().to_text()
        self.assertEqual(expected_rcode.to_text(), actual_rcode)
        # All sections should be cleared
        rr_count = msg.get_rr_count
        self.assertEqual(0, rr_count(SECTION_ZONE))
        self.assertEqual(0, rr_count(SECTION_PREREQUISITE))
        self.assertEqual(0, rr_count(SECTION_UPDATE))
        self.assertEqual(0, rr_count(Message.SECTION_ADDITIONAL))

class SessionTest(SessionTestBase):
    '''Basic session tests'''

    def test_handle(self):
        '''Basic update case: the default session is handled successfully.'''
        result, zone_name, zone_class = self._session.handle()
        self.assertEqual(UPDATE_SUCCESS, result)
        self.assertEqual(TEST_ZONE_NAME, zone_name)
        self.assertEqual(TEST_RRCLASS, zone_class)

        # Sanity check: the other result codes must be distinguishable
        # from the success code.
        for other_code in (UPDATE_ERROR, UPDATE_DROP):
            self.assertNotEqual(other_code, result)

    def test_broken_request(self):
        '''Requests with a malformed zone section result in FORMERR.'''
        # Case 1: the zone section is empty.  No zone name or class can be
        # reported back in this case.
        session = UpdateSession(create_update_msg(zones=[]), TEST_CLIENT6,
                                None)
        result, zone_name, zone_class = session.handle()
        self.assertEqual(UPDATE_ERROR, result)
        self.assertEqual(None, zone_name)
        self.assertEqual(None, zone_class)
        self.check_response(session.get_message(), Rcode.FORMERR())

        # Case 2: the zone section contains more than one record.
        duplicated = [TEST_ZONE_RECORD, TEST_ZONE_RECORD]
        session = UpdateSession(create_update_msg(zones=duplicated),
                                TEST_CLIENT4, None)
        result = session.handle()[0]
        self.assertEqual(UPDATE_ERROR, result)
        self.check_response(session.get_message(), Rcode.FORMERR())

        # Case 3: the type of the zone section record is not SOA.
        non_soa = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.A())
        session = UpdateSession(create_update_msg(zones=[non_soa]),
                                TEST_CLIENT4, None)
        result = session.handle()[0]
        self.assertEqual(UPDATE_ERROR, result)
        self.check_response(session.get_message(), Rcode.FORMERR())

    def test_update_secondary(self):
        '''An update to a zone configured as a secondary yields NOTIMP.

        This implementation doesn't support update forwarding, hence the
        NOTIMP response.

        '''
        zone_record = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA())
        msg = create_update_msg(zones=[zone_record])
        # The test zone is listed as a secondary zone in the configuration.
        zconfig = ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)], TEST_RRCLASS,
                             self._datasrc_client)
        session = UpdateSession(msg, TEST_CLIENT4, zconfig)
        result = session.handle()[0]
        self.assertEqual(UPDATE_ERROR, result)
        self.check_response(session.get_message(), Rcode.NOTIMP())

    def check_notauth(self, zname, zclass=TEST_RRCLASS):
        '''Common test sequence for the 'notauth' test.

        Sends an update whose zone section names zname/zclass and checks
        that handling it fails with a NOTAUTH response.

        '''
        zone_record = Question(zname, zclass, RRType.SOA())
        msg = create_update_msg(zones=[zone_record])
        zconfig = ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)], TEST_RRCLASS,
                             self._datasrc_client)
        session = UpdateSession(msg, TEST_CLIENT4, zconfig)
        result = session.handle()[0]
        self.assertEqual(UPDATE_ERROR, result)
        self.check_response(session.get_message(), Rcode.NOTAUTH())

    def test_update_notauth(self):
        '''Update attempt for non authoritative zones'''
        # A zone name that doesn't match at all.
        unrelated_zone = Name('example.com')
        self.check_notauth(unrelated_zone)
        # A subdomain of the actual authoritative zone: the match must be
        # exact, so this is not accepted either.
        sub_zone = Name('sub.example.org')
        self.check_notauth(sub_zone)
        # The right zone name, but with a class that doesn't match.
        self.check_notauth(Name('example.org'), RRClass.CH())

    def test_update_datasrc_error(self):
        '''An exception from the data source client results in SERVFAIL.'''
        class BadDataSourceClient:
            '''Stand-in client whose find_zone() always fails.'''
            def find_zone(self, name):
                raise isc.datasrc.Error('faked exception')

        zone_record = Question(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA())
        msg = create_update_msg(zones=[zone_record])
        zconfig = ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)], TEST_RRCLASS,
                             BadDataSourceClient())
        session = UpdateSession(msg, TEST_CLIENT4, zconfig)
        result = session.handle()[0]
        self.assertEqual(UPDATE_ERROR, result)
        self.check_response(session.get_message(), Rcode.SERVFAIL())

    def test_foreach_rr_in_rrset(self):
        '''foreach_rr() yields one single-RR item per rdata of an RRset.'''
        rrset = create_rrset("www.example.org", TEST_RRCLASS,
                             RRType.A(), 3600, [ "192.0.2.1" ])

        # With a single rdata there is exactly one entry.
        single = [str(rr) for rr in foreach_rr(rrset)]
        self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n"], single)

        for address in ("192.0.2.2", "192.0.2.3"):
            add_rdata(rrset, address)

        # With several rdatas, the generator should produce several 1-line
        # entries.
        multiple = [str(rr) for rr in foreach_rr(rrset)]
        expected = ["www.example.org. 3600 IN A 192.0.2.1\n",
                    "www.example.org. 3600 IN A 192.0.2.2\n",
                    "www.example.org. 3600 IN A 192.0.2.3\n"]
        self.assertEqual(expected, multiple)

    def test_convert_rrset_class(self):
        '''Converting an RRset to a different class works if the rdata
           can be converted, and raises otherwise.'''
        none_class_rrset = create_rrset("www.example.org", RRClass.NONE(),
                                        RRType.A(), 3600,
                                        [ b'\xc0\x00\x02\x01',
                                          b'\xc0\x00\x02\x02' ])

        # NONE -> IN
        in_rrset = convert_rrset_class(none_class_rrset, RRClass.IN())
        expected_in = ("www.example.org. 3600 IN A 192.0.2.1\n" +
                       "www.example.org. 3600 IN A 192.0.2.2\n")
        self.assertEqual(expected_in, str(in_rrset))

        # ... and IN -> NONE again
        back_to_none = convert_rrset_class(in_rrset, RRClass.NONE())
        expected_none = ("www.example.org. 3600 CLASS254 A \\# 4 c0000201\n" +
                         "www.example.org. 3600 CLASS254 A \\# 4 c0000202\n")
        self.assertEqual(expected_none, str(back_to_none))

        # depending on what type of bad data is given, a number
        # of different exceptions could be raised (TODO: i recall
        # there was a ticket about making a better hierarchy for
        # dns/parsing related exceptions)
        self.assertRaises(InvalidRdataLength, convert_rrset_class,
                          none_class_rrset, RRClass.CH())
        # A 2-byte rdata is added before the second failure case.
        add_rdata(none_class_rrset, b'\xc0\x00')
        self.assertRaises(DNSMessageFORMERR, convert_rrset_class,
                          none_class_rrset, RRClass.IN())

    def test_collect_rrsets(self):
        '''
        Tests the 'rrset collector' method, which collects rrsets
        with the same name and type
        '''
        collected = []

        def collect(name, rrclass, rrtype, rdata):
            # Feed a single-rdata RRset (TTL 0) to the collector.
            collect_rrsets(collected,
                           create_rrset(name, rrclass, rrtype, 0, [ rdata ]))

        collect("a.example.org", RRClass.IN(), RRType.A(), "192.0.2.1")
        # Same name and class, different type
        collect("a.example.org", RRClass.IN(), RRType.TXT(), "one")
        collect("a.example.org", RRClass.IN(), RRType.A(), "192.0.2.2")
        collect("a.example.org", RRClass.IN(), RRType.TXT(), "two")
        # Same class and type as an existing one, different name
        collect("b.example.org", RRClass.IN(), RRType.A(), "192.0.2.3")
        # Same name and type as an existing one, different class
        collect("a.example.org", RRClass.CH(), RRType.TXT(), "one")
        collect("b.example.org", RRClass.IN(), RRType.A(), "192.0.2.4")
        collect("a.example.org", RRClass.CH(), RRType.TXT(), "two")

        # Each list item is one collected RRset; the RRs of one RRset are
        # joined with '+', the RRsets are separated with ','.
        expected = ['a.example.org. 0 IN A 192.0.2.1\n' +
                    'a.example.org. 0 IN A 192.0.2.2\n',
                    'a.example.org. 0 IN TXT "one"\n' +
                    'a.example.org. 0 IN TXT "two"\n',
                    'b.example.org. 0 IN A 192.0.2.3\n' +
                    'b.example.org. 0 IN A 192.0.2.4\n',
                    'a.example.org. 0 CH TXT "one"\n' +
                    'a.example.org. 0 CH TXT "two"\n']
        actual = [ rrset.to_text() for rrset in collected ]

        self.assertEqual(expected, actual)

    def __prereq_helper(self, method, expected, rrset):
        '''Calls the given method with the given rrset, and checks that
           its return value equals 'expected'.
           Function does not do much but makes the code look nicer'''
        actual = method(rrset)
        self.assertEqual(expected, actual)

    def __check_prerequisite_exists_combined(self, method, rrclass, expected):
        '''shared code for the checks for the very similar (but reversed
           in behaviour) methods __prereq_rrset_exists and
           __prereq_rrset_does_not_exist.
           For rrset_exists, rrclass should be ANY, for rrset_does_not_exist,
           it should be NONE.
           'expected' is the value the method should return for an rrset
           that exists; for one that does not, 'not expected' is checked.
        '''
        def check(outcome, name, rrtype):
            # Build an empty rrset (TTL 0) of the class under test and run
            # the prerequisite method on it.
            self.__prereq_helper(method, outcome,
                                 create_rrset(name, rrclass, rrtype, 0))

        # Basic existence checks
        # www.example.org should have an A, but not an MX
        check(expected, "www.example.org", RRType.A())
        check(not expected, "www.example.org", RRType.MX())

        # example.org should have an MX, but not an A
        check(expected, "example.org", RRType.MX())
        check(not expected, "example.org", RRType.A())

        # Also check the case where the name does not even exist
        check(not expected, "doesnotexist.example.org", RRType.A())

        # Wildcard expansion should not be applied, but literal matches
        # should work
        check(not expected, "foo.wildcard.example.org", RRType.A())
        check(expected, "*.wildcard.example.org", RRType.A())

        # Likewise, CNAME directly should match, but what it points to should
        # not
        check(not expected, "cname.example.org", RRType.A())
        check(expected, "cname.example.org", RRType.CNAME())

        # And also make sure a delegation (itself) is not treated as existing
        # data
        check(not expected, "foo.sub.example.org", RRType.A())
        # But the delegation data itself should match
        check(expected, "sub.example.org", RRType.NS())
        # As should glue
        check(expected, "ns.sub.example.org", RRType.A())

    def test_check_prerequisite_exists(self):
        '''Run the shared existence checks on __prereq_rrset_exists
           (class ANY; existing rrsets should give True).'''
        self.__check_prerequisite_exists_combined(
            self._session._UpdateSession__prereq_rrset_exists,
            RRClass.ANY(), True)

    def test_check_prerequisite_does_not_exist(self):
        '''Run the shared existence checks on __prereq_rrset_does_not_exist
           (class NONE; existing rrsets should give False).'''
        self.__check_prerequisite_exists_combined(
            self._session._UpdateSession__prereq_rrset_does_not_exist,
            RRClass.NONE(), False)

    def test_check_prerequisite_exists_value(self):
        '''Check __prereq_rrset_exists_value: it should only return True
           when the given rrset holds exactly the expected rdatas.'''
        method = self._session._UpdateSession__prereq_rrset_exists_value

        def check(outcome, rrset):
            self.__prereq_helper(method, outcome, rrset)

        a_rrset = create_rrset("www.example.org", RRClass.IN(), RRType.A(), 0)
        # empty one should not match
        check(False, a_rrset)

        # When the rdata is added, it should match
        add_rdata(a_rrset, "192.0.2.1")
        check(True, a_rrset)

        # But adding more should not
        add_rdata(a_rrset, "192.0.2.2")
        check(False, a_rrset)

        # Also test one with more than one RR
        ns_rrset = create_rrset("example.org", RRClass.IN(), RRType.NS(), 0)
        check(False, ns_rrset)
        add_rdata(ns_rrset, "ns1.example.org.")
        check(False, ns_rrset)
        add_rdata(ns_rrset, "ns2.example.org")
        check(False, ns_rrset)
        add_rdata(ns_rrset, "ns3.example.org.")
        check(True, ns_rrset)
        add_rdata(ns_rrset, "ns4.example.org.")
        check(False, ns_rrset)

        # Repeat that, but try a different order of Rdata addition
        ns_rrset = create_rrset("example.org", RRClass.IN(), RRType.NS(), 0)
        check(False, ns_rrset)
        add_rdata(ns_rrset, "ns3.example.org.")
        check(False, ns_rrset)
        add_rdata(ns_rrset, "ns2.example.org.")
        check(False, ns_rrset)
        add_rdata(ns_rrset, "ns1.example.org.")
        check(True, ns_rrset)
        add_rdata(ns_rrset, "ns4.example.org.")
        check(False, ns_rrset)

        # and test one where the name does not even exist
        missing_rrset = create_rrset("doesnotexist.example.org", RRClass.IN(),
                                     RRType.A(), 0, [ "192.0.2.1" ])
        check(False, missing_rrset)

    def __check_prerequisite_name_in_use_combined(self, method, rrclass,
                                                  expected):
        '''shared code for the checks for the very similar (but reversed
           in behaviour) methods __prereq_name_in_use and
           __prereq_name_not_in_use
           'expected' is the value the method should return for a name
           that is in use; for one that is not, 'not expected' is checked.
        '''
        def check(outcome, name):
            # Build an empty type-ANY rrset (TTL 0) of the class under test
            # and run the prerequisite method on it.
            self.__prereq_helper(method, outcome,
                                 create_rrset(name, rrclass, RRType.ANY(), 0))

        check(expected, "example.org")
        check(expected, "www.example.org")
        check(not expected, "doesnotexist.example.org")
        check(not expected, "belowdelegation.sub.example.org")
        check(not expected, "foo.wildcard.example.org")

        # empty nonterminal should not match
        check(not expected, "nonterminal.example.org")
        check(expected, "empty.nonterminal.example.org")

    def test_check_prerequisite_name_in_use(self):
        '''Run the shared name checks on __prereq_name_in_use
           (class ANY; names in use should give True).'''
        self.__check_prerequisite_name_in_use_combined(
            self._session._UpdateSession__prereq_name_in_use,
            RRClass.ANY(), True)

    def test_check_prerequisite_name_not_in_use(self):
        '''Run the shared name checks on __prereq_name_not_in_use
           (class NONE; names in use should give False).'''
        self.__check_prerequisite_name_in_use_combined(
            self._session._UpdateSession__prereq_name_not_in_use,
            RRClass.NONE(), False)

    def check_prerequisite_result(self, expected, prerequisites):
        '''Helper method for checking the result of a prerequisite check;
           creates an update session for a message carrying the rrsets
           from 'prerequisites' in its prerequisite section. Then checks
           that both __check_prerequisites() and a full handle() lead to
           the Rcode specified in 'expected'.'''
        zconfig = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
                             self._acl_map)
        session = UpdateSession(
            create_update_msg([TEST_ZONE_RECORD], prerequisites),
            TEST_CLIENT4, zconfig)
        session._UpdateSession__get_update_zone()

        # Rcodes are compared through their to_text output (nicer error
        # messages)
        expected_text = expected.to_text()

        # handle() is expected to perform this check as well, but calling
        # it on its own first gives better failures.
        prereq_rcode = session._UpdateSession__check_prerequisites()
        self.assertEqual(expected_text, prereq_rcode.to_text())

        # Now see if handle finds the same result
        result, _zname, _zclass = session.handle()
        response_rcode = session._UpdateSession__message.get_rcode()
        self.assertEqual(expected_text, response_rcode.to_text())

        # And that the result looks right
        if expected == Rcode.NOERROR():
            expected_result = UPDATE_SUCCESS
        else:
            expected_result = UPDATE_ERROR
        self.assertEqual(expected_result, result)

    def check_prescan_result(self, expected, updates, expected_soa = None):
        '''Helper method for checking the result of the update prescan;
           creates an update session for a message carrying the rrsets
           from 'updates' in its update section. Then checks that
           __do_prescan() returns the Rcode specified in 'expected', and
           that the string form of the session's __added_soa equals the
           string form of 'expected_soa' (None by default).'''
        zconfig = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
                             self._acl_map)
        session = UpdateSession(
            create_update_msg([TEST_ZONE_RECORD], [], updates),
            TEST_CLIENT4, zconfig)
        session._UpdateSession__get_update_zone()

        # Rcodes are compared through their to_text output (nicer error
        # messages)
        prescan_rcode = session._UpdateSession__do_prescan()
        self.assertEqual(expected.to_text(), prescan_rcode.to_text())

        # The SOA comparison is always done on the str() forms, so the
        # default expected_soa of None requires that no SOA was recorded.
        added_soa = session._UpdateSession__added_soa
        self.assertEqual(str(expected_soa), str(added_soa))

    def check_full_handle_result(self, expected, updates):
        '''Helper method for checking the result of a full handle;
           creates an update session for a message carrying the rrsets
           from 'updates' in its update section. Then checks that handle()
           results in a response with rcode 'expected', and in the
           matching result code.'''
        zconfig = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
                             self._acl_map)
        session = UpdateSession(
            create_update_msg([TEST_ZONE_RECORD], [], updates),
            TEST_CLIENT4, zconfig)

        # Run the complete handling and inspect the resulting message;
        # rcodes are compared through their to_text output.
        result, _zname, _zclass = session.handle()
        response_rcode = session._UpdateSession__message.get_rcode()
        self.assertEqual(expected.to_text(), response_rcode.to_text())

        # And that the result looks right
        if expected == Rcode.NOERROR():
            expected_result = UPDATE_SUCCESS
        else:
            expected_result = UPDATE_ERROR
        self.assertEqual(expected_result, result)


    def test_check_prerequisites(self):
        '''Check that the prerequisite-type-specific methods get called.

        All types of prerequisites are covered here, but not every possible
        result for each type (those are tested in the specific prerequisite
        type tests).

        '''
        check = self.check_prerequisite_result

        # Prerequisites that should be satisfied
        ok_rrset_exists = create_rrset("example.org", RRClass.ANY(),
                                       RRType.SOA(), 0)
        ok_rrset_value = create_rrset("www.example.org", RRClass.IN(),
                                      RRType.A(), 0, [ "192.0.2.1" ])
        ok_rrset_absent = create_rrset("foo.example.org", RRClass.NONE(),
                                       RRType.SOA(), 0)
        ok_name_in_use = create_rrset("www.example.org", RRClass.ANY(),
                                      RRType.ANY(), 0)
        ok_name_unused = create_rrset("foo.example.org", RRClass.NONE(),
                                      RRType.ANY(), 0)
        # One 'exists with value' prerequisite per NS record of the apex;
        # only the three of them together describe the complete rrset.
        ns_value_1, ns_value_2, ns_value_3 = [
            create_rrset("example.org", RRClass.IN(), RRType.NS(), 0,
                         [ ns_name ])
            for ns_name in ("ns1.example.org", "ns2.example.org",
                            "ns3.example.org") ]

        # Prerequisites that should not be satisfied
        bad_rrset_exists = create_rrset("foo.example.org", RRClass.ANY(),
                                        RRType.SOA(), 0)
        bad_rrset_value = create_rrset("www.example.org", RRClass.IN(),
                                       RRType.A(), 0, [ "192.0.2.2" ])
        bad_rrset_absent = create_rrset("example.org", RRClass.NONE(),
                                        RRType.SOA(), 0)
        bad_name_in_use = create_rrset("foo.example.org", RRClass.ANY(),
                                       RRType.ANY(), 0)
        bad_name_unused = create_rrset("www.example.org", RRClass.NONE(),
                                       RRType.ANY(), 0)

        # Each failing prerequisite has its own result code
        check(Rcode.NXRRSET(), [ bad_rrset_exists ])
        check(Rcode.NXRRSET(), [ bad_rrset_value ])
        check(Rcode.YXRRSET(), [ bad_rrset_absent ])
        check(Rcode.NXDOMAIN(), [ bad_name_in_use ])
        check(Rcode.YXDOMAIN(), [ bad_name_unused ])

        # The satisfied ones should result in ok; first individually
        check(Rcode.NOERROR(), [ ok_rrset_exists ])
        check(Rcode.NOERROR(), [ ok_rrset_value ])
        check(Rcode.NOERROR(), [ ok_rrset_absent ])
        check(Rcode.NOERROR(), [ ok_name_in_use ])
        check(Rcode.NOERROR(), [ ok_name_unused ])
        check(Rcode.NOERROR(), [ ns_value_1, ns_value_2, ns_value_3 ])

        # and then all together
        check(Rcode.NOERROR(),
              [ ok_rrset_exists, ok_rrset_value, ok_rrset_absent,
                ok_name_in_use, ok_name_unused,
                ns_value_1, ns_value_2, ns_value_3 ])

        # try out a permutation, note that one rrset is split up,
        # and the order of the RRs should not matter
        check(Rcode.NOERROR(),
              [ ns_value_3, ok_rrset_exists, ns_value_2, ok_name_in_use,
                ns_value_1 ])

        # Should fail on the first error, even if most of the
        # prerequisites are ok
        check(Rcode.NXDOMAIN(),
              [ ns_value_3, ok_rrset_exists, ns_value_2, ok_name_in_use,
                bad_name_in_use, ns_value_1 ])

    def test_prerequisite_notzone(self):
        '''A prerequisite for a name in some other zone yields NOTZONE.'''
        out_of_zone = create_rrset("some.other.zone.", RRClass.ANY(),
                                   RRType.SOA(), 0)
        self.check_prerequisite_result(Rcode.NOTZONE(), [ out_of_zone ])

    def test_prerequisites_formerr(self):
        '''Form errors in the prerequisite section yield FORMERR.'''
        def expect_formerr(rrset):
            self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])

        # Class ANY, non-zero TTL
        expect_formerr(create_rrset("example.org", RRClass.ANY(),
                                    RRType.SOA(), 1))

        # Class ANY, but with rdata
        expect_formerr(create_rrset("example.org", RRClass.ANY(),
                                    RRType.A(), 0, [ b'\x00\x00\x00\x00' ]))

        # Class NONE, non-zero TTL
        expect_formerr(create_rrset("example.org", RRClass.NONE(),
                                    RRType.SOA(), 1))

        # Class NONE, but with rdata
        expect_formerr(create_rrset("example.org", RRClass.NONE(),
                                    RRType.A(), 0, [ b'\x00\x00\x00\x00' ]))

        # Matching class and type, but non-zero TTL
        expect_formerr(create_rrset("www.example.org", RRClass.IN(),
                                    RRType.A(), 1, [ "192.0.2.1" ]))

        # Completely different class
        expect_formerr(create_rrset("example.org", RRClass.CH(),
                                    RRType.TXT(), 0, [ "foo" ]))

    # NOTE(review): a method with this same name and body is already defined
    # earlier in this class; being the later definition, this is the one
    # that takes effect.  One of the two could be dropped.
    def __prereq_helper(self, method, expected, rrset):
        '''Calls the given method with the given rrset, and checks that
           its return value equals 'expected'.
           Function does not do much but makes the code look nicer'''
        actual = method(rrset)
        self.assertEqual(expected, actual)

    def __initialize_update_rrsets(self):
        '''Build the RRsets that several of the update tests share.
           Every RRset ends up as an attribute of self'''
        # The SOA rdata used below differ in their serial only
        soa_head = "ns1.example.org. admin.example.org. "
        soa_tail = " 3600 1800 2419200 7200"

        # A data for www.example.org: one address, and two further ones
        self.orig_a_rrset = create_rrset("www.example.org", TEST_RRCLASS,
                                         RRType.A(), 3600, ["192.0.2.1"])
        self.rrset_update_a = create_rrset("www.example.org", TEST_RRCLASS,
                                           RRType.A(), 3600,
                                           ["192.0.2.2", "192.0.2.3"])

        # SOA with serial 1233, in the zone class and in class NONE
        self.rrset_update_soa = create_rrset(
            "example.org", TEST_RRCLASS, RRType.SOA(), 3600,
            [soa_head + "1233" + soa_tail])
        self.rrset_update_soa_del = create_rrset(
            "example.org", RRClass.NONE(), RRType.SOA(), 0,
            [soa_head + "1233" + soa_tail])

        # SOA with serial 4000
        self.rrset_update_soa2 = create_rrset(
            "example.org", TEST_RRCLASS, RRType.SOA(), 3600,
            [soa_head + "4000" + soa_tail])

        # Class ANY, type ANY, TTL 0, no rdata
        self.rrset_update_del_name = create_rrset(
            "www.example.org", RRClass.ANY(), RRType.ANY(), 0)
        self.rrset_update_del_name_apex = create_rrset(
            "example.org", RRClass.ANY(), RRType.ANY(), 0)

        # Class ANY with a specific type, TTL 0, no rdata
        self.rrset_update_del_rrset = create_rrset(
            "www.example.org", RRClass.ANY(), RRType.A(), 0)
        self.rrset_update_del_mx_apex = create_rrset(
            "example.org", RRClass.ANY(), RRType.MX(), 0)
        self.rrset_update_del_soa_apex = create_rrset(
            "example.org", RRClass.ANY(), RRType.SOA(), 0)
        self.rrset_update_del_ns_apex = create_rrset(
            "example.org", RRClass.ANY(), RRType.NS(), 0)

        # Class NONE, TTL 0, with the rdata given as bytes
        self.rrset_update_del_rrset_part = create_rrset(
            "www.example.org", RRClass.NONE(), RRType.A(), 0,
            [b'\xc0\x00\x02\x02', b'\xc0\x00\x02\x03'])
        self.rrset_update_del_rrset_ns = create_rrset(
            "example.org", RRClass.NONE(), RRType.NS(), 0,
            [b'\x03ns1\x07example\x03org\x00',
             b'\x03ns2\x07example\x03org\x00',
             b'\x03ns3\x07example\x03org\x00'])
        self.rrset_update_del_rrset_mx = create_rrset(
            "example.org", RRClass.NONE(), RRType.MX(), 0,
            [b'\x00\x0a\x04mail\x07example\x03org\x00'])

    def test_prescan(self):
        '''Test that the prescan accepts data that is ok, and that it
           notices the SOA if one is present'''
        # prepare a set of correct update statements
        self.__initialize_update_rrsets()

        def expect_noerror(*prescan_args):
            # prescan_args is the list of update rrsets, optionally
            # followed by the SOA rrset; both are handed on unchanged to
            # check_prescan_result() after the NOERROR rcode
            self.check_prescan_result(Rcode.NOERROR(), *prescan_args)

        soa = self.rrset_update_soa
        soa2 = self.rrset_update_soa2

        expect_noerror([self.rrset_update_a])

        # check if soa is noticed
        expect_noerror([soa], soa)

        # Other types of successful prechecks
        expect_noerror([soa2], soa2)
        expect_noerror([self.rrset_update_del_name])
        expect_noerror([self.rrset_update_del_name_apex])
        expect_noerror([self.rrset_update_del_rrset])
        expect_noerror([self.rrset_update_del_mx_apex])
        expect_noerror([self.rrset_update_del_rrset_part])

        # and check a few permutations of the above
        # all of them (with one of the soas)
        everything = [self.rrset_update_a,
                      soa,
                      self.rrset_update_del_name,
                      self.rrset_update_del_name_apex,
                      self.rrset_update_del_rrset,
                      self.rrset_update_del_mx_apex,
                      self.rrset_update_del_rrset_part]
        expect_noerror(everything, soa)

        # Two soas. Should we reject or simply use the last?
        # (RFC is not really explicit on this, but between the lines I read
        # use the last)
        expect_noerror([soa, soa2], soa2)
        expect_noerror([soa2, soa], soa)

        # the same rrsets as 'everything', in a different order
        reordered = [self.rrset_update_del_mx_apex,
                     self.rrset_update_del_name,
                     self.rrset_update_del_name_apex,
                     self.rrset_update_del_rrset_part,
                     self.rrset_update_a,
                     self.rrset_update_del_rrset,
                     soa]
        expect_noerror(reordered, soa)

    def test_prescan_failures(self):
        '''Test that the prescan rejects bad data with the right rcode'''
        def expect_failure(rcode, *rrset_args):
            # Build a single update rrset from the given create_rrset()
            # arguments and check that the prescan results in rcode
            bad_update = create_rrset(*rrset_args)
            self.check_prescan_result(rcode, [bad_update])

        # out of zone data
        expect_failure(Rcode.NOTZONE(),
                       "different.zone", RRClass.ANY(), RRType.TXT(), 0)

        # forbidden type, zone class
        expect_failure(Rcode.FORMERR(),
                       TEST_ZONE_NAME, TEST_RRCLASS, RRType.ANY(), 0,
                       [b'\x00'])

        # non-zero TTL, class ANY
        expect_failure(Rcode.FORMERR(),
                       TEST_ZONE_NAME, RRClass.ANY(), RRType.TXT(), 1)

        # non-zero Rdata, class ANY
        expect_failure(Rcode.FORMERR(),
                       TEST_ZONE_NAME, RRClass.ANY(), RRType.TXT(), 0,
                       ["foo"])

        # forbidden type, class ANY
        expect_failure(Rcode.FORMERR(),
                       TEST_ZONE_NAME, RRClass.ANY(), RRType.AXFR(), 0,
                       [b'\x00'])

        # non-zero TTL, class NONE
        expect_failure(Rcode.FORMERR(),
                       TEST_ZONE_NAME, RRClass.NONE(), RRType.TXT(), 1)

        # forbidden type, class NONE
        expect_failure(Rcode.FORMERR(),
                       TEST_ZONE_NAME, RRClass.NONE(), RRType.AXFR(), 0,
                       [b'\x00'])

    def __check_inzone_data(self, expected_result, name, rrtype,
                            expected_rrset = None):
        '''Looks up the given name and type in TEST_ZONE_NAME (with the
           NO_WILDCARD and FIND_GLUE_OK options), and checks that the
           result code of the find is the expected one.
           If expected_rrset is given, the rrset that was found is
           compared to it as well (name, class, type, ttl and rdata,
           where the order of the rdata does not matter).'''
        def sorted_rdata_text(rrset):
            # The text form of all rdata of the rrset, in sorted order
            return sorted(rdata.to_text() for rdata in rrset.get_rdata())

        _, finder = self._datasrc_client.find_zone(TEST_ZONE_NAME)
        find_options = finder.NO_WILDCARD | finder.FIND_GLUE_OK
        result, found_rrset, _ = finder.find(name, rrtype, find_options)
        self.assertEqual(expected_result, result)

        if expected_rrset is None:
            # Only the result code was of interest
            return

        # Sigh. Need rrsets.compare() again.
        # To be sure, compare name, class, type, and ttl
        self.assertEqual(expected_rrset.get_name(), found_rrset.get_name())
        self.assertEqual(expected_rrset.get_class(),
                         found_rrset.get_class())
        self.assertEqual(expected_rrset.get_type(), found_rrset.get_type())
        self.assertEqual(expected_rrset.get_ttl().to_text(),
                         found_rrset.get_ttl().to_text())
        self.assertEqual(sorted_rdata_text(expected_rrset),
                         sorted_rdata_text(found_rrset))

    def test_update_add_delete_rrset(self):
        '''
        Runs a chain of add and delete updates that build on each other.
        Later tests cover some further cases.
        '''
        self.__initialize_update_rrsets()

        www_name = isc.dns.Name("www.example.org")
        finder_codes = isc.datasrc.ZoneFinder

        def apply_update(rrset):
            # Send a single-rrset update, which is expected to succeed
            self.check_full_handle_result(Rcode.NOERROR(), [rrset])

        def expect_www_a(result, *expected_rrset):
            # Check the A data that is in the zone for www.example.org
            self.__check_inzone_data(result, www_name, RRType.A(),
                                     *expected_rrset)

        # initially, the www should only contain one rr
        # (set to self.orig_a_rrset)

        # during this test, we will extend it at some point
        extended_a_rrset = create_rrset("www.example.org", TEST_RRCLASS,
                                        RRType.A(), 3600,
                                        ["192.0.2.1",
                                         "192.0.2.2",
                                         "192.0.2.3"])

        # Sanity check, make sure original data is really there before updates
        expect_www_a(finder_codes.SUCCESS, self.orig_a_rrset)

        # Add two rrs
        apply_update(self.rrset_update_a)
        expect_www_a(finder_codes.SUCCESS, extended_a_rrset)

        # Adding the same RRsets should not make a difference.
        apply_update(self.rrset_update_a)
        expect_www_a(finder_codes.SUCCESS, extended_a_rrset)

        # Now delete those two, and we should end up with the original RRset
        apply_update(self.rrset_update_del_rrset_part)
        expect_www_a(finder_codes.SUCCESS, self.orig_a_rrset)

        # 'Deleting' them again should make no difference
        apply_update(self.rrset_update_del_rrset_part)
        expect_www_a(finder_codes.SUCCESS, self.orig_a_rrset)

        # But deleting the entire rrset, independent of its contents, should
        # work
        apply_update(self.rrset_update_del_rrset)
        expect_www_a(finder_codes.NXDOMAIN)

        # Check that if we update the SOA, it is updated to our value
        apply_update(self.rrset_update_soa2)
        self.__check_inzone_data(finder_codes.SUCCESS,
                                 isc.dns.Name("example.org"),
                                 RRType.SOA(),
                                 self.rrset_update_soa2)

    def test_glue_deletions(self):
        '''Check that glue can be deleted, and that the delegation is
           still found after deleting glue, existing or not'''
        finder_codes = isc.datasrc.ZoneFinder
        delegation_name = isc.dns.Name("sub.example.org.")
        glue_name = isc.dns.Name("ns.sub.example.org.")

        def expect_delegation():
            # The NS rrset at sub.example.org. should be found
            self.__check_inzone_data(finder_codes.SUCCESS, delegation_name,
                                     RRType.NS())

        # Both the NS rrset and the glue are there to begin with
        expect_delegation()
        self.__check_inzone_data(finder_codes.SUCCESS, glue_name,
                                 RRType.A())

        # See that we can delete glue
        delete_glue = create_rrset("ns.sub.example.org.", RRClass.ANY(),
                                   RRType.A(), 0)
        self.check_full_handle_result(Rcode.NOERROR(), [delete_glue])
        expect_delegation()
        self.__check_inzone_data(finder_codes.NXDOMAIN, glue_name,
                                 RRType.A())

        # Check that we don't accidentally delete a delegation if we
        # try to delete non-existent glue
        delete_missing_glue = create_rrset("foo.sub.example.org.",
                                           RRClass.ANY(), RRType.A(), 0)
        self.check_full_handle_result(Rcode.NOERROR(),
                                      [delete_missing_glue])
        expect_delegation()

    def test_update_add_new_data(self):
        '''
        Adds data in places where there is none yet
        '''
        finder_codes = isc.datasrc.ZoneFinder
        new_name = isc.dns.Name("new.example.org")

        # Add data at a completely new name
        self.__check_inzone_data(finder_codes.NXDOMAIN, new_name,
                                 RRType.A())
        new_a_rrset = create_rrset("new.example.org", TEST_RRCLASS,
                                   RRType.A(), 3600,
                                   ["192.0.2.1", "192.0.2.2"])
        self.check_full_handle_result(Rcode.NOERROR(), [new_a_rrset])
        self.__check_inzone_data(finder_codes.SUCCESS, new_name,
                                 RRType.A(), new_a_rrset)

        # Also try a name where data is present, but none of this
        # specific type
        self.__check_inzone_data(finder_codes.NXRRSET, new_name,
                                 RRType.TXT())
        new_txt_rrset = create_rrset("new.example.org", TEST_RRCLASS,
                                     RRType.TXT(), 3600, ["foo"])
        self.check_full_handle_result(Rcode.NOERROR(), [new_txt_rrset])
        self.__check_inzone_data(finder_codes.SUCCESS, new_name,
                                 RRType.TXT(), new_txt_rrset)

    def test_update_add_new_data_interspersed(self):
        '''
        Like test_update_add_new_data, this adds data where there is
        none yet. Here, however, the two RRs of the first RRset are sent
        as separate rrsets, with the second RRset placed in between them.
        '''
        finder_codes = isc.datasrc.ZoneFinder
        a_name = isc.dns.Name("new_a.example.org")
        txt_name = isc.dns.Name("new_txt.example.org")

        # Neither of the names exists before the update
        self.__check_inzone_data(finder_codes.NXDOMAIN, a_name, RRType.A())
        self.__check_inzone_data(finder_codes.NXDOMAIN, txt_name,
                                 RRType.TXT())

        first_a = create_rrset("new_a.example.org", TEST_RRCLASS,
                               RRType.A(), 3600, ["192.0.2.1"])
        txt_between = create_rrset("new_txt.example.org", TEST_RRCLASS,
                                   RRType.TXT(), 3600, ["foo"])
        second_a = create_rrset("new_a.example.org", TEST_RRCLASS,
                                RRType.A(), 3600, ["192.0.2.2"])

        self.check_full_handle_result(Rcode.NOERROR(),
                                      [first_a, txt_between, second_a])

        # The update should have merged the two A rrsets
        merged_a = create_rrset("new_a.example.org", TEST_RRCLASS,
                                RRType.A(), 3600,
                                ["192.0.2.1", "192.0.2.2"])

        self.__check_inzone_data(finder_codes.SUCCESS, a_name, RRType.A(),
                                 merged_a)
        self.__check_inzone_data(finder_codes.SUCCESS, txt_name,
                                 RRType.TXT(), txt_between)

    def test_update_delete_name(self):
        '''Check that deleting a complete name removes its data, and that
           repeating the delete changes nothing'''
        self.__initialize_update_rrsets()

        finder_codes = isc.datasrc.ZoneFinder
        www_name = isc.dns.Name("www.example.org")

        # First check it is there
        self.__check_inzone_data(finder_codes.SUCCESS, www_name,
                                 RRType.A())

        # Delete the entire name; the second, pointless, delete should
        # leave it just as gone as the first one did
        for _ in range(2):
            self.check_full_handle_result(Rcode.NOERROR(),
                                          [self.rrset_update_del_name])
            self.__check_inzone_data(finder_codes.NXDOMAIN, www_name,
                                     RRType.A())

    def test_update_apex_special_cases(self):
        '''
        Checks a number of special cases for deletions at the zone apex
        '''
        self.__initialize_update_rrsets()

        finder_codes = isc.datasrc.ZoneFinder
        apex_name = isc.dns.Name("example.org")

        def apply_update(*rrsets):
            # Send the given rrsets as one update, expected to succeed
            self.check_full_handle_result(Rcode.NOERROR(), list(rrsets))

        def expect_at_apex(result, rrtype, *expected_rrset):
            # Check the data of the given type that is found at the apex
            self.__check_inzone_data(result, apex_name, rrtype,
                                     *expected_rrset)

        # the original SOA
        orig_soa_rrset = create_rrset("example.org", TEST_RRCLASS,
                                      RRType.SOA(), 3600,
                                      ["ns1.example.org. " +
                                       "admin.example.org. " +
                                       "1234 3600 1800 2419200 7200"])

        # We will delete some of the NS records
        orig_ns_rrset = create_rrset("example.org", TEST_RRCLASS,
                                     RRType.NS(), 3600,
                                     ["ns1.example.org.",
                                      "ns2.example.org.",
                                      "ns3.example.org."])

        # Sanity check, make sure original data is really there before updates
        expect_at_apex(finder_codes.SUCCESS, RRType.NS(), orig_ns_rrset)
        # We will delete the MX record later in this test, so let's make
        # sure that it exists (we do not care about its value)
        expect_at_apex(finder_codes.SUCCESS, RRType.MX())

        # Check that we cannot delete the SOA record by direct deletion
        # both by name+type and by full rrset
        apply_update(self.rrset_update_del_soa_apex,
                     self.rrset_update_soa_del)
        expect_at_apex(finder_codes.SUCCESS, RRType.SOA(), orig_soa_rrset)

        # If we delete everything at the apex, the SOA and NS rrsets should be
        # untouched
        apply_update(self.rrset_update_del_name_apex)
        expect_at_apex(finder_codes.SUCCESS, RRType.SOA(), orig_soa_rrset)
        expect_at_apex(finder_codes.SUCCESS, RRType.NS(), orig_ns_rrset)
        # but the MX should be gone
        expect_at_apex(finder_codes.NXRRSET, RRType.MX())

        # Deleting the NS rrset by name and type only, it should also be left
        # untouched
        apply_update(self.rrset_update_del_ns_apex)
        expect_at_apex(finder_codes.SUCCESS, RRType.NS(), orig_ns_rrset)

    def DISABLED_test_update_apex_special_case_ns_rrset(self):
        '''Deleting the NS records at the apex by their value should
           still leave one of them behind.
           (The DISABLED_ prefix keeps the default unittest loader, which
           only picks up test* methods, from running this)'''
        self.__initialize_update_rrsets()

        # When we are done, we should have a reduced NS rrset
        remaining_ns_rrset = create_rrset("example.org", TEST_RRCLASS,
                                          RRType.NS(), 3600,
                                          ["ns3.example.org."])

        self.check_full_handle_result(Rcode.NOERROR(),
                                      [self.rrset_update_del_rrset_ns])
        self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
                                 isc.dns.Name("example.org"),
                                 RRType.NS(),
                                 remaining_ns_rrset)

    def test_update_delete_normal_rrset_at_apex(self):
        '''
        Checks the deletion of a 'normal rrset' at the apex
        '''
        self.__initialize_update_rrsets()

        finder_codes = isc.datasrc.ZoneFinder
        apex_name = isc.dns.Name("example.org")

        # MX should simply be deleted
        self.__check_inzone_data(finder_codes.SUCCESS, apex_name,
                                 RRType.MX())
        self.check_full_handle_result(Rcode.NOERROR(),
                                      [self.rrset_update_del_rrset_mx])
        self.__check_inzone_data(finder_codes.NXRRSET, apex_name,
                                 RRType.MX())

    def test_update_cname_special_cases(self):
        '''Check that other data is not added where a CNAME exists (and
           the other way around), while the CNAME itself can be replaced'''
        self.__initialize_update_rrsets()

        finder_codes = isc.datasrc.ZoneFinder
        cname_name = isc.dns.Name("cname.example.org")
        www_name = isc.dns.Name("www.example.org")

        def apply_update(rrset):
            # Send a single-rrset update; the rcode should be NOERROR
            self.check_full_handle_result(Rcode.NOERROR(), [rrset])

        def expect_cname(expected_rrset):
            # Looking for an A record at cname.example.org should result
            # in the given CNAME rrset
            self.__check_inzone_data(finder_codes.CNAME, cname_name,
                                     RRType.A(), expected_rrset)

        # Sanity check
        orig_cname_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
                                        RRType.CNAME(), 3600,
                                        ["www.example.org."])
        expect_cname(orig_cname_rrset)

        # If we try to add data where a cname is present, the cname
        # should still be the only thing we find there
        added_a_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
                                     RRType.A(), 3600, ["192.0.2.1"])
        apply_update(added_a_rrset)
        expect_cname(orig_cname_rrset)

        # But updating the cname itself should work
        new_cname_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
                                       RRType.CNAME(), 3600,
                                       ["mail.example.org."])
        apply_update(new_cname_rrset)
        expect_cname(new_cname_rrset)

        self.__initialize_update_rrsets()

        # Likewise, adding a cname where other data is
        # present should do nothing either
        self.__check_inzone_data(finder_codes.SUCCESS, www_name,
                                 RRType.A(), self.orig_a_rrset)
        new_cname_rrset = create_rrset("www.example.org", TEST_RRCLASS,
                                       RRType.CNAME(), 3600,
                                       ["mail.example.org."])
        apply_update(new_cname_rrset)
        self.__check_inzone_data(finder_codes.SUCCESS, www_name,
                                 RRType.A(), self.orig_a_rrset)

    def test_update_bad_class(self):
        '''An update rrset of class CH is expected to give FORMERR'''
        ch_txt_rrset = create_rrset("example.org.", RRClass.CH(),
                                    RRType.TXT(), 0, ["foo"])
        self.check_full_handle_result(Rcode.FORMERR(), [ch_txt_rrset])

    def test_uncaught_exception(self):
        '''If the (replaced) SOA update step raises an exception, the
           update is expected to return SERVFAIL'''
        def raise_unexpected():
            raise Exception("foo")

        # Both private methods of UpdateSession are reached through their
        # name-mangled form
        self._session._UpdateSession__update_soa = raise_unexpected
        update_rcode = self._session._UpdateSession__do_update()
        self.assertEqual(Rcode.SERVFAIL().to_text(),
                         update_rcode.to_text())

class SessionACLTest(SessionTestBase):
    '''Tests of the ACL handling of the update session.'''
    def test_update_acl_check(self):
        '''Check update requests that the ACL does not accept.

        Requests that are accepted are covered in the basic tests.

        '''
        # create a separate session, with default (empty) ACL map.
        default_config = ZoneConfig([], TEST_RRCLASS, self._datasrc_client)
        session = UpdateSession(self._update_msg, TEST_CLIENT4,
                                default_config)
        # then the request should be rejected.
        self.assertEqual((UPDATE_ERROR, None, None), session.handle())

        # recreate the request message, and test with an ACL that would result
        # in 'DROP'.  get_message() should return None.
        msg = create_update_msg()
        drop_rule = {"action": "DROP", "from": TEST_CLIENT4[0]}
        acl_map = {
            (TEST_ZONE_NAME, TEST_RRCLASS): REQUEST_LOADER.load([drop_rule])
        }
        drop_config = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
                                 acl_map)
        session = UpdateSession(msg, TEST_CLIENT4, drop_config)
        self.assertEqual((UPDATE_DROP, None, None), session.handle())
        self.assertEqual(None, session.get_message())

    def test_update_tsigacl_check(self):
        '''Check the ACL handling when a TSIG key is part of the ACL.'''
        # This ACL will accept requests from TEST_CLIENT4 (any port) *and*
        # has TSIG signed by TEST_ZONE_NAME; all others will be rejected.
        accept_rule = {"action": "ACCEPT",
                       "from": TEST_CLIENT4[0],
                       "key": TEST_ZONE_NAME.to_text()}
        acl_map = {
            (TEST_ZONE_NAME, TEST_RRCLASS):
                REQUEST_LOADER.load([accept_rule])
        }

        # If the message doesn't contain TSIG, it doesn't match the ACCEPT
        # ACL entry, and the request should be rejected.
        unsigned_config = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
                                     acl_map)
        session = UpdateSession(self._update_msg, TEST_CLIENT4,
                                unsigned_config)
        self.assertEqual((UPDATE_ERROR, None, None), session.handle())
        self.check_response(session.get_message(), Rcode.REFUSED())

        # If the message contains TSIG, it should match the ACCEPT
        # ACL entry, and the request should be granted.
        signed_msg = create_update_msg(tsig_key=TEST_TSIG_KEY)
        signed_config = ZoneConfig([], TEST_RRCLASS, self._datasrc_client,
                                   acl_map)
        session = UpdateSession(signed_msg, TEST_CLIENT4, signed_config)
        self.assertEqual((UPDATE_SUCCESS, TEST_ZONE_NAME, TEST_RRCLASS),
                         session.handle())

# Entry point when this file is run as a script rather than imported.
if __name__ == "__main__":
    # Set up isc.log before any test runs; judging by its name,
    # resetUnitTestRootLogger() then puts the root logger into a state
    # meant for unit tests.
    isc.log.init("bind10")
    isc.log.resetUnitTestRootLogger()
    # Collect and run all test cases defined in this module
    unittest.main()