datasrc_test.py 17 KB


  1. # Copyright (C) 2011 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import isc.log
  16. import isc.datasrc
  17. import isc.dns
  18. import unittest
  19. import os
  20. import shutil
  21. TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
  22. TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
  23. READ_ZONE_DB_FILE = TESTDATA_PATH + "example.com.sqlite3"
  24. BROKEN_DB_FILE = TESTDATA_PATH + "brokendb.sqlite3"
  25. WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
  26. NEW_DB_FILE = TESTDATA_WRITE_PATH + "new_db.sqlite3"
  27. def add_rrset(rrset_list, name, rrclass, rrtype, ttl, rdatas):
  28. rrset_to_add = isc.dns.RRset(name, rrclass, rrtype, ttl)
  29. if rdatas is not None:
  30. for rdata in rdatas:
  31. rrset_to_add.add_rdata(isc.dns.Rdata(rrtype, rrclass, rdata))
  32. rrset_list.append(rrset_to_add)
  33. # helper function, we have no direct rrset comparison atm
  34. def rrsets_equal(a, b):
  35. # no accessor for sigs either (so this only checks name, class, type, ttl,
  36. # and rdata)
  37. # also, because of the fake data in rrsigs, if the type is rrsig, the
  38. # rdata is not checked
  39. return a.get_name() == b.get_name() and\
  40. a.get_class() == b.get_class() and\
  41. a.get_type() == b.get_type() and \
  42. a.get_ttl() == b.get_ttl() and\
  43. (a.get_type() == isc.dns.RRType.RRSIG() or
  44. sorted(a.get_rdata()) == sorted(b.get_rdata()))
  45. # returns true if rrset is in expected_rrsets
  46. # will remove the rrset from expected_rrsets if found
  47. def check_for_rrset(expected_rrsets, rrset):
  48. for cur_rrset in expected_rrsets[:]:
  49. if rrsets_equal(cur_rrset, rrset):
  50. expected_rrsets.remove(cur_rrset)
  51. return True
  52. return False
  53. class DataSrcClient(unittest.TestCase):
  54. def test_construct(self):
  55. # can't construct directly
  56. self.assertRaises(TypeError, isc.datasrc.ZoneIterator)
  57. def test_iterate(self):
  58. dsc = isc.datasrc.DataSourceClient(READ_ZONE_DB_FILE)
  59. # for RRSIGS, the TTL's are currently modified. This test should
  60. # start failing when we fix that.
  61. rrs = dsc.get_iterator(isc.dns.Name("sql1.example.com."))
  62. # we do not know the order in which they are returned by the iterator
  63. # but we do want to check them, so we put all records into one list
  64. # sort it (doesn't matter which way it is sorted, as long as it is
  65. # sorted)
  66. # RRset is (atm) an unorderable type, and within an rrset, the
  67. # rdatas and rrsigs may also be in random order. In theory the
  68. # rrsets themselves can be returned in any order.
  69. #
  70. # So we create a second list with all rrsets we expect, and for each
  71. # rrset we get from the iterator, see if it is in that list, and
  72. # remove it.
  73. #
  74. # When the iterator is empty, we check no rrsets are left in the
  75. # list of expected ones
  76. expected_rrset_list = []
  77. name = isc.dns.Name("sql1.example.com")
  78. rrclass = isc.dns.RRClass.IN()
  79. add_rrset(expected_rrset_list, name, rrclass,
  80. isc.dns.RRType.DNSKEY(), isc.dns.RRTTL(3600),
  81. [
  82. "256 3 5 AwEAAdYdRhBAEY67R/8G1N5AjGF6asIiNh/pNGeQ8xDQP13J"+
  83. "N2lo+sNqWcmpYNhuVqRbLB+mamsU1XcCICSBvAlSmfz/ZUdafX23knAr"+
  84. "TlALxMmspcfdpqun3Yr3YYnztuj06rV7RqmveYckWvAUXVYMSMQZfJ30"+
  85. "5fs0dE/xLztL/CzZ",
  86. "257 3 5 AwEAAbaKDSa9XEFTsjSYpUTHRotTS9Tz3krfDucugW5UokGQ"+
  87. "KC26QlyHXlPTZkC+aRFUs/dicJX2kopndLcnlNAPWiKnKtrsFSCnIJDB"+
  88. "ZIyvcKq+9RXmV3HK3bUdHnQZ88IZWBRmWKfZ6wnzHo53kdYKAemTErkz"+
  89. "taX3lRRPLYWpxRcDPEjysXT3Lh0vfL5D+CIO1yKw/q7C+v6+/kYAxc2l"+
  90. "fbNE3HpklSuF+dyX4nXxWgzbcFuLz5Bwfq6ZJ9RYe/kNkA0uMWNa1KkG"+
  91. "eRh8gg22kgD/KT5hPTnpezUWLvoY5Qc7IB3T0y4n2JIwiF2ZrZYVrWgD"+
  92. "jRWAzGsxJiJyjd6w2k0="
  93. ])
  94. add_rrset(expected_rrset_list, name, rrclass,
  95. isc.dns.RRType.NS(), isc.dns.RRTTL(3600),
  96. [
  97. "dns01.example.com.",
  98. "dns02.example.com.",
  99. "dns03.example.com."
  100. ])
  101. add_rrset(expected_rrset_list, name, rrclass,
  102. isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
  103. [
  104. "www.sql1.example.com. NS SOA RRSIG NSEC DNSKEY"
  105. ])
  106. # For RRSIGS, we can't add the fake data through the API, so we
  107. # simply pass no rdata at all (which is skipped by the check later)
  108. add_rrset(expected_rrset_list, name, rrclass,
  109. isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
  110. add_rrset(expected_rrset_list, name, rrclass,
  111. isc.dns.RRType.SOA(), isc.dns.RRTTL(3600),
  112. [
  113. "master.example.com. admin.example.com. 678 3600 1800 2419200 7200"
  114. ])
  115. name = isc.dns.Name("www.sql1.example.com.")
  116. add_rrset(expected_rrset_list, name, rrclass,
  117. isc.dns.RRType.A(), isc.dns.RRTTL(3600),
  118. [
  119. "192.0.2.100"
  120. ])
  121. name = isc.dns.Name("www.sql1.example.com.")
  122. add_rrset(expected_rrset_list, name, rrclass,
  123. isc.dns.RRType.NSEC(), isc.dns.RRTTL(7200),
  124. [
  125. "sql1.example.com. A RRSIG NSEC"
  126. ])
  127. add_rrset(expected_rrset_list, name, rrclass,
  128. isc.dns.RRType.RRSIG(), isc.dns.RRTTL(3600), None)
  129. # rrs is an iterator, but also has direct get_next_rrset(), use
  130. # the latter one here
  131. rrset_to_check = rrs.get_next_rrset()
  132. while (rrset_to_check != None):
  133. self.assertTrue(check_for_rrset(expected_rrset_list,
  134. rrset_to_check),
  135. "Unexpected rrset returned by iterator:\n" +
  136. rrset_to_check.to_text())
  137. rrset_to_check = rrs.get_next_rrset()
  138. # Now check there are none left
  139. self.assertEqual(0, len(expected_rrset_list),
  140. "RRset(s) not returned by iterator: " +
  141. str([rrset.to_text() for rrset in expected_rrset_list ]
  142. ))
  143. # TODO should we catch this (iterating past end) and just return None
  144. # instead of failing?
  145. self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)
  146. rrets = dsc.get_iterator(isc.dns.Name("example.com"))
  147. # there are more than 80 RRs in this zone... let's just count them
  148. # (already did a full check of the smaller zone above)
  149. self.assertEqual(55, len(list(rrets)))
  150. # TODO should we catch this (iterating past end) and just return None
  151. # instead of failing?
  152. self.assertRaises(isc.datasrc.Error, rrs.get_next_rrset)
  153. self.assertRaises(TypeError, dsc.get_iterator, "asdf")
  154. def test_construct(self):
  155. # can't construct directly
  156. self.assertRaises(TypeError, isc.datasrc.ZoneFinder)
  157. def test_find(self):
  158. dsc = isc.datasrc.DataSourceClient(READ_ZONE_DB_FILE)
  159. result, finder = dsc.find_zone(isc.dns.Name("example.com"))
  160. self.assertEqual(finder.SUCCESS, result)
  161. self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
  162. self.assertEqual("example.com.", finder.get_origin().to_text())
  163. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  164. isc.dns.RRType.A(),
  165. None,
  166. finder.FIND_DEFAULT)
  167. self.assertEqual(finder.SUCCESS, result)
  168. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  169. rrset.to_text())
  170. result, rrset = finder.find(isc.dns.Name("www.sql1.example.com"),
  171. isc.dns.RRType.A(),
  172. None,
  173. finder.FIND_DEFAULT)
  174. self.assertEqual(finder.DELEGATION, result)
  175. self.assertEqual("sql1.example.com. 3600 IN NS dns01.example.com.\n" +
  176. "sql1.example.com. 3600 IN NS dns02.example.com.\n" +
  177. "sql1.example.com. 3600 IN NS dns03.example.com.\n",
  178. rrset.to_text())
  179. result, rrset = finder.find(isc.dns.Name("doesnotexist.example.com"),
  180. isc.dns.RRType.A(),
  181. None,
  182. finder.FIND_DEFAULT)
  183. self.assertEqual(finder.NXDOMAIN, result)
  184. self.assertEqual(None, rrset)
  185. result, rrset = finder.find(isc.dns.Name("www.some.other.domain"),
  186. isc.dns.RRType.A(),
  187. None,
  188. finder.FIND_DEFAULT)
  189. self.assertEqual(finder.NXDOMAIN, result)
  190. self.assertEqual(None, rrset)
  191. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  192. isc.dns.RRType.TXT(),
  193. None,
  194. finder.FIND_DEFAULT)
  195. self.assertEqual(finder.NXRRSET, result)
  196. self.assertEqual(None, rrset)
  197. result, rrset = finder.find(isc.dns.Name("cname-ext.example.com"),
  198. isc.dns.RRType.A(),
  199. None,
  200. finder.FIND_DEFAULT)
  201. self.assertEqual(finder.CNAME, result)
  202. self.assertEqual(
  203. "cname-ext.example.com. 3600 IN CNAME www.sql1.example.com.\n",
  204. rrset.to_text())
  205. self.assertRaises(TypeError, finder.find,
  206. "foo",
  207. isc.dns.RRType.A(),
  208. None,
  209. finder.FIND_DEFAULT)
  210. self.assertRaises(TypeError, finder.find,
  211. isc.dns.Name("cname-ext.example.com"),
  212. "foo",
  213. None,
  214. finder.FIND_DEFAULT)
  215. self.assertRaises(TypeError, finder.find,
  216. isc.dns.Name("cname-ext.example.com"),
  217. isc.dns.RRType.A(),
  218. None,
  219. "foo")
  220. class DataSrcUpdater(unittest.TestCase):
  221. def setUp(self):
  222. # Make a fresh copy of the writable database with all original content
  223. shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
  224. def test_construct(self):
  225. # can't construct directly
  226. self.assertRaises(TypeError, isc.datasrc.ZoneUpdater)
  227. def test_update_delete_commit(self):
  228. dsc = isc.datasrc.DataSourceClient(WRITE_ZONE_DB_FILE)
  229. # first make sure, through a separate finder, that some record exists
  230. result, finder = dsc.find_zone(isc.dns.Name("example.com"))
  231. self.assertEqual(finder.SUCCESS, result)
  232. self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
  233. self.assertEqual("example.com.", finder.get_origin().to_text())
  234. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  235. isc.dns.RRType.A(),
  236. None,
  237. finder.FIND_DEFAULT)
  238. self.assertEqual(finder.SUCCESS, result)
  239. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  240. rrset.to_text())
  241. rrset_to_delete = rrset;
  242. # can't delete rrset with associated sig. Abuse that to force an
  243. # exception first, then remove the sig, then delete the record
  244. updater = dsc.get_updater(isc.dns.Name("example.com"), True)
  245. self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
  246. rrset_to_delete)
  247. rrset_to_delete.remove_rrsig()
  248. updater.delete_rrset(rrset_to_delete)
  249. # The record should be gone in the updater, but not in the original
  250. # finder (since we have not committed)
  251. result, rrset = updater.find(isc.dns.Name("www.example.com"),
  252. isc.dns.RRType.A(),
  253. None,
  254. finder.FIND_DEFAULT)
  255. self.assertEqual(finder.NXDOMAIN, result)
  256. self.assertEqual(None, rrset)
  257. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  258. isc.dns.RRType.A(),
  259. None,
  260. finder.FIND_DEFAULT)
  261. self.assertEqual(finder.SUCCESS, result)
  262. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  263. rrset.to_text())
  264. updater.commit()
  265. # second commit should raise exception
  266. self.assertRaises(isc.datasrc.Error, updater.commit)
  267. # the record should be gone now in the 'real' finder as well
  268. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  269. isc.dns.RRType.A(),
  270. None,
  271. finder.FIND_DEFAULT)
  272. self.assertEqual(finder.NXDOMAIN, result)
  273. self.assertEqual(None, rrset)
  274. # now add it again
  275. updater = dsc.get_updater(isc.dns.Name("example.com"), True)
  276. updater.add_rrset(rrset_to_delete)
  277. updater.commit()
  278. # second commit should throw
  279. self.assertRaises(isc.datasrc.Error, updater.commit)
  280. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  281. isc.dns.RRType.A(),
  282. None,
  283. finder.FIND_DEFAULT)
  284. self.assertEqual(finder.SUCCESS, result)
  285. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  286. rrset.to_text())
  287. def test_update_delete_abort(self):
  288. dsc = isc.datasrc.DataSourceClient(WRITE_ZONE_DB_FILE)
  289. # first make sure, through a separate finder, that some record exists
  290. result, finder = dsc.find_zone(isc.dns.Name("example.com"))
  291. self.assertEqual(finder.SUCCESS, result)
  292. self.assertEqual(isc.dns.RRClass.IN(), finder.get_class())
  293. self.assertEqual("example.com.", finder.get_origin().to_text())
  294. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  295. isc.dns.RRType.A(),
  296. None,
  297. finder.FIND_DEFAULT)
  298. self.assertEqual(finder.SUCCESS, result)
  299. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  300. rrset.to_text())
  301. rrset_to_delete = rrset;
  302. # can't delete rrset with associated sig. Abuse that to force an
  303. # exception first, then remove the sig, then delete the record
  304. updater = dsc.get_updater(isc.dns.Name("example.com"), True)
  305. self.assertRaises(isc.datasrc.Error, updater.delete_rrset,
  306. rrset_to_delete)
  307. rrset_to_delete.remove_rrsig()
  308. updater.delete_rrset(rrset_to_delete)
  309. # The record should be gone in the updater, but not in the original
  310. # finder (since we have not committed)
  311. result, rrset = updater.find(isc.dns.Name("www.example.com"),
  312. isc.dns.RRType.A(),
  313. None,
  314. finder.FIND_DEFAULT)
  315. self.assertEqual(finder.NXDOMAIN, result)
  316. self.assertEqual(None, rrset)
  317. # destroy the updater, which should make it roll back
  318. updater = None
  319. # the record should still be available in the 'real' finder as well
  320. result, rrset = finder.find(isc.dns.Name("www.example.com"),
  321. isc.dns.RRType.A(),
  322. None,
  323. finder.FIND_DEFAULT)
  324. self.assertEqual(finder.SUCCESS, result)
  325. self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n",
  326. rrset.to_text())
  327. def test_update_for_no_zone(self):
  328. dsc = isc.datasrc.DataSourceClient(WRITE_ZONE_DB_FILE)
  329. self.assertEqual(None,
  330. dsc.get_updater(isc.dns.Name("notexistent.example"),
  331. True))
  332. if __name__ == "__main__":
  333. isc.log.init("bind10")
  334. unittest.main()