|
@@ -24,9 +24,10 @@ TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
|
|
|
TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
|
|
|
|
|
|
READ_ZONE_DB_FILE = TESTDATA_PATH + "example.com.sqlite3"
|
|
|
-BROKEN_DB_FILE = TESTDATA_PATH + "brokendb.sqlite3"
|
|
|
WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
|
|
|
-NEW_DB_FILE = TESTDATA_WRITE_PATH + "new_db.sqlite3"
|
|
|
+
|
|
|
+READ_ZONE_DB_CONFIG = "{ \"database_file\": \"" + READ_ZONE_DB_FILE + "\" }"
|
|
|
+WRITE_ZONE_DB_CONFIG = "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}"
|
|
|
|
|
|
def add_rrset(rrset_list, name, rrclass, rrtype, ttl, rdatas):
|
|
|
rrset_to_add = isc.dns.RRset(name, rrclass, rrtype, ttl)
|
|
@@ -59,13 +60,24 @@ def check_for_rrset(expected_rrsets, rrset):
|
|
|
|
|
|
class DataSrcClient(unittest.TestCase):
|
|
|
|
|
|
- def test_construct(self):
|
|
|
+ def test_constructors(self):
|
|
|
# can't construct directly
|
|
|
self.assertRaises(TypeError, isc.datasrc.ZoneIterator)
|
|
|
|
|
|
+ self.assertRaises(TypeError, isc.datasrc.DataSourceClient, 1, "{}")
|
|
|
+ self.assertRaises(TypeError, isc.datasrc.DataSourceClient, "sqlite3", 1)
|
|
|
+ self.assertRaises(isc.datasrc.Error,
|
|
|
+ isc.datasrc.DataSourceClient, "foo", "{}")
|
|
|
+ self.assertRaises(isc.datasrc.ConfigError,
|
|
|
+ isc.datasrc.DataSourceClient, "sqlite3", "")
|
|
|
+ self.assertRaises(isc.datasrc.ConfigError,
|
|
|
+ isc.datasrc.DataSourceClient, "sqlite3", "{}")
|
|
|
+ self.assertRaises(isc.datasrc.ConfigError,
|
|
|
+ isc.datasrc.DataSourceClient, "sqlite3",
|
|
|
+ "{ \"foo\": 1 }")
|
|
|
|
|
|
def test_iterate(self):
|
|
|
- dsc = isc.datasrc.DataSourceClient("sqlite3", "{ \"database_file\": \"" + READ_ZONE_DB_FILE + "\" }")
|
|
|
+ dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
|
|
|
|
|
|
# for RRSIGS, the TTL's are currently modified. This test should
|
|
|
# start failing when we fix that.
|
|
@@ -176,7 +188,7 @@ class DataSrcClient(unittest.TestCase):
|
|
|
self.assertRaises(TypeError, isc.datasrc.ZoneFinder)
|
|
|
|
|
|
def test_find(self):
|
|
|
- dsc = isc.datasrc.DataSourceClient("sqlite3", "{ \"database_file\": \"" + READ_ZONE_DB_FILE + "\"}")
|
|
|
+ dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
|
|
|
|
|
|
result, finder = dsc.find_zone(isc.dns.Name("example.com"))
|
|
|
self.assertEqual(finder.SUCCESS, result)
|
|
@@ -260,7 +272,7 @@ class DataSrcUpdater(unittest.TestCase):
|
|
|
|
|
|
def test_update_delete_commit(self):
|
|
|
|
|
|
- dsc = isc.datasrc.DataSourceClient("sqlite3", "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}")
|
|
|
+ dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
|
|
|
|
|
|
# first make sure, through a separate finder, that some record exists
|
|
|
result, finder = dsc.find_zone(isc.dns.Name("example.com"))
|
|
@@ -334,7 +346,7 @@ class DataSrcUpdater(unittest.TestCase):
|
|
|
rrset.to_text())
|
|
|
|
|
|
def test_update_delete_abort(self):
|
|
|
- dsc = isc.datasrc.DataSourceClient("sqlite3", "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}")
|
|
|
+ dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
|
|
|
|
|
|
# first make sure, through a separate finder, that some record exists
|
|
|
result, finder = dsc.find_zone(isc.dns.Name("example.com"))
|