datasrc_test.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. # Copyright (C) 2011 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import isc.log
  16. import isc.datasrc
  17. from isc.datasrc import ZoneFinder
  18. import isc.dns
  19. import unittest
  20. import os
  21. import shutil
  22. import json
  23. TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
  24. TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
  25. READ_ZONE_DB_FILE = TESTDATA_PATH + "example.com.sqlite3"
  26. WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
  27. READ_ZONE_DB_CONFIG = "{ \"database_file\": \"" + READ_ZONE_DB_FILE + "\" }"
  28. WRITE_ZONE_DB_CONFIG = "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}"
  29. def add_rrset(rrset_list, name, rrclass, rrtype, ttl, rdatas):
  30. rrset_to_add = isc.dns.RRset(name, rrclass, rrtype, ttl)
  31. if rdatas is not None:
  32. for rdata in rdatas:
  33. rrset_to_add.add_rdata(isc.dns.Rdata(rrtype, rrclass, rdata))
  34. rrset_list.append(rrset_to_add)
  35. # helper function, we have no direct rrset comparison atm
  36. def rrsets_equal(a, b):
  37. # no accessor for sigs either (so this only checks name, class, type, ttl,
  38. # and rdata)
  39. # also, because of the fake data in rrsigs, if the type is rrsig, the
  40. # rdata is not checked
  41. return a.get_name() == b.get_name() and\
  42. a.get_class() == b.get_class() and\
  43. a.get_type() == b.get_type() and \
  44. a.get_ttl() == b.get_ttl() and\
  45. (a.get_type() == isc.dns.RRType.RRSIG() or
  46. sorted(a.get_rdata()) == sorted(b.get_rdata()))
  47. # returns true if rrset is in expected_rrsets
  48. # will remove the rrset from expected_rrsets if found
  49. def check_for_rrset(expected_rrsets, rrset):
  50. for cur_rrset in expected_rrsets[:]:
  51. if rrsets_equal(cur_rrset, rrset):
  52. expected_rrsets.remove(cur_rrset)
  53. return True
  54. return False
  55. class DataSrcClient(unittest.TestCase):
  56. def test_constructors(self):
  57. # can't construct directly
  58. self.assertRaises(TypeError, isc.datasrc.ZoneIterator)
  59. self.assertRaises(TypeError, isc.datasrc.DataSourceClient, 1, "{}")
  60. self.assertRaises(TypeError, isc.datasrc.DataSourceClient, "sqlite3", 1)
  61. self.assertRaises(isc.datasrc.Error,
  62. isc.datasrc.DataSourceClient, "foo", "{}")
  63. self.assertRaises(isc.datasrc.Error,
  64. isc.datasrc.DataSourceClient, "sqlite3", "")
  65. self.assertRaises(isc.datasrc.Error,
  66. isc.datasrc.DataSourceClient, "sqlite3", "{}")
  67. self.assertRaises(isc.datasrc.Error,
  68. isc.datasrc.DataSourceClient, "sqlite3",
  69. "{ \"foo\": 1 }")
  70. self.assertRaises(isc.datasrc.Error,
  71. isc.datasrc.DataSourceClient, "memory",
  72. "{ \"foo\": 1 }")
  73. def test_iterate(self):
  74. dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
  75. # for RRSIGS, the TTL's are currently modified. This test should
  76. # start failing when we fix that.
  77. rrs = dsc.get_iterator(isc.dns.Name("sql1.example.com."))
  78. # we do not know the order in which they are returned by the iterator
  79. # but we do want to check them, so we put all records into one list
  80. # sort it (doesn't matter which way it is sorted, as long as it is
  81. # sorted)
  82. # RRset is (atm) an unorderable type, and within an rrset, the
  83. # rdatas and rrsigs may also be in random order. In theory the
  84. # rrsets themselves can be returned in any order.
  85. #
  86. # So we create a second list with all rrsets we expect, and for each
  87. # rrset we get from the iterator, see if it is in that list, and
  88. # remove it.
  89. #
  90. # When the iterator is empty, we check no rrsets are left in the
  91. # list of expected ones
  92. expected_rrset_list = []
  93. name = isc.dns.Name("sql1.example.com")
  94. rrclass = isc.dns.RRClass.IN()
  95. add_rrset(expected_rrset_list, name, rrclass,
  96. isc.dns.RRType.DNSKEY(), isc.dns.RRTTL(3600),
  97. [
  98. "256 3 5 AwEAAdYdRhBAEY67R/8G1N5AjGF6asIiNh/pNGeQ8xDQP13J"+
  99. "N2lo+sNqWcmpYNhuVqRbLB+mamsU1XcCICSBvAlSmfz/ZUdafX23knAr"+
  100. "TlALxMmspcfdpqun3Yr3YYnztuj06rV7RqmveYckWvAUXVYMSMQZfJ30"+
  101. "5fs0dE/xLztL/CzZ",
  102. "257 3 5 AwEAAbaKDSa9XEFTsjSYpUTHRotTS9Tz3krfDucugW5UokGQ"+
  103. "KC26QlyHXlPTZkC+aRFUs/dicJX2kopndLcnlNAPWiKnKtrsFSCnIJDB"+
  104. "ZIyvcKq+9RXmV3HK3bUdHnQZ88IZWBRmWKfZ6wnzHo53kdYKAemTErkz"+
  105. "taX3lRRPLYWpxRcDPEjysXT3Lh0vfL5D+CIO1yKw/q7C+v6+/kYAxc2l"+
  106. "fbNE3HpklSuF+dyX4nXxWgzbcFuLz5Bwfq6ZJ9RYe/kNkA0uMWNa1KkG"+
  107. "eRh8gg22kgD/KT5hPTnpezUWLvoY5Qc7IB3T0y4n2JIwiF2ZrZYVrWgD"+
  108. "jRWAzGsxJiJyjd6w2k0="
  109. ])
  110. add_rrset(expected_rrset_list, name, rrclass,
  111. isc.dns.RRType.NS(), isc.dns.RRTTL(3600),
  112. [
  113. "dns01.example.com.",
  114. "dns02.example.com.",
  115. "dns03.example.com."
  116. ])
  117. add_rrset(expected_rrset_list, name, rrclass,
  118. isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
  119. [
  120. "www.sql1.example.com. NS SOA RRSIG NSEC DNSKEY"
  121. ])
  122. # For RRSIGS, we can't add the fake data through the API, so we
  123. # simply pass no rdata at all (which is skipped by the check later)
  124. add_rrset(expected_rrset_list, name, rrclass,
  125. isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
  126. add_rrset(expected_rrset_list, name, rrclass,
  127. isc.dns.RRType.SOA(), isc.dns.RRTTL(3600),
  128. [
  129. "master.example.com. admin.example.com. 678 3600 1800 2419200 7200"
  130. ])
  131. name = isc.dns.Name("www.sql1.example.com.")
  132. add_rrset(expected_rrset_list, name, rrclass,
  133. isc.dns.RRType.A(), isc.dns.RRTTL(3600),
  134. [
  135. "192.0.2.100"
  136. ])
  137. name = isc.dns.Name("www.sql1.example.com.")
  138. add_rrset(expected_rrset_list, name, rrclass,
  139. isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
  140. [
  141. "sql1.example.com. A RRSIG NSEC"
  142. ])
  143. add_rrset(expected_rrset_list, name, rrclass,
  144. isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
  145. # rrs is an iterator, but also has direct get_next_rrset(), use
  146. # the latter one here
  147. rrset_to_check = rrs.get_next_rrset()
  148. while (rrset_to_check != None):
  149. self.assertTrue(check_for_rrset(expected_rrset_list,
  150. rrset_to_check),
  151. "Unexpected rrset returned by iterator:\n" +
  152. rrset_to_check.to_text())
  153. rrset_to_check = rrs.get_next_rrset()
  154. # Now check there are none left
  155. self.assertEqual(0, len(expected_rrset_list),
  156. "RRset(s) not returned by iterator: " +
  157. str([rrset.to_text() for rrset in expected_rrset_list ]
  158. ))
  159. # TODO should we catch this (iterating past end) and just return None
  160. # instead of failing?
  161. self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)
  162. rrets = dsc.get_iterator(isc.dns.Name("example.com"))
  163. # there are more than 80 RRs in this zone... let's just count them
  164. # (already did a full check of the smaller zone above)
  165. self.assertEqual(55, len(list(rrets)))
  166. # TODO should we catch this (iterating past end) and just return None
  167. # instead of failing?
  168. self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)
  169. self.assertRaises(TypeError, dsc.get_iterator, "asdf")
  170. def test_construct(self):
  171. # can't construct directly
  172. self.assertRaises(TypeError, isc.datasrc.ZoneFinder)
  173. def test_findoptions(self):
  174. '''A simple test to confirm no option is specified by default.
  175. '''
  176. self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.FIND_GLUE_OK)
  177. self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.FIND_DNSSEC)
  178. self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.NO_WILDCARD)
  179. def test_findresults(self):
  180. '''A simple test to confirm result codes are (defined and) different
  181. for some combinations.
  182. '''
  183. self.assertNotEqual(ZoneFinder.SUCCESS, ZoneFinder.DELEGATION)
  184. self.assertNotEqual(ZoneFinder.DELEGATION, ZoneFinder.NXDOMAIN)
  185. self.assertNotEqual(ZoneFinder.NXDOMAIN, ZoneFinder.NXRRSET)
  186. self.assertNotEqual(ZoneFinder.NXRRSET, ZoneFinder.CNAME)
  187. self.assertNotEqual(ZoneFinder.CNAME, ZoneFinder.DNAME)
  188. self.assertNotEqual(ZoneFinder.DNAME, ZoneFinder.WILDCARD)
  189. self.assertNotEqual(ZoneFinder.WILDCARD, ZoneFinder.WILDCARD_CNAME)
  190. self.assertNotEqual(ZoneFinder.WILDCARD_CNAME,
  191. ZoneFinder.WILDCARD_NXRRSET)
  192. def test_find(self):
  193. dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
  194. result, finder = dsc.find_zone(isc.dns.Name("example.com"))
  195. self.assertEqual(finder.SUCCESS, result)
  196. self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
  197. self.assertEqual("example.com.", finder.get_origin().to_text())
  198. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  199. isc.dns.RRType.A(),
  200. None,
  201. finder.FIND_DEFAULT)
  202. self.assertEqual(finder.SUCCESS, result)
  203. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  204. rrset.to_text())
  205. result, rrset = finder.find(isc.dns.Name("www.sql1.example.com"),
  206. isc.dns.RRType.A(),
  207. None,
  208. finder.FIND_DEFAULT)
  209. self.assertEqual(finder.DELEGATION, result)
  210. self.assertEqual("sql1.example.com. 3600 IN NS dns01.example.com.\n" +
  211. "sql1.example.com. 3600 IN NS dns02.example.com.\n" +
  212. "sql1.example.com. 3600 IN NS dns03.example.com.\n",
  213. rrset.to_text())
  214. result, rrset = finder.find(isc.dns.Name("doesnotexist.example.com"),
  215. isc.dns.RRType.A(),
  216. None,
  217. finder.FIND_DEFAULT)
  218. self.assertEqual(finder.NXDOMAIN, result)
  219. self.assertEqual(None, rrset)
  220. result, rrset = finder.find(isc.dns.Name("www.some.other.domain"),
  221. isc.dns.RRType.A(),
  222. None,
  223. finder.FIND_DEFAULT)
  224. self.assertEqual(finder.NXDOMAIN, result)
  225. self.assertEqual(None, rrset)
  226. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  227. isc.dns.RRType.TXT(),
  228. None,
  229. finder.FIND_DEFAULT)
  230. self.assertEqual(finder.NXRRSET, result)
  231. self.assertEqual(None, rrset)
  232. result, rrset = finder.find(isc.dns.Name("cname-ext.example.com"),
  233. isc.dns.RRType.A(),
  234. None,
  235. finder.FIND_DEFAULT)
  236. self.assertEqual(finder.CNAME, result)
  237. self.assertEqual(
  238. "cname-ext.example.com. 3600 IN CNAME www.sql1.example.com.\n",
  239. rrset.to_text())
  240. result, rrset = finder.find(isc.dns.Name("foo.wild.example.com"),
  241. isc.dns.RRType.A(),
  242. None,
  243. finder.FIND_DEFAULT)
  244. self.assertEqual(finder.WILDCARD, result)
  245. self.assertEqual("foo.wild.example.com. 3600 IN A 192.0.2.255\n",
  246. rrset.to_text())
  247. result, rrset = finder.find(isc.dns.Name("foo.wild.example.com"),
  248. isc.dns.RRType.TXT(),
  249. None,
  250. finder.FIND_DEFAULT)
  251. self.assertEqual(finder.WILDCARD_NXRRSET, result)
  252. self.assertEqual(None, rrset)
  253. self.assertRaises(TypeError, finder.find,
  254. "foo",
  255. isc.dns.RRType.A(),
  256. None,
  257. finder.FIND_DEFAULT)
  258. self.assertRaises(TypeError, finder.find,
  259. isc.dns.Name("cname-ext.example.com"),
  260. "foo",
  261. None,
  262. finder.FIND_DEFAULT)
  263. self.assertRaises(TypeError, finder.find,
  264. isc.dns.Name("cname-ext.example.com"),
  265. isc.dns.RRType.A(),
  266. None,
  267. "foo")
  268. def test_find_previous(self):
  269. dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
  270. result, finder = dsc.find_zone(isc.dns.Name("example.com"))
  271. self.assertEqual(finder.SUCCESS, result)
  272. prev = finder.find_previous_name(isc.dns.Name("bbb.example.com"))
  273. self.assertEqual("example.com.", prev.to_text())
  274. prev = finder.find_previous_name(isc.dns.Name("zzz.example.com"))
  275. self.assertEqual("www.example.com.", prev.to_text())
  276. prev = finder.find_previous_name(prev)
  277. self.assertEqual("*.wild.example.com.", prev.to_text())
  278. self.assertRaises(isc.datasrc.NotImplemented,
  279. finder.find_previous_name,
  280. isc.dns.Name("com"))
  281. class DataSrcUpdater(unittest.TestCase):
  282. def setUp(self):
  283. # Make a fresh copy of the writable database with all original content
  284. shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
  285. def test_construct(self):
  286. # can't construct directly
  287. self.assertRaises(TypeError, isc.datasrc.ZoneUpdater)
  288. def test_update_delete_commit(self):
  289. dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
  290. # first make sure, through a separate finder, that some record exists
  291. result, finder = dsc.find_zone(isc.dns.Name("example.com"))
  292. self.assertEqual(finder.SUCCESS, result)
  293. self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
  294. self.assertEqual("example.com.", finder.get_origin().to_text())
  295. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  296. isc.dns.RRType.A(),
  297. None,
  298. finder.FIND_DEFAULT)
  299. self.assertEqual(finder.SUCCESS, result)
  300. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  301. rrset.to_text())
  302. rrset_to_delete = rrset;
  303. # can't delete rrset with associated sig. Abuse that to force an
  304. # exception first, then remove the sig, then delete the record
  305. updater = dsc.get_updater(isc.dns.Name("example.com"), True)
  306. self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
  307. rrset_to_delete)
  308. rrset_to_delete.remove_rrsig()
  309. updater.delete_rrset(rrset_to_delete)
  310. # The record should be gone in the updater, but not in the original
  311. # finder (since we have not committed)
  312. result, rrset = updater.find(isc.dns.Name("www.example.com"),
  313. isc.dns.RRType.A(),
  314. None,
  315. finder.FIND_DEFAULT)
  316. self.assertEqual(finder.NXDOMAIN, result)
  317. self.assertEqual(None, rrset)
  318. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  319. isc.dns.RRType.A(),
  320. None,
  321. finder.FIND_DEFAULT)
  322. self.assertEqual(finder.SUCCESS, result)
  323. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  324. rrset.to_text())
  325. updater.commit()
  326. # second commit should raise exception
  327. self.assertRaises(isc.datasrc.Error, updater.commit)
  328. # the record should be gone now in the 'real' finder as well
  329. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  330. isc.dns.RRType.A(),
  331. None,
  332. finder.FIND_DEFAULT)
  333. self.assertEqual(finder.NXDOMAIN, result)
  334. self.assertEqual(None, rrset)
  335. # now add it again
  336. updater = dsc.get_updater(isc.dns.Name("example.com"), True)
  337. updater.add_rrset(rrset_to_delete)
  338. updater.commit()
  339. # second commit should throw
  340. self.assertRaises(isc.datasrc.Error, updater.commit)
  341. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  342. isc.dns.RRType.A(),
  343. None,
  344. finder.FIND_DEFAULT)
  345. self.assertEqual(finder.SUCCESS, result)
  346. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  347. rrset.to_text())
  348. def test_two_modules(self):
  349. # load two modules, and check if they don't interfere
  350. mem_cfg = { "type": "memory", "class": "IN", "zones": [] };
  351. dsc_mem = isc.datasrc.DataSourceClient("memory", json.dumps(mem_cfg))
  352. dsc_sql = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
  353. # check if exceptions are working
  354. self.assertRaises(isc.datasrc.Error, isc.datasrc.DataSourceClient,
  355. "memory", "{}")
  356. self.assertRaises(isc.datasrc.Error, isc.datasrc.DataSourceClient,
  357. "sqlite3", "{}")
  358. # see if a lookup succeeds in sqlite3 ds
  359. result, finder = dsc_sql.find_zone(isc.dns.Name("example.com"))
  360. self.assertEqual(finder.SUCCESS, result)
  361. self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
  362. self.assertEqual("example.com.", finder.get_origin().to_text())
  363. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  364. isc.dns.RRType.A(),
  365. None,
  366. finder.FIND_DEFAULT)
  367. self.assertEqual(finder.SUCCESS, result)
  368. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  369. rrset.to_text())
  370. # see if a lookup fails in mem ds
  371. result, finder = dsc_mem.find_zone(isc.dns.Name("example.com"))
  372. self.assertEqual(finder.NXDOMAIN, result)
  373. def test_update_delete_abort(self):
  374. # we don't do enything with this one, just making sure loading two
  375. # datasources
  376. dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
  377. # first make sure, through a separate finder, that some record exists
  378. result, finder = dsc.find_zone(isc.dns.Name("example.com"))
  379. self.assertEqual(finder.SUCCESS, result)
  380. self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
  381. self.assertEqual("example.com.", finder.get_origin().to_text())
  382. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  383. isc.dns.RRType.A(),
  384. None,
  385. finder.FIND_DEFAULT)
  386. self.assertEqual(finder.SUCCESS, result)
  387. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  388. rrset.to_text())
  389. rrset_to_delete = rrset;
  390. # can't delete rrset with associated sig. Abuse that to force an
  391. # exception first, then remove the sig, then delete the record
  392. updater = dsc.get_updater(isc.dns.Name("example.com"), True)
  393. self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
  394. rrset_to_delete)
  395. rrset_to_delete.remove_rrsig()
  396. updater.delete_rrset(rrset_to_delete)
  397. # The record should be gone in the updater, but not in the original
  398. # finder (since we have not committed)
  399. result, rrset = updater.find(isc.dns.Name("www.example.com"),
  400. isc.dns.RRType.A(),
  401. None,
  402. finder.FIND_DEFAULT)
  403. self.assertEqual(finder.NXDOMAIN, result)
  404. self.assertEqual(None, rrset)
  405. # destroy the updater, which should make it roll back
  406. updater = None
  407. # the record should still be available in the 'real' finder as well
  408. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  409. isc.dns.RRType.A(),
  410. None,
  411. finder.FIND_DEFAULT)
  412. self.assertEqual(finder.SUCCESS, result)
  413. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  414. rrset.to_text())
  415. def test_update_for_no_zone(self):
  416. dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
  417. self.assertEqual(None,
  418. dsc.get_updater(isc.dns.Name("notexistent.example"),
  419. True))
  420. if __name__ == "__main__":
  421. isc.log.init("bind10")
  422. unittest.main()