datasrc_test.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. # Copyright (C) 2011 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import isc.log
  16. import isc.datasrc
  17. from isc.datasrc import ZoneFinder
  18. import isc.dns
  19. import unittest
  20. import os
  21. import shutil
  22. import sys
  23. import json
  # Locations of the test data.  Both environment variables must be set;
  # a missing one raises KeyError when this module is imported.
  TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
  TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep

  # Database used by the read-only tests, and the scratch file that the
  # updater tests copy it to before every test.
  READ_ZONE_DB_FILE = TESTDATA_PATH + "example.com.sqlite3"
  WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"

  # JSON configuration strings handed to DataSourceClient.  They are built
  # with json.dumps rather than by string concatenation so that paths
  # containing backslashes or quotes are escaped and still form valid JSON.
  READ_ZONE_DB_CONFIG = json.dumps({"database_file": READ_ZONE_DB_FILE})
  WRITE_ZONE_DB_CONFIG = json.dumps({"database_file": WRITE_ZONE_DB_FILE})
  def add_rrset(rrset_list, name, rrclass, rrtype, ttl, rdatas):
      """Create an RRset from the given fields and append it to rrset_list.

      rdatas is an iterable of rdata text values; each one is turned into
      an isc.dns.Rdata of the given type and class and added to the new
      RRset.  If rdatas is None the RRset is appended without any rdata.
      """
      new_rrset = isc.dns.RRset(name, rrclass, rrtype, ttl)
      rdata_texts = () if rdatas is None else rdatas
      for rdata_text in rdata_texts:
          new_rrset.add_rdata(isc.dns.Rdata(rrtype, rrclass, rdata_text))
      rrset_list.append(new_rrset)
  # Helper function; there is no direct RRset comparison at the moment.
  def rrsets_equal(a, b):
      """Compare two RRsets on name, class, type, TTL and rdata.

      There is no accessor for signatures, so those are never compared.
      Because the RRSIG records in the test data hold fake data, the rdata
      comparison is skipped when the type is RRSIG.  Rdata order is
      ignored (both rdata lists are sorted before being compared).
      """
      same_header = (a.get_name() == b.get_name() and
                     a.get_class() == b.get_class() and
                     a.get_type() == b.get_type() and
                     a.get_ttl() == b.get_ttl())
      if not same_header:
          return same_header
      # Only look at the rdata when the RRsets are not RRSIGs
      is_rrsig = a.get_type() == isc.dns.RRType.RRSIG()
      return is_rrsig or sorted(a.get_rdata()) == sorted(b.get_rdata())
  # Returns True if rrset is in expected_rrsets (as judged by
  # rrsets_equal); the matching entry is removed from expected_rrsets.
  def check_for_rrset(expected_rrsets, rrset):
      """Look up rrset in expected_rrsets and consume the first match.

      Returns True, after removing the matching entry from the list, when
      an entry equal to rrset is found; returns False and leaves the list
      untouched otherwise.
      """
      candidates = (expected for expected in expected_rrsets
                    if rrsets_equal(expected, rrset))
      match = next(candidates, None)
      if match is None:
          return False
      expected_rrsets.remove(match)
      return True
  class DataSrcClient(unittest.TestCase):
      """Read-only tests of the data source client, iterator and finder.

      All tests that open a database use the sqlite3 data source configured
      by READ_ZONE_DB_CONFIG.
      """

      def test_constructors(self):
          """Invalid constructor arguments and configurations are rejected."""
          # can't construct directly
          self.assertRaises(TypeError, isc.datasrc.ZoneIterator)

          # wrong argument types
          self.assertRaises(TypeError, isc.datasrc.DataSourceClient, 1, "{}")
          self.assertRaises(TypeError, isc.datasrc.DataSourceClient,
                            "sqlite3", 1)

          # unknown data source type, or bad configuration
          self.assertRaises(isc.datasrc.Error,
                            isc.datasrc.DataSourceClient, "foo", "{}")
          self.assertRaises(isc.datasrc.Error,
                            isc.datasrc.DataSourceClient, "sqlite3", "")
          self.assertRaises(isc.datasrc.Error,
                            isc.datasrc.DataSourceClient, "sqlite3", "{}")
          self.assertRaises(isc.datasrc.Error,
                            isc.datasrc.DataSourceClient, "sqlite3",
                            "{ \"foo\": 1 }")
          self.assertRaises(isc.datasrc.Error,
                            isc.datasrc.DataSourceClient, "memory",
                            "{ \"foo\": 1 }")

      def test_iterate(self):
          """The zone iterator returns exactly the expected RRsets."""
          dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)

          # for RRSIGS, the TTL's are currently modified. This test should
          # start failing when we fix that.
          rrs = dsc.get_iterator(isc.dns.Name("sql1.example.com."))

          # We do not know the order in which the RRsets are returned by the
          # iterator.  RRset is (atm) an unorderable type, and within an
          # rrset, the rdatas and rrsigs may also be in random order.  In
          # theory the rrsets themselves can be returned in any order.
          #
          # So we create a list with all rrsets we expect, and for each
          # rrset we get from the iterator, see if it is in that list, and
          # remove it.
          #
          # When the iterator is empty, we check no rrsets are left in the
          # list of expected ones
          expected_rrset_list = []

          name = isc.dns.Name("sql1.example.com")
          rrclass = isc.dns.RRClass.IN()
          add_rrset(expected_rrset_list, name, rrclass,
                    isc.dns.RRType.DNSKEY(), isc.dns.RRTTL(3600),
                    [
                        "256 3 5 AwEAAdYdRhBAEY67R/8G1N5AjGF6asIiNh/pNGeQ8xDQP13J"+
                        "N2lo+sNqWcmpYNhuVqRbLB+mamsU1XcCICSBvAlSmfz/ZUdafX23knAr"+
                        "TlALxMmspcfdpqun3Yr3YYnztuj06rV7RqmveYckWvAUXVYMSMQZfJ30"+
                        "5fs0dE/xLztL/CzZ",
                        "257 3 5 AwEAAbaKDSa9XEFTsjSYpUTHRotTS9Tz3krfDucugW5UokGQ"+
                        "KC26QlyHXlPTZkC+aRFUs/dicJX2kopndLcnlNAPWiKnKtrsFSCnIJDB"+
                        "ZIyvcKq+9RXmV3HK3bUdHnQZ88IZWBRmWKfZ6wnzHo53kdYKAemTErkz"+
                        "taX3lRRPLYWpxRcDPEjysXT3Lh0vfL5D+CIO1yKw/q7C+v6+/kYAxc2l"+
                        "fbNE3HpklSuF+dyX4nXxWgzbcFuLz5Bwfq6ZJ9RYe/kNkA0uMWNa1KkG"+
                        "eRh8gg22kgD/KT5hPTnpezUWLvoY5Qc7IB3T0y4n2JIwiF2ZrZYVrWgD"+
                        "jRWAzGsxJiJyjd6w2k0="
                    ])
          add_rrset(expected_rrset_list, name, rrclass,
                    isc.dns.RRType.NS(), isc.dns.RRTTL(3600),
                    [
                        "dns01.example.com.",
                        "dns02.example.com.",
                        "dns03.example.com."
                    ])
          add_rrset(expected_rrset_list, name, rrclass,
                    isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
                    [
                        "www.sql1.example.com. NS SOA RRSIG NSEC DNSKEY"
                    ])
          # For RRSIGS, we can't add the fake data through the API, so we
          # simply pass no rdata at all (which is skipped by the check later)
          add_rrset(expected_rrset_list, name, rrclass,
                    isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
          add_rrset(expected_rrset_list, name, rrclass,
                    isc.dns.RRType.SOA(), isc.dns.RRTTL(3600),
                    [
                        "master.example.com. admin.example.com. 678 3600 1800 2419200 7200"
                    ])

          name = isc.dns.Name("www.sql1.example.com.")
          add_rrset(expected_rrset_list, name, rrclass,
                    isc.dns.RRType.A(), isc.dns.RRTTL(3600),
                    [
                        "192.0.2.100"
                    ])
          name = isc.dns.Name("www.sql1.example.com.")
          add_rrset(expected_rrset_list, name, rrclass,
                    isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
                    [
                        "sql1.example.com. A RRSIG NSEC"
                    ])
          add_rrset(expected_rrset_list, name, rrclass,
                    isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)

          # rrs is an iterator, but also has direct get_next_rrset(), use
          # the latter one here
          rrset_to_check = rrs.get_next_rrset()
          while rrset_to_check is not None:
              self.assertTrue(check_for_rrset(expected_rrset_list,
                                              rrset_to_check),
                              "Unexpected rrset returned by iterator:\n" +
                              rrset_to_check.to_text())
              rrset_to_check = rrs.get_next_rrset()

          # Now check there are none left
          self.assertEqual(0, len(expected_rrset_list),
                           "RRset(s) not returned by iterator: " +
                           str([rrset.to_text() for rrset in expected_rrset_list ]
                           ))

          # TODO should we catch this (iterating past end) and just return None
          # instead of failing?
          self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)

          rrets = dsc.get_iterator(isc.dns.Name("example.com"))
          # there are more than 80 RRs in this zone... let's just count them
          # (already did a full check of the smaller zone above)
          self.assertEqual(55, len(list(rrets)))

          # TODO should we catch this (iterating past end) and just return None
          # instead of failing?
          # NOTE(review): this re-checks the first iterator (rrs), not the
          # rrets iterator exhausted just above; it may have been meant to
          # be rrets.get_next_rrset -- confirm before changing.
          self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)

          self.assertRaises(TypeError, dsc.get_iterator, "asdf")

      def test_iterator_soa(self):
          """The iterator's get_soa() returns the SOA of the iterated zone."""
          dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)
          iterator = dsc.get_iterator(isc.dns.Name("sql1.example.com."))
          expected_soa = isc.dns.RRset(isc.dns.Name("sql1.example.com."),
                                       isc.dns.RRClass.IN(),
                                       isc.dns.RRType.SOA(),
                                       isc.dns.RRTTL(3600))
          expected_soa.add_rdata(isc.dns.Rdata(isc.dns.RRType.SOA(),
                                               isc.dns.RRClass.IN(),
                                               "master.example.com. " +
                                               "admin.example.com. 678 " +
                                               "3600 1800 2419200 7200"))
          self.assertTrue(rrsets_equal(expected_soa, iterator.get_soa()))

      def test_construct(self):
          """ZoneFinder cannot be instantiated directly."""
          # can't construct directly
          self.assertRaises(TypeError, isc.datasrc.ZoneFinder)

      def test_findoptions(self):
          '''A simple test to confirm no option is specified by default.

          '''
          self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.FIND_GLUE_OK)
          self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.FIND_DNSSEC)
          self.assertFalse(ZoneFinder.FIND_DEFAULT & ZoneFinder.NO_WILDCARD)

      def test_findresults(self):
          '''A simple test to confirm result codes are (defined and) different
          for some combinations.

          '''
          self.assertNotEqual(ZoneFinder.SUCCESS, ZoneFinder.DELEGATION)
          self.assertNotEqual(ZoneFinder.DELEGATION, ZoneFinder.NXDOMAIN)
          self.assertNotEqual(ZoneFinder.NXDOMAIN, ZoneFinder.NXRRSET)
          self.assertNotEqual(ZoneFinder.NXRRSET, ZoneFinder.CNAME)
          self.assertNotEqual(ZoneFinder.CNAME, ZoneFinder.DNAME)
          self.assertNotEqual(ZoneFinder.DNAME, ZoneFinder.WILDCARD)
          self.assertNotEqual(ZoneFinder.WILDCARD, ZoneFinder.WILDCARD_CNAME)
          self.assertNotEqual(ZoneFinder.WILDCARD_CNAME,
                              ZoneFinder.WILDCARD_NXRRSET)

      def test_find(self):
          """finder.find() returns the expected result code and RRset."""
          dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)

          result, finder = dsc.find_zone(isc.dns.Name("example.com"))
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
          self.assertEqual("example.com.", finder.get_origin().to_text())

          # plain successful lookup
          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                           rrset.to_text())

          # a name below a delegation returns the NS RRset
          result, rrset = finder.find(isc.dns.Name("www.sql1.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.DELEGATION, result)
          self.assertEqual("sql1.example.com. 3600 IN NS dns01.example.com.\n" +
                           "sql1.example.com. 3600 IN NS dns02.example.com.\n" +
                           "sql1.example.com. 3600 IN NS dns03.example.com.\n",
                           rrset.to_text())

          result, rrset = finder.find(isc.dns.Name("doesnotexist.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.NXDOMAIN, result)
          self.assertIsNone(rrset)

          result, rrset = finder.find(isc.dns.Name("www.some.other.domain"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.NXDOMAIN, result)
          self.assertIsNone(rrset)

          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.TXT(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.NXRRSET, result)
          self.assertIsNone(rrset)

          result, rrset = finder.find(isc.dns.Name("cname-ext.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.CNAME, result)
          self.assertEqual(
              "cname-ext.example.com. 3600 IN CNAME www.sql1.example.com.\n",
              rrset.to_text())

          result, rrset = finder.find(isc.dns.Name("foo.wild.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.WILDCARD, result)
          self.assertEqual("foo.wild.example.com. 3600 IN A 192.0.2.255\n",
                           rrset.to_text())

          result, rrset = finder.find(isc.dns.Name("foo.wild.example.com"),
                                      isc.dns.RRType.TXT(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.WILDCARD_NXRRSET, result)
          self.assertIsNone(rrset)

          # wrong argument types, one position at a time
          self.assertRaises(TypeError, finder.find,
                            "foo",
                            isc.dns.RRType.A(),
                            None,
                            finder.FIND_DEFAULT)
          self.assertRaises(TypeError, finder.find,
                            isc.dns.Name("cname-ext.example.com"),
                            "foo",
                            None,
                            finder.FIND_DEFAULT)
          self.assertRaises(TypeError, finder.find,
                            isc.dns.Name("cname-ext.example.com"),
                            isc.dns.RRType.A(),
                            None,
                            "foo")

      def test_find_previous(self):
          """find_previous_name() returns the expected preceding names."""
          dsc = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)

          result, finder = dsc.find_zone(isc.dns.Name("example.com"))
          self.assertEqual(finder.SUCCESS, result)

          prev = finder.find_previous_name(isc.dns.Name("bbb.example.com"))
          self.assertEqual("example.com.", prev.to_text())

          prev = finder.find_previous_name(isc.dns.Name("zzz.example.com"))
          self.assertEqual("www.example.com.", prev.to_text())

          prev = finder.find_previous_name(prev)
          self.assertEqual("*.wild.example.com.", prev.to_text())

          self.assertRaises(isc.datasrc.NotImplemented,
                            finder.find_previous_name,
                            isc.dns.Name("com"))
  class DataSrcUpdater(unittest.TestCase):
      """Tests that modify a zone through the data source updater.

      Every test works on WRITE_ZONE_DB_FILE, which setUp() recreates from
      READ_ZONE_DB_FILE so that each test starts from the original content.
      """

      def setUp(self):
          # Make a fresh copy of the writable database with all original content
          shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)

      def test_construct(self):
          """ZoneUpdater cannot be instantiated directly."""
          # can't construct directly
          self.assertRaises(TypeError, isc.datasrc.ZoneUpdater)

      def test_update_delete_commit(self):
          """A deletion becomes visible to a separate finder after commit."""
          dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)

          # first make sure, through a separate finder, that some record exists
          result, finder = dsc.find_zone(isc.dns.Name("example.com"))
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
          self.assertEqual("example.com.", finder.get_origin().to_text())

          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                           rrset.to_text())

          rrset_to_delete = rrset

          # can't delete rrset with associated sig. Abuse that to force an
          # exception first, then remove the sig, then delete the record
          updater = dsc.get_updater(isc.dns.Name("example.com"), True)
          self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
                            rrset_to_delete)

          rrset_to_delete.remove_rrsig()

          updater.delete_rrset(rrset_to_delete)

          # The record should be gone in the updater, but not in the original
          # finder (since we have not committed)
          result, rrset = updater.find(isc.dns.Name("www.example.com"),
                                       isc.dns.RRType.A(),
                                       None,
                                       finder.FIND_DEFAULT)
          self.assertEqual(finder.NXDOMAIN, result)
          self.assertIsNone(rrset)

          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                           rrset.to_text())

          updater.commit()
          # second commit should raise exception
          self.assertRaises(isc.datasrc.Error, updater.commit)

          # the record should be gone now in the 'real' finder as well
          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.NXDOMAIN, result)
          self.assertIsNone(rrset)

          # now add it again
          updater = dsc.get_updater(isc.dns.Name("example.com"), True)
          updater.add_rrset(rrset_to_delete)
          updater.commit()

          # second commit should throw
          self.assertRaises(isc.datasrc.Error, updater.commit)

          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                           rrset.to_text())

      def test_two_modules(self):
          """A memory and an sqlite3 client can be used side by side."""
          # load two modules, and check if they don't interfere
          mem_cfg = { "type": "memory", "class": "IN", "zones": [] }
          dsc_mem = isc.datasrc.DataSourceClient("memory", json.dumps(mem_cfg))
          dsc_sql = isc.datasrc.DataSourceClient("sqlite3", READ_ZONE_DB_CONFIG)

          # check if exceptions are working
          self.assertRaises(isc.datasrc.Error, isc.datasrc.DataSourceClient,
                            "memory", "{}")
          self.assertRaises(isc.datasrc.Error, isc.datasrc.DataSourceClient,
                            "sqlite3", "{}")

          # see if a lookup succeeds in sqlite3 ds
          result, finder = dsc_sql.find_zone(isc.dns.Name("example.com"))
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
          self.assertEqual("example.com.", finder.get_origin().to_text())
          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                           rrset.to_text())

          # see if a lookup fails in mem ds
          result, finder = dsc_mem.find_zone(isc.dns.Name("example.com"))
          self.assertEqual(finder.NXDOMAIN, result)

      def test_update_delete_abort(self):
          """A deletion is not applied when the updater is dropped uncommitted."""
          dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)

          # first make sure, through a separate finder, that some record exists
          result, finder = dsc.find_zone(isc.dns.Name("example.com"))
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
          self.assertEqual("example.com.", finder.get_origin().to_text())

          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                           rrset.to_text())

          rrset_to_delete = rrset

          # can't delete rrset with associated sig. Abuse that to force an
          # exception first, then remove the sig, then delete the record
          updater = dsc.get_updater(isc.dns.Name("example.com"), True)
          self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
                            rrset_to_delete)

          rrset_to_delete.remove_rrsig()

          updater.delete_rrset(rrset_to_delete)

          # The record should be gone in the updater, but not in the original
          # finder (since we have not committed)
          result, rrset = updater.find(isc.dns.Name("www.example.com"),
                                       isc.dns.RRType.A(),
                                       None,
                                       finder.FIND_DEFAULT)
          self.assertEqual(finder.NXDOMAIN, result)
          self.assertIsNone(rrset)

          # destroy the updater, which should make it roll back
          updater = None

          # the record should still be available in the 'real' finder as well
          result, rrset = finder.find(isc.dns.Name("www.example.com"),
                                      isc.dns.RRType.A(),
                                      None,
                                      finder.FIND_DEFAULT)
          self.assertEqual(finder.SUCCESS, result)
          self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
                           rrset.to_text())

      def test_update_for_no_zone(self):
          """get_updater() returns None for a zone that does not exist."""
          dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
          self.assertIsNone(
              dsc.get_updater(isc.dns.Name("notexistent.example"), True))

      def test_client_reference(self):
          """Factory methods do not leak references to the client."""
          # Temporarily create various objects using factory methods of the
          # client. The created objects won't be stored anywhere and
          # immediately released. The creation shouldn't affect the reference
          # to the base client.
          dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
          orig_ref = sys.getrefcount(dsc)

          dsc.find_zone(isc.dns.Name("example.com"))
          self.assertEqual(orig_ref, sys.getrefcount(dsc))

          dsc.get_iterator(isc.dns.Name("example.com."))
          self.assertEqual(orig_ref, sys.getrefcount(dsc))

          dsc.get_updater(isc.dns.Name("example.com"), True)
          self.assertEqual(orig_ref, sys.getrefcount(dsc))

      def test_iterate_over_empty_zone(self):
          """Iterating an emptied zone yields no SOA and no RRsets."""
          # empty the test zone first
          # (presumably the True argument asks the updater to replace the
          # zone content, so committing without adding anything leaves the
          # zone empty -- confirm against the get_updater documentation)
          dsc = isc.datasrc.DataSourceClient("sqlite3", WRITE_ZONE_DB_CONFIG)
          updater = dsc.get_updater(isc.dns.Name("example.com"), True)
          updater.commit()

          # Check the iterator behavior for the empty zone.
          iterator = dsc.get_iterator(isc.dns.Name("example.com."))
          self.assertIsNone(iterator.get_soa())
          self.assertIsNone(iterator.get_next_rrset())
  if __name__ == "__main__":
      # Script entry point: initialise isc.log with the name "bind10",
      # then hand control to the unittest runner.
      isc.log.init("bind10")
      unittest.main()