session_tests.py 56 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167
  1. # Copyright (C) 2012 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import os
  16. import shutil
  17. import isc.log
  18. import unittest
  19. from isc.dns import *
  20. from isc.datasrc import DataSourceClient
  21. from isc.ddns.session import *
  22. from isc.ddns.zone_config import *
  23. # Some common test parameters
  24. TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
  25. READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
  26. TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
  27. WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
  28. WRITE_ZONE_DB_CONFIG = "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}"
  29. TEST_ZONE_NAME = Name('example.org')
  30. UPDATE_RRTYPE = RRType.SOA()
  31. TEST_RRCLASS = RRClass.IN()
  32. TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
  33. TEST_CLIENT6 = ('2001:db8::1', 53, 0, 0)
  34. TEST_CLIENT4 = ('192.0.2.1', 53)
  35. def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[],
  36. updates=[]):
  37. msg = Message(Message.RENDER)
  38. msg.set_qid(5353) # arbitrary chosen
  39. msg.set_opcode(Opcode.UPDATE())
  40. msg.set_rcode(Rcode.NOERROR())
  41. for z in zones:
  42. msg.add_question(z)
  43. for p in prerequisites:
  44. msg.add_rrset(SECTION_PREREQUISITE, p)
  45. for u in updates:
  46. msg.add_rrset(SECTION_UPDATE, u)
  47. renderer = MessageRenderer()
  48. msg.to_wire(renderer)
  49. # re-read the created data in the parse mode
  50. msg.clear(Message.PARSE)
  51. msg.from_wire(renderer.get_data())
  52. return renderer.get_data(), msg
  53. def create_rrset(name, rrclass, rrtype, ttl, rdatas = []):
  54. '''
  55. Helper method to easily create RRsets, auto-converts
  56. name, rrclass, rrtype, and ttl (if possibly through their
  57. respective constructors)
  58. rdatas is a list of rr data strings, or bytestrings, which
  59. should match the RRType of the rrset to create
  60. '''
  61. if type(name) != Name:
  62. name = Name(name)
  63. if type(rrclass) != RRClass:
  64. rrclass = RRClass(rrclass)
  65. if type(rrtype) != RRType:
  66. rrtype = RRType(rrtype)
  67. if type(ttl) != RRTTL:
  68. ttl = RRTTL(ttl)
  69. rrset = isc.dns.RRset(name, rrclass, rrtype, ttl)
  70. for rdata in rdatas:
  71. add_rdata(rrset, rdata)
  72. return rrset
  73. def add_rdata(rrset, rdata):
  74. '''
  75. Helper function for easily adding Rdata fields to RRsets.
  76. This function assumes the given rdata is of type string or bytes,
  77. and corresponds to the given rrset
  78. '''
  79. rrset.add_rdata(isc.dns.Rdata(rrset.get_type(),
  80. rrset.get_class(),
  81. rdata))
  82. class TestDDNSSOA(unittest.TestCase):
  83. '''unittest for the DDNS_SOA'''
  84. def test_update_soa(self):
  85. '''unittest for update_soa function'''
  86. soa_update = DDNS_SOA()
  87. soa_rr = create_rrset("example.org", TEST_RRCLASS,
  88. RRType.SOA(), 3600, ["ns1.example.org. " +
  89. "admin.example.org. " +
  90. "1233 3600 1800 2419200 7200"])
  91. expected_soa_rr = create_rrset("example.org", TEST_RRCLASS,
  92. RRType.SOA(), 3600, ["ns1.example.org. "
  93. + "admin.example.org. " +
  94. "1234 3600 1800 2419200 7200"])
  95. self.assertEqual(soa_update.update_soa(soa_rr).get_rdata()[0].to_text(),
  96. expected_soa_rr.get_rdata()[0].to_text())
  97. max_serial = 2 ** 32 - 1
  98. soa_rdata = "%d %s"%(max_serial,"3600 1800 2419200 7200")
  99. soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA(), 3600,
  100. ["ns1.example.org. " + "admin.example.org. " +
  101. soa_rdata])
  102. expected_soa_rr = create_rrset("example.org", TEST_RRCLASS,
  103. RRType.SOA(), 3600, ["ns1.example.org. "
  104. + "admin.example.org. " +
  105. "1 3600 1800 2419200 7200"])
  106. self.assertEqual(soa_update.update_soa(soa_rr).get_rdata()[0].to_text(),
  107. expected_soa_rr.get_rdata()[0].to_text())
  108. def test_soa_update_check(self):
  109. '''unittest for soa_update_check function'''
  110. small_soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA(),
  111. 3600, ["ns1.example.org. " +
  112. "admin.example.org. " +
  113. "1233 3600 1800 2419200 7200"])
  114. large_soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA(),
  115. 3600, ["ns1.example.org. " +
  116. "admin.example.org. " +
  117. "1234 3600 1800 2419200 7200"])
  118. soa_update = DDNS_SOA()
  119. # The case of (i1 < i2 and i2 - i1 < 2^(SERIAL_BITS - 1)) in rfc 1982
  120. self.assertTrue(soa_update.soa_update_check(small_soa_rr,
  121. large_soa_rr))
  122. self.assertFalse(soa_update.soa_update_check(large_soa_rr,
  123. small_soa_rr))
  124. small_serial = 1235 + 2 ** 31
  125. soa_rdata = "%d %s"%(small_serial,"3600 1800 2419200 7200")
  126. small_soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA(),
  127. 3600, ["ns1.example.org. " +
  128. "admin.example.org. " +
  129. soa_rdata])
  130. large_soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA(),
  131. 3600, ["ns1.example.org. " +
  132. "admin.example.org. " +
  133. "1234 3600 1800 2419200 7200"])
  134. # The case of (i1 > i2 and i1 - i2 > 2^(SERIAL_BITS - 1)) in rfc 1982
  135. self.assertTrue(soa_update.soa_update_check(small_soa_rr,
  136. large_soa_rr))
  137. self.assertFalse(soa_update.soa_update_check(large_soa_rr,
  138. small_soa_rr))
  139. class SessionTest(unittest.TestCase):
  140. '''Session tests'''
  141. def setUp(self):
  142. shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
  143. self.__datasrc_client = DataSourceClient("sqlite3",
  144. WRITE_ZONE_DB_CONFIG)
  145. self.__update_msgdata, self.__update_msg = create_update_msg()
  146. self.__session = UpdateSession(self.__update_msg,
  147. self.__update_msgdata, TEST_CLIENT4,
  148. ZoneConfig([], TEST_RRCLASS,
  149. self.__datasrc_client))
  150. self.__session._UpdateSession__get_update_zone()
  151. def check_response(self, msg, expected_rcode):
  152. '''Perform common checks on update resposne message.'''
  153. self.assertTrue(msg.get_header_flag(Message.HEADERFLAG_QR))
  154. # note: we convert opcode to text it'd be more helpful on failure.
  155. self.assertEqual(Opcode.UPDATE().to_text(), msg.get_opcode().to_text())
  156. self.assertEqual(expected_rcode.to_text(), msg.get_rcode().to_text())
  157. # All sections should be cleared
  158. self.assertEqual(0, msg.get_rr_count(SECTION_ZONE))
  159. self.assertEqual(0, msg.get_rr_count(SECTION_PREREQUISITE))
  160. self.assertEqual(0, msg.get_rr_count(SECTION_UPDATE))
  161. self.assertEqual(0, msg.get_rr_count(Message.SECTION_ADDITIONAL))
  162. def test_handle(self):
  163. '''Basic update case'''
  164. result, zname, zclass = self.__session.handle()
  165. self.assertEqual(UPDATE_SUCCESS, result)
  166. self.assertEqual(TEST_ZONE_NAME, zname)
  167. self.assertEqual(TEST_RRCLASS, zclass)
  168. # Just checking these are different from the success code.
  169. self.assertNotEqual(UPDATE_ERROR, result)
  170. self.assertNotEqual(UPDATE_DROP, result)
  171. def test_broken_request(self):
  172. # Zone section is empty
  173. msg_data, msg = create_update_msg(zones=[])
  174. session = UpdateSession(msg, msg_data, TEST_CLIENT6, None)
  175. result, zname, zclass = session.handle()
  176. self.assertEqual(UPDATE_ERROR, result)
  177. self.assertEqual(None, zname)
  178. self.assertEqual(None, zclass)
  179. self.check_response(session.get_message(), Rcode.FORMERR())
  180. # Zone section contains multiple records
  181. msg_data, msg = create_update_msg(zones=[TEST_ZONE_RECORD,
  182. TEST_ZONE_RECORD])
  183. session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
  184. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  185. self.check_response(session.get_message(), Rcode.FORMERR())
  186. # Zone section's type is not SOA
  187. msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
  188. TEST_RRCLASS,
  189. RRType.A())])
  190. session = UpdateSession(msg, msg_data, TEST_CLIENT4, None)
  191. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  192. self.check_response(session.get_message(), Rcode.FORMERR())
  193. def test_update_secondary(self):
  194. # specified zone is configured as a secondary. Since this
  195. # implementation doesn't support update forwarding, the result
  196. # should be NOTIMP.
  197. msg_data, msg = create_update_msg(zones=[Question(TEST_ZONE_NAME,
  198. TEST_RRCLASS,
  199. RRType.SOA())])
  200. session = UpdateSession(msg, msg_data, TEST_CLIENT4,
  201. ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
  202. TEST_RRCLASS,
  203. self.__datasrc_client))
  204. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  205. self.check_response(session.get_message(), Rcode.NOTIMP())
  206. def check_notauth(self, zname, zclass=TEST_RRCLASS):
  207. '''Common test sequence for the 'notauth' test'''
  208. msg_data, msg = create_update_msg(zones=[Question(zname, zclass,
  209. RRType.SOA())])
  210. session = UpdateSession(msg, msg_data, TEST_CLIENT4,
  211. ZoneConfig([(TEST_ZONE_NAME, TEST_RRCLASS)],
  212. TEST_RRCLASS,
  213. self.__datasrc_client))
  214. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  215. self.check_response(session.get_message(), Rcode.NOTAUTH())
  216. def test_update_notauth(self):
  217. '''Update attempt for non authoritative zones'''
  218. # zone name doesn't match
  219. self.check_notauth(Name('example.com'))
  220. # zone name is a subdomain of the actual authoritative zone
  221. # (match must be exact)
  222. self.check_notauth(Name('sub.example.org'))
  223. # zone class doesn't match
  224. self.check_notauth(Name('example.org'), RRClass.CH())
  225. def foreach_rr_in_rrset_helper(self, rr, l):
  226. l.append(rr.to_text())
  227. def test_foreach_rr_in_rrset(self):
  228. rrset = create_rrset("www.example.org", TEST_RRCLASS,
  229. RRType.A(), 3600, [ "192.0.2.1" ])
  230. l = []
  231. foreach_rr_in_rrset(rrset, self.foreach_rr_in_rrset_helper, rrset, l)
  232. self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n"], l)
  233. add_rdata(rrset, "192.0.2.2")
  234. add_rdata(rrset, "192.0.2.3")
  235. # if the helper is called directly, the list should have
  236. # one entry, with a multiline string
  237. # but through the helper, there should be several 1-line entries
  238. l = []
  239. self.foreach_rr_in_rrset_helper(rrset, l)
  240. self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n" +
  241. "www.example.org. 3600 IN A 192.0.2.2\n" +
  242. "www.example.org. 3600 IN A 192.0.2.3\n"
  243. ], l)
  244. # but through the helper, there should be several 1-line entries
  245. l = []
  246. foreach_rr_in_rrset(rrset, self.foreach_rr_in_rrset_helper, rrset, l)
  247. self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n",
  248. "www.example.org. 3600 IN A 192.0.2.2\n",
  249. "www.example.org. 3600 IN A 192.0.2.3\n",
  250. ], l)
  251. def test_convert_rrset_class(self):
  252. # Converting an RRSET to a different class should work
  253. # if the rdata types can be converted
  254. rrset = create_rrset("www.example.org", RRClass.NONE(), RRType.A(),
  255. 3600, [ b'\xc0\x00\x02\x01', b'\xc0\x00\x02\x02'])
  256. rrset2 = convert_rrset_class(rrset, RRClass.IN())
  257. self.assertEqual("www.example.org. 3600 IN A 192.0.2.1\n" +
  258. "www.example.org. 3600 IN A 192.0.2.2\n",
  259. str(rrset2))
  260. rrset3 = convert_rrset_class(rrset2, RRClass.NONE())
  261. self.assertEqual("www.example.org. 3600 CLASS254 A \\# 4 " +
  262. "c0000201\nwww.example.org. 3600 CLASS254 " +
  263. "A \\# 4 c0000202\n",
  264. str(rrset3))
  265. # depending on what type of bad data is given, a number
  266. # of different exceptions could be raised (TODO: i recall
  267. # there was a ticket about making a better hierarchy for
  268. # dns/parsing related exceptions)
  269. self.assertRaises(InvalidRdataLength, convert_rrset_class,
  270. rrset, RRClass.CH())
  271. add_rdata(rrset, b'\xc0\x00')
  272. self.assertRaises(DNSMessageFORMERR, convert_rrset_class,
  273. rrset, RRClass.IN())
  274. def __prereq_helper(self, method, expected, rrset):
  275. '''Calls the given method with self.__datasrc_client
  276. and the given rrset, and compares the return value.
  277. Function does not do much but makes the code look nicer'''
  278. self.assertEqual(expected, method(rrset))
  279. def __check_prerequisite_exists_combined(self, method, rrclass, expected):
  280. '''shared code for the checks for the very similar (but reversed
  281. in behaviour) methods __prereq_rrset_exists and
  282. __prereq_rrset_does_not_exist.
  283. For rrset_exists, rrclass should be ANY, for rrset_does_not_exist,
  284. it should be NONE.
  285. '''
  286. # Basic existence checks
  287. # www.example.org should have an A, but not an MX
  288. rrset = create_rrset("www.example.org", rrclass, RRType.A(), 0)
  289. self.__prereq_helper(method, expected, rrset)
  290. rrset = create_rrset("www.example.org", rrclass, RRType.MX(), 0)
  291. self.__prereq_helper(method, not expected, rrset)
  292. # example.org should have an MX, but not an A
  293. rrset = create_rrset("example.org", rrclass, RRType.MX(), 0)
  294. self.__prereq_helper(method, expected, rrset)
  295. rrset = create_rrset("example.org", rrclass, RRType.A(), 0)
  296. self.__prereq_helper(method, not expected, rrset)
  297. # Also check the case where the name does not even exist
  298. rrset = create_rrset("doesnotexist.example.org", rrclass, RRType.A(), 0)
  299. self.__prereq_helper(method, not expected, rrset)
  300. # Wildcard expansion should not be applied, but literal matches
  301. # should work
  302. rrset = create_rrset("foo.wildcard.example.org", rrclass, RRType.A(), 0)
  303. self.__prereq_helper(method, not expected, rrset)
  304. rrset = create_rrset("*.wildcard.example.org", rrclass, RRType.A(), 0)
  305. self.__prereq_helper(method, expected, rrset)
  306. # Likewise, CNAME directly should match, but what it points to should
  307. # not
  308. rrset = create_rrset("cname.example.org", rrclass, RRType.A(), 0)
  309. self.__prereq_helper(method, not expected, rrset)
  310. rrset = create_rrset("cname.example.org", rrclass, RRType.CNAME(), 0)
  311. self.__prereq_helper(method, expected, rrset)
  312. # And also make sure a delegation (itself) is not treated as existing
  313. # data
  314. rrset = create_rrset("foo.sub.example.org", rrclass, RRType.A(), 0)
  315. self.__prereq_helper(method, not expected, rrset)
  316. # But the delegation data itself should match
  317. rrset = create_rrset("sub.example.org", rrclass, RRType.NS(), 0)
  318. self.__prereq_helper(method, expected, rrset)
  319. # As should glue
  320. rrset = create_rrset("ns.sub.example.org", rrclass, RRType.A(), 0)
  321. self.__prereq_helper(method, expected, rrset)
  322. def test_check_prerequisite_exists(self):
  323. method = self.__session._UpdateSession__prereq_rrset_exists
  324. self.__check_prerequisite_exists_combined(method,
  325. RRClass.ANY(),
  326. True)
  327. def test_check_prerequisite_does_not_exist(self):
  328. method = self.__session._UpdateSession__prereq_rrset_does_not_exist
  329. self.__check_prerequisite_exists_combined(method,
  330. RRClass.NONE(),
  331. False)
  332. def test_check_prerequisite_exists_value(self):
  333. method = self.__session._UpdateSession__prereq_rrset_exists_value
  334. rrset = create_rrset("www.example.org", RRClass.IN(), RRType.A(), 0)
  335. # empty one should not match
  336. self.__prereq_helper(method, False, rrset)
  337. # When the rdata is added, it should match
  338. add_rdata(rrset, "192.0.2.1")
  339. self.__prereq_helper(method, True, rrset)
  340. # But adding more should not
  341. add_rdata(rrset, "192.0.2.2")
  342. self.__prereq_helper(method, False, rrset)
  343. # Also test one with more than one RR
  344. rrset = create_rrset("example.org", RRClass.IN(), RRType.NS(), 0)
  345. self.__prereq_helper(method, False, rrset)
  346. add_rdata(rrset, "ns1.example.org.")
  347. self.__prereq_helper(method, False, rrset)
  348. add_rdata(rrset, "ns2.example.org")
  349. self.__prereq_helper(method, False, rrset)
  350. add_rdata(rrset, "ns3.example.org.")
  351. self.__prereq_helper(method, True, rrset)
  352. add_rdata(rrset, "ns4.example.org.")
  353. self.__prereq_helper(method, False, rrset)
  354. # Repeat that, but try a different order of Rdata addition
  355. rrset = create_rrset("example.org", RRClass.IN(), RRType.NS(), 0)
  356. self.__prereq_helper(method, False, rrset)
  357. add_rdata(rrset, "ns3.example.org.")
  358. self.__prereq_helper(method, False, rrset)
  359. add_rdata(rrset, "ns2.example.org.")
  360. self.__prereq_helper(method, False, rrset)
  361. add_rdata(rrset, "ns1.example.org.")
  362. self.__prereq_helper(method, True, rrset)
  363. add_rdata(rrset, "ns4.example.org.")
  364. self.__prereq_helper(method, False, rrset)
  365. # and test one where the name does not even exist
  366. rrset = create_rrset("doesnotexist.example.org", RRClass.IN(),
  367. RRType.A(), 0, [ "192.0.2.1" ])
  368. self.__prereq_helper(method, False, rrset)
  369. def __check_prerequisite_name_in_use_combined(self, method, rrclass,
  370. expected):
  371. '''shared code for the checks for the very similar (but reversed
  372. in behaviour) methods __prereq_name_in_use and
  373. __prereq_name_not_in_use
  374. '''
  375. rrset = create_rrset("example.org", rrclass, RRType.ANY(), 0)
  376. self.__prereq_helper(method, expected, rrset)
  377. rrset = create_rrset("www.example.org", rrclass, RRType.ANY(), 0)
  378. self.__prereq_helper(method, expected, rrset)
  379. rrset = create_rrset("doesnotexist.example.org", rrclass,
  380. RRType.ANY(), 0)
  381. self.__prereq_helper(method, not expected, rrset)
  382. rrset = create_rrset("belowdelegation.sub.example.org", rrclass,
  383. RRType.ANY(), 0)
  384. self.__prereq_helper(method, not expected, rrset)
  385. rrset = create_rrset("foo.wildcard.example.org", rrclass,
  386. RRType.ANY(), 0)
  387. self.__prereq_helper(method, not expected, rrset)
  388. # empty nonterminal should not match
  389. rrset = create_rrset("nonterminal.example.org", rrclass,
  390. RRType.ANY(), 0)
  391. self.__prereq_helper(method, not expected, rrset)
  392. rrset = create_rrset("empty.nonterminal.example.org", rrclass,
  393. RRType.ANY(), 0)
  394. self.__prereq_helper(method, expected, rrset)
  395. def test_check_prerequisite_name_in_use(self):
  396. method = self.__session._UpdateSession__prereq_name_in_use
  397. self.__check_prerequisite_name_in_use_combined(method,
  398. RRClass.ANY(),
  399. True)
  400. def test_check_prerequisite_name_not_in_use(self):
  401. method = self.__session._UpdateSession__prereq_name_not_in_use
  402. self.__check_prerequisite_name_in_use_combined(method,
  403. RRClass.NONE(),
  404. False)
  405. def check_prerequisite_result(self, expected, prerequisites):
  406. '''Helper method for checking the result of a prerequisite check;
  407. creates an update session, and fills it with the list of rrsets
  408. from 'prerequisites'. Then checks if __check_prerequisites()
  409. returns the Rcode specified in 'expected'.'''
  410. msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
  411. prerequisites)
  412. zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
  413. session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
  414. session._UpdateSession__get_update_zone()
  415. # compare the to_text output of the rcodes (nicer error messages)
  416. # This call itself should also be done by handle(),
  417. # but just for better failures, it is first called on its own
  418. self.assertEqual(expected.to_text(),
  419. session._UpdateSession__check_prerequisites().to_text())
  420. # Now see if handle finds the same result
  421. (result, _, _) = session.handle()
  422. self.assertEqual(expected,
  423. session._UpdateSession__message.get_rcode())
  424. # And that the result looks right
  425. if expected == Rcode.NOERROR():
  426. self.assertEqual(UPDATE_SUCCESS, result)
  427. else:
  428. self.assertEqual(UPDATE_ERROR, result)
  429. def check_prescan_result(self, expected, updates, expected_soa = None):
  430. '''Helper method for checking the result of a prerequisite check;
  431. creates an update session, and fills it with the list of rrsets
  432. from 'updates'. Then checks if __do_prescan()
  433. returns the Rcode specified in 'expected'.'''
  434. msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
  435. [], updates)
  436. zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
  437. session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
  438. session._UpdateSession__get_update_zone()
  439. # compare the to_text output of the rcodes (nicer error messages)
  440. # This call itself should also be done by handle(),
  441. # but just for better failures, it is first called on its own
  442. self.assertEqual(expected.to_text(),
  443. session._UpdateSession__do_prescan().to_text())
  444. # If there is an expected soa, check it
  445. self.assertEqual(str(expected_soa),
  446. str(session._UpdateSession__added_soa))
  447. def check_full_handle_result(self, expected, updates):
  448. '''Helper method for checking the result of a full handle;
  449. creates an update session, and fills it with the list of rrsets
  450. from 'updates'. Then checks if __handle()
  451. results in a response with rcode 'expected'.'''
  452. msg_data, msg = create_update_msg([TEST_ZONE_RECORD],
  453. [], updates)
  454. zconfig = ZoneConfig([], TEST_RRCLASS, self.__datasrc_client)
  455. session = UpdateSession(msg, msg_data, TEST_CLIENT4, zconfig)
  456. # Now see if handle finds the same result
  457. (result, _, _) = session.handle()
  458. self.assertEqual(expected.to_text(),
  459. session._UpdateSession__message.get_rcode().to_text())
  460. # And that the result looks right
  461. if expected == Rcode.NOERROR():
  462. self.assertEqual(UPDATE_SUCCESS, result)
  463. else:
  464. self.assertEqual(UPDATE_ERROR, result)
  465. def test_check_prerequisites(self):
  466. # This test checks if the actual prerequisite-type-specific
  467. # methods are called.
  468. # It does test all types of prerequisites, but it does not test
  469. # every possible result for those types (those are tested above,
  470. # in the specific prerequisite type tests)
  471. # Let's first define a number of prereq's that should succeed
  472. rrset_exists_yes = create_rrset("example.org", RRClass.ANY(),
  473. RRType.SOA(), 0)
  474. rrset_exists_value_yes = create_rrset("www.example.org", RRClass.IN(),
  475. RRType.A(), 0, [ "192.0.2.1" ])
  476. rrset_does_not_exist_yes = create_rrset("foo.example.org",
  477. RRClass.NONE(), RRType.SOA(),
  478. 0)
  479. name_in_use_yes = create_rrset("www.example.org", RRClass.ANY(),
  480. RRType.ANY(), 0)
  481. name_not_in_use_yes = create_rrset("foo.example.org", RRClass.NONE(),
  482. RRType.ANY(), 0)
  483. rrset_exists_value_1 = create_rrset("example.org", RRClass.IN(),
  484. RRType.NS(), 0,
  485. [ "ns1.example.org" ])
  486. rrset_exists_value_2 = create_rrset("example.org", RRClass.IN(),
  487. RRType.NS(), 0,
  488. [ "ns2.example.org" ])
  489. rrset_exists_value_3 = create_rrset("example.org", RRClass.IN(),
  490. RRType.NS(), 0,
  491. [ "ns3.example.org" ])
  492. # and a number that should not
  493. rrset_exists_no = create_rrset("foo.example.org", RRClass.ANY(),
  494. RRType.SOA(), 0)
  495. rrset_exists_value_no = create_rrset("www.example.org", RRClass.IN(),
  496. RRType.A(), 0, [ "192.0.2.2" ])
  497. rrset_does_not_exist_no = create_rrset("example.org", RRClass.NONE(),
  498. RRType.SOA(), 0)
  499. name_in_use_no = create_rrset("foo.example.org", RRClass.ANY(),
  500. RRType.ANY(), 0)
  501. name_not_in_use_no = create_rrset("www.example.org", RRClass.NONE(),
  502. RRType.ANY(), 0)
  503. # Create an UPDATE with all 5 'yes' prereqs
  504. data, update = create_update_msg([TEST_ZONE_RECORD],
  505. [
  506. rrset_exists_yes,
  507. rrset_does_not_exist_yes,
  508. name_in_use_yes,
  509. name_not_in_use_yes,
  510. rrset_exists_value_yes,
  511. ])
  512. # check 'no' result codes
  513. self.check_prerequisite_result(Rcode.NXRRSET(),
  514. [ rrset_exists_no ])
  515. self.check_prerequisite_result(Rcode.NXRRSET(),
  516. [ rrset_exists_value_no ])
  517. self.check_prerequisite_result(Rcode.YXRRSET(),
  518. [ rrset_does_not_exist_no ])
  519. self.check_prerequisite_result(Rcode.NXDOMAIN(),
  520. [ name_in_use_no ])
  521. self.check_prerequisite_result(Rcode.YXDOMAIN(),
  522. [ name_not_in_use_no ])
  523. # the 'yes' codes should result in ok
  524. self.check_prerequisite_result(Rcode.NOERROR(),
  525. [ rrset_exists_yes,
  526. rrset_exists_value_yes,
  527. rrset_does_not_exist_yes,
  528. name_in_use_yes,
  529. name_not_in_use_yes,
  530. rrset_exists_value_1,
  531. rrset_exists_value_2,
  532. rrset_exists_value_3])
  533. # try out a permutation, note that one rrset is split up,
  534. # and the order of the RRs should not matter
  535. self.check_prerequisite_result(Rcode.NOERROR(),
  536. [ rrset_exists_value_3,
  537. rrset_exists_yes,
  538. rrset_exists_value_2,
  539. name_in_use_yes,
  540. rrset_exists_value_1])
  541. # Should fail on the first error, even if most of the
  542. # prerequisites are ok
  543. self.check_prerequisite_result(Rcode.NXDOMAIN(),
  544. [ rrset_exists_value_3,
  545. rrset_exists_yes,
  546. rrset_exists_value_2,
  547. name_in_use_yes,
  548. name_in_use_no,
  549. rrset_exists_value_1])
  550. def test_prerequisite_notzone(self):
  551. rrset = create_rrset("some.other.zone.", RRClass.ANY(), RRType.SOA(), 0)
  552. self.check_prerequisite_result(Rcode.NOTZONE(), [ rrset ])
  553. def test_prerequisites_formerr(self):
  554. # test for form errors in the prerequisite section
  555. # Class ANY, non-zero TTL
  556. rrset = create_rrset("example.org", RRClass.ANY(), RRType.SOA(), 1)
  557. self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
  558. # Class ANY, but with rdata
  559. rrset = create_rrset("example.org", RRClass.ANY(), RRType.A(), 0,
  560. [ b'\x00\x00\x00\x00' ])
  561. self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
  562. # Class NONE, non-zero TTL
  563. rrset = create_rrset("example.org", RRClass.NONE(), RRType.SOA(), 1)
  564. self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
  565. # Class NONE, but with rdata
  566. rrset = create_rrset("example.org", RRClass.NONE(), RRType.A(), 0,
  567. [ b'\x00\x00\x00\x00' ])
  568. self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
  569. # Matching class and type, but non-zero TTL
  570. rrset = create_rrset("www.example.org", RRClass.IN(), RRType.A(), 1,
  571. [ "192.0.2.1" ])
  572. self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
  573. # Completely different class
  574. rrset = create_rrset("example.org", RRClass.CH(), RRType.TXT(), 0,
  575. [ "foo" ])
  576. self.check_prerequisite_result(Rcode.FORMERR(), [ rrset ])
  577. def __prereq_helper(self, method, expected, rrset):
  578. '''Calls the given method with self.__datasrc_client
  579. and the given rrset, and compares the return value.
  580. Function does not do much but makes the code look nicer'''
  581. self.assertEqual(expected, method(rrset))
  582. def initialize_update_rrsets(self):
  583. '''Prepare a number of RRsets to be used in several update tests
  584. The rrsets are stored in self'''
  585. orig_a_rrset = create_rrset("www.example.org", TEST_RRCLASS,
  586. RRType.A(), 3600, [ "192.0.2.1" ])
  587. self.orig_a_rrset = orig_a_rrset
  588. rrset_update_a = create_rrset("www.example.org", TEST_RRCLASS,
  589. RRType.A(), 3600,
  590. [ "192.0.2.2", "192.0.2.3" ])
  591. self.rrset_update_a = rrset_update_a
  592. rrset_update_soa = create_rrset("example.org", TEST_RRCLASS,
  593. RRType.SOA(), 3600,
  594. [ "ns1.example.org. " +
  595. "admin.example.org. " +
  596. "1233 3600 1800 2419200 7200" ])
  597. self.rrset_update_soa = rrset_update_soa
  598. rrset_update_soa_del = create_rrset("example.org", RRClass.NONE(),
  599. RRType.SOA(), 0,
  600. [ "ns1.example.org. " +
  601. "admin.example.org. " +
  602. "1233 3600 1800 2419200 7200" ])
  603. self.rrset_update_soa_del = rrset_update_soa_del
  604. rrset_update_soa2 = create_rrset("example.org", TEST_RRCLASS,
  605. RRType.SOA(), 3600,
  606. [ "ns1.example.org. " +
  607. "admin.example.org. " +
  608. "4000 3600 1800 2419200 7200" ])
  609. self.rrset_update_soa2 = rrset_update_soa2
  610. rrset_update_del_name = create_rrset("www.example.org", RRClass.ANY(),
  611. RRType.ANY(), 0)
  612. self.rrset_update_del_name = rrset_update_del_name
  613. rrset_update_del_name_apex = create_rrset("example.org", RRClass.ANY(),
  614. RRType.ANY(), 0)
  615. self.rrset_update_del_name_apex = rrset_update_del_name_apex
  616. rrset_update_del_rrset = create_rrset("www.example.org", RRClass.ANY(),
  617. RRType.A(), 0)
  618. self.rrset_update_del_rrset = rrset_update_del_rrset
  619. rrset_update_del_mx_apex = create_rrset("example.org", RRClass.ANY(),
  620. RRType.MX(), 0)
  621. self.rrset_update_del_mx_apex = rrset_update_del_mx_apex
  622. rrset_update_del_soa_apex = create_rrset("example.org", RRClass.ANY(),
  623. RRType.SOA(), 0)
  624. self.rrset_update_del_soa_apex = rrset_update_del_soa_apex
  625. rrset_update_del_ns_apex = create_rrset("example.org", RRClass.ANY(),
  626. RRType.NS(), 0)
  627. self.rrset_update_del_ns_apex = rrset_update_del_ns_apex
  628. rrset_update_del_rrset_part = create_rrset("www.example.org",
  629. RRClass.NONE(), RRType.A(),
  630. 0,
  631. [ b'\xc0\x00\x02\x02',
  632. b'\xc0\x00\x02\x03' ])
  633. self.rrset_update_del_rrset_part = rrset_update_del_rrset_part
  634. rrset_update_del_rrset_ns = create_rrset("example.org", RRClass.NONE(),
  635. RRType.NS(), 0,
  636. [ b'\x03ns1\x07example\x03org\x00',
  637. b'\x03ns2\x07example\x03org\x00',
  638. b'\x03ns3\x07example\x03org\x00' ])
  639. self.rrset_update_del_rrset_ns = rrset_update_del_rrset_ns
  640. rrset_update_del_rrset_mx = create_rrset("example.org", RRClass.NONE(),
  641. RRType.MX(), 0,
  642. [ b'\x00\x0a\x04mail\x07example\x03org\x00' ])
  643. self.rrset_update_del_rrset_mx = rrset_update_del_rrset_mx
  644. def test_prescan(self):
  645. '''Test whether the prescan succeeds on data that is ok, and whether
  646. if notices the SOA if present'''
  647. # prepare a set of correct update statements
  648. self.initialize_update_rrsets()
  649. self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_a ])
  650. # check if soa is noticed
  651. self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_soa ],
  652. self.rrset_update_soa)
  653. # Other types of succesful prechecks
  654. self.check_prescan_result(Rcode.NOERROR(), [ self.rrset_update_soa2 ],
  655. self.rrset_update_soa2)
  656. self.check_prescan_result(Rcode.NOERROR(),
  657. [ self.rrset_update_del_name ])
  658. self.check_prescan_result(Rcode.NOERROR(),
  659. [ self.rrset_update_del_name_apex ])
  660. self.check_prescan_result(Rcode.NOERROR(),
  661. [ self.rrset_update_del_rrset ])
  662. self.check_prescan_result(Rcode.NOERROR(),
  663. [ self.rrset_update_del_mx_apex ])
  664. self.check_prescan_result(Rcode.NOERROR(),
  665. [ self.rrset_update_del_rrset_part ])
  666. # and check a few permutations of the above
  667. # all of them (with one of the soas)
  668. self.check_prescan_result(Rcode.NOERROR(),
  669. [
  670. self.rrset_update_a,
  671. self.rrset_update_soa,
  672. self.rrset_update_del_name,
  673. self.rrset_update_del_name_apex,
  674. self.rrset_update_del_rrset,
  675. self.rrset_update_del_mx_apex,
  676. self.rrset_update_del_rrset_part
  677. ],
  678. self.rrset_update_soa)
  679. # Two soas. Should we reject or simply use the last?
  680. # (RFC is not really explicit on this, but between the lines I read
  681. # use the last)
  682. self.check_prescan_result(Rcode.NOERROR(),
  683. [ self.rrset_update_soa,
  684. self.rrset_update_soa2 ],
  685. self.rrset_update_soa2)
  686. self.check_prescan_result(Rcode.NOERROR(),
  687. [ self.rrset_update_soa2,
  688. self.rrset_update_soa ],
  689. self.rrset_update_soa)
  690. self.check_prescan_result(Rcode.NOERROR(),
  691. [
  692. self.rrset_update_del_mx_apex,
  693. self.rrset_update_del_name,
  694. self.rrset_update_del_name_apex,
  695. self.rrset_update_del_rrset_part,
  696. self.rrset_update_a,
  697. self.rrset_update_del_rrset,
  698. self.rrset_update_soa
  699. ],
  700. self.rrset_update_soa)
  701. def test_prescan_failures(self):
  702. '''Test whether prescan fails on bad data'''
  703. # out of zone data
  704. rrset = create_rrset("different.zone", RRClass.ANY(), RRType.TXT(), 0)
  705. self.check_prescan_result(Rcode.NOTZONE(), [ rrset ])
  706. # forbidden type, zone class
  707. rrset = create_rrset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.ANY(), 0,
  708. [ b'\x00' ])
  709. self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
  710. # non-zero TTL, class ANY
  711. rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY(), RRType.TXT(), 1)
  712. self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
  713. # non-zero Rdata, class ANY
  714. rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY(), RRType.TXT(), 0,
  715. [ "foo" ])
  716. self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
  717. # forbidden type, class ANY
  718. rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY(), RRType.AXFR(), 0,
  719. [ b'\x00' ])
  720. self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
  721. # non-zero TTL, class NONE
  722. rrset = create_rrset(TEST_ZONE_NAME, RRClass.NONE(), RRType.TXT(), 1)
  723. self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
  724. # forbidden type, class NONE
  725. rrset = create_rrset(TEST_ZONE_NAME, RRClass.NONE(), RRType.AXFR(), 0,
  726. [ b'\x00' ])
  727. self.check_prescan_result(Rcode.FORMERR(), [ rrset ])
  728. def check_inzone_data(self, expected_result, name, rrtype,
  729. expected_rrset = None):
  730. '''Does a find on TEST_ZONE for the given rrset's name and type,
  731. then checks if the result matches the expected result.
  732. If so, and if expected_rrset is given, they are compared as
  733. well.'''
  734. _, finder = self.__datasrc_client.find_zone(TEST_ZONE_NAME)
  735. result, found_rrset, _ = finder.find(name, rrtype,
  736. finder.NO_WILDCARD |
  737. finder.FIND_GLUE_OK)
  738. self.assertEqual(expected_result, result)
  739. # Sigh. Need rrsets.compare() again.
  740. # To be sure, compare name, class, type, and ttl
  741. if expected_rrset is not None:
  742. self.assertEqual(expected_rrset.get_name(), found_rrset.get_name())
  743. self.assertEqual(expected_rrset.get_class(), found_rrset.get_class())
  744. self.assertEqual(expected_rrset.get_type(), found_rrset.get_type())
  745. self.assertEqual(expected_rrset.get_ttl().to_text(),
  746. found_rrset.get_ttl().to_text())
  747. expected_rdata =\
  748. [ rdata.to_text() for rdata in expected_rrset.get_rdata() ]
  749. found_rdata =\
  750. [ rdata.to_text() for rdata in found_rrset.get_rdata() ]
  751. expected_rdata.sort()
  752. found_rdata.sort()
  753. self.assertEqual(expected_rdata, found_rdata)
  754. def test_update_add_delete_rrset(self):
  755. '''
  756. Tests a sequence of related add and delete updates. Some other
  757. cases are tested by later tests.
  758. '''
  759. self.initialize_update_rrsets()
  760. # initially, the www should only contain one rr
  761. # (set to self.orig_a_rrset)
  762. # during this test, we will extend it at some point
  763. extended_a_rrset = create_rrset("www.example.org", TEST_RRCLASS,
  764. RRType.A(), 3600,
  765. [ "192.0.2.1",
  766. "192.0.2.2",
  767. "192.0.2.3" ])
  768. # Sanity check, make sure original data is really there before updates
  769. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  770. isc.dns.Name("www.example.org"),
  771. RRType.A(),
  772. self.orig_a_rrset)
  773. # Add two rrs
  774. self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_a ])
  775. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  776. isc.dns.Name("www.example.org"),
  777. RRType.A(),
  778. extended_a_rrset)
  779. # Adding the same RRsets should not make a difference.
  780. self.check_full_handle_result(Rcode.NOERROR(), [ self.rrset_update_a ])
  781. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  782. isc.dns.Name("www.example.org"),
  783. RRType.A(),
  784. extended_a_rrset)
  785. # Now delete those two, and we should end up with the original RRset
  786. self.check_full_handle_result(Rcode.NOERROR(),
  787. [ self.rrset_update_del_rrset_part ])
  788. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  789. isc.dns.Name("www.example.org"),
  790. RRType.A(),
  791. self.orig_a_rrset)
  792. # 'Deleting' them again should make no difference
  793. self.check_full_handle_result(Rcode.NOERROR(),
  794. [ self.rrset_update_del_rrset_part ])
  795. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  796. isc.dns.Name("www.example.org"),
  797. RRType.A(),
  798. self.orig_a_rrset)
  799. # But deleting the entire rrset, independent of its contents, should
  800. # work
  801. self.check_full_handle_result(Rcode.NOERROR(),
  802. [ self.rrset_update_del_rrset ])
  803. self.check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  804. isc.dns.Name("www.example.org"),
  805. RRType.A())
  806. # Check that if we update the SOA, it is updated to our value
  807. self.check_full_handle_result(Rcode.NOERROR(),
  808. [ self.rrset_update_soa2 ])
  809. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  810. isc.dns.Name("example.org"),
  811. RRType.SOA(),
  812. self.rrset_update_soa2)
  813. def test_update_add_new_data(self):
  814. '''
  815. This tests adds data where none is present
  816. '''
  817. # Add data at a completely new name
  818. self.check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  819. isc.dns.Name("new.example.org"),
  820. RRType.A())
  821. rrset = create_rrset("new.example.org", TEST_RRCLASS, RRType.A(),
  822. 3600, [ "192.0.2.1", "192.0.2.2" ])
  823. self.check_full_handle_result(Rcode.NOERROR(), [ rrset ])
  824. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  825. isc.dns.Name("new.example.org"),
  826. RRType.A(),
  827. rrset)
  828. # Also try a name where data is present, but none of this
  829. # specific type
  830. self.check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
  831. isc.dns.Name("new.example.org"),
  832. RRType.TXT())
  833. rrset = create_rrset("new.example.org", TEST_RRCLASS, RRType.TXT(),
  834. 3600, [ "foo" ])
  835. self.check_full_handle_result(Rcode.NOERROR(), [ rrset ])
  836. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  837. isc.dns.Name("new.example.org"),
  838. RRType.TXT(),
  839. rrset)
  840. def test_update_delete_name(self):
  841. self.initialize_update_rrsets()
  842. # First check it is there
  843. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  844. isc.dns.Name("www.example.org"),
  845. RRType.A())
  846. # Delete the entire name
  847. self.check_full_handle_result(Rcode.NOERROR(),
  848. [ self.rrset_update_del_name ])
  849. self.check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  850. isc.dns.Name("www.example.org"),
  851. RRType.A())
  852. # Should still be gone after pointless second delete
  853. self.check_full_handle_result(Rcode.NOERROR(),
  854. [ self.rrset_update_del_name ])
  855. self.check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  856. isc.dns.Name("www.example.org"),
  857. RRType.A())
  858. def test_update_apex_special_cases(self):
  859. '''
  860. Tests a few special cases when deleting data from the apex
  861. '''
  862. self.initialize_update_rrsets()
  863. # the original SOA
  864. orig_soa_rrset = create_rrset("example.org", TEST_RRCLASS,
  865. RRType.SOA(), 3600,
  866. [ "ns1.example.org. " +
  867. "admin.example.org. " +
  868. "1234 3600 1800 2419200 7200" ])
  869. # At some point, the SOA SERIAL will be auto-incremented
  870. incremented_soa_rrset_01 = create_rrset("example.org", TEST_RRCLASS,
  871. RRType.SOA(), 3600, ["ns1.example.org. " +
  872. "admin.example.org. " +
  873. "1235 3600 1800 2419200 7200" ])
  874. incremented_soa_rrset_02 = create_rrset("example.org", TEST_RRCLASS,
  875. RRType.SOA(), 3600, ["ns1.example.org. " +
  876. "admin.example.org. " +
  877. "1236 3600 1800 2419200 7200" ])
  878. # We will delete some of the NS records
  879. orig_ns_rrset = create_rrset("example.org", TEST_RRCLASS,
  880. RRType.NS(), 3600,
  881. [ "ns1.example.org.",
  882. "ns2.example.org.",
  883. "ns3.example.org." ])
  884. # When we are done, we should have a reduced NS rrset
  885. short_ns_rrset = create_rrset("example.org", TEST_RRCLASS,
  886. RRType.NS(), 3600,
  887. [ "ns3.example.org." ])
  888. # Sanity check, make sure original data is really there before updates
  889. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  890. isc.dns.Name("example.org"),
  891. RRType.NS(),
  892. orig_ns_rrset)
  893. # We will delete the MX record later in this test, so let's make
  894. # sure that it exists (we do not care about its value)
  895. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  896. isc.dns.Name("example.org"),
  897. RRType.MX())
  898. # Check that we cannot delete the SOA record by direction deletion
  899. # both by name+type and by full rrset
  900. self.check_full_handle_result(Rcode.NOERROR(),
  901. [ self.rrset_update_del_soa_apex,
  902. self.rrset_update_soa_del ])
  903. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  904. isc.dns.Name("example.org"),
  905. RRType.SOA(),
  906. incremented_soa_rrset_01)
  907. # If we delete everything at the apex, the SOA and NS rrsets should be
  908. # untouched (but serial will be incremented)
  909. self.check_full_handle_result(Rcode.NOERROR(),
  910. [ self.rrset_update_del_name_apex ])
  911. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  912. isc.dns.Name("example.org"),
  913. RRType.SOA(),
  914. incremented_soa_rrset_02)
  915. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  916. isc.dns.Name("example.org"),
  917. RRType.NS(),
  918. orig_ns_rrset)
  919. # but the MX should be gone
  920. self.check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
  921. isc.dns.Name("example.org"),
  922. RRType.MX())
  923. # Deleting the NS rrset by name and type only, it should also be left
  924. # untouched
  925. self.check_full_handle_result(Rcode.NOERROR(),
  926. [ self.rrset_update_del_ns_apex ])
  927. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  928. isc.dns.Name("example.org"),
  929. RRType.NS(),
  930. orig_ns_rrset)
  931. # If we delete the NS at the apex specifically, it should still
  932. # keep one record
  933. self.check_full_handle_result(Rcode.NOERROR(),
  934. [ self.rrset_update_del_rrset_ns ])
  935. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  936. isc.dns.Name("example.org"),
  937. RRType.NS(),
  938. short_ns_rrset)
  939. def test_update_delete_normal_rrset_at_apex(self):
  940. '''
  941. Tests a number of 'normal rrset' deletes at the apex
  942. '''
  943. # MX should simply be deleted
  944. self.initialize_update_rrsets()
  945. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  946. isc.dns.Name("example.org"),
  947. RRType.MX())
  948. self.check_full_handle_result(Rcode.NOERROR(),
  949. [ self.rrset_update_del_rrset_mx ])
  950. self.check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
  951. isc.dns.Name("example.org"),
  952. RRType.MX())
  953. def test_update_cname_special_cases(self):
  954. self.initialize_update_rrsets()
  955. # Sanity check
  956. orig_cname_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
  957. RRType.CNAME(), 3600,
  958. [ "www.example.org." ])
  959. self.check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
  960. isc.dns.Name("cname.example.org"),
  961. RRType.A(),
  962. orig_cname_rrset)
  963. # If we try to add data where a cname is preset
  964. rrset = create_rrset("cname.example.org", TEST_RRCLASS, RRType.A(),
  965. 3600, [ "192.0.2.1" ])
  966. self.check_full_handle_result(Rcode.NOERROR(), [ rrset ])
  967. self.check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
  968. isc.dns.Name("cname.example.org"),
  969. RRType.A(),
  970. orig_cname_rrset)
  971. # But updating the cname itself should work
  972. new_cname_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
  973. RRType.CNAME(), 3600,
  974. [ "mail.example.org." ])
  975. self.check_full_handle_result(Rcode.NOERROR(), [ new_cname_rrset ])
  976. self.check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
  977. isc.dns.Name("cname.example.org"),
  978. RRType.A(),
  979. new_cname_rrset)
  980. self.initialize_update_rrsets()
  981. # Likewise, adding a cname where other data is
  982. # present should do nothing either
  983. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  984. isc.dns.Name("www.example.org"),
  985. RRType.A(),
  986. self.orig_a_rrset)
  987. new_cname_rrset = create_rrset("www.example.org", TEST_RRCLASS,
  988. RRType.CNAME(), 3600,
  989. [ "mail.example.org." ])
  990. self.check_full_handle_result(Rcode.NOERROR(), [ new_cname_rrset ])
  991. self.check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  992. isc.dns.Name("www.example.org"),
  993. RRType.A(),
  994. self.orig_a_rrset)
  995. def test_update_bad_class(self):
  996. rrset = create_rrset("example.org.", RRClass.CH(), RRType.TXT(), 0,
  997. [ "foo" ])
  998. self.check_full_handle_result(Rcode.FORMERR(), [ rrset ])
  999. def test_uncaught_exception(self):
  1000. def my_exc():
  1001. raise Exception("foo")
  1002. self.__session._UpdateSession__update_soa = my_exc
  1003. self.assertEqual(Rcode.SERVFAIL().to_text(),
  1004. self.__session._UpdateSession__do_update().to_text())
  1005. if __name__ == "__main__":
  1006. isc.log.init("bind10")
  1007. isc.log.resetUnitTestRootLogger()
  1008. unittest.main()