session_tests.py 74 KB


  1. # Copyright (C) 2012 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import os
  16. import shutil
  17. import isc.log
  18. import unittest
  19. from isc.dns import *
  20. from isc.datasrc import DataSourceClient, ZoneFinder
  21. from isc.ddns.session import *
  22. from isc.ddns.zone_config import *
  23. # Some common test parameters
  24. TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
  25. READ_ZONE_DB_FILE = TESTDATA_PATH + "rwtest.sqlite3" # original, to be copied
  26. TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
  27. WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "rwtest.sqlite3.copied"
  28. WRITE_ZONE_DB_CONFIG = "{ \"database_file\": \"" + WRITE_ZONE_DB_FILE + "\"}"
  29. TEST_ZONE_NAME = Name('example.org')
  30. UPDATE_RRTYPE = RRType.SOA
  31. TEST_RRCLASS = RRClass.IN
  32. TEST_ZONE_RECORD = Question(TEST_ZONE_NAME, TEST_RRCLASS, UPDATE_RRTYPE)
  33. TEST_CLIENT6 = ('2001:db8::1', 53, 0, 0)
  34. TEST_CLIENT4 = ('192.0.2.1', 53)
  35. # TSIG key for tests when needed. The key name is TEST_ZONE_NAME.
  36. TEST_TSIG_KEY = TSIGKey("example.org:SFuWd/q99SzF8Yzd1QbB9g==")
  37. def create_update_msg(zones=[TEST_ZONE_RECORD], prerequisites=[],
  38. updates=[], tsig_key=None):
  39. msg = Message(Message.RENDER)
  40. msg.set_qid(5353) # arbitrary chosen
  41. msg.set_opcode(Opcode.UPDATE)
  42. msg.set_rcode(Rcode.NOERROR)
  43. for z in zones:
  44. msg.add_question(z)
  45. for p in prerequisites:
  46. msg.add_rrset(SECTION_PREREQUISITE, p)
  47. for u in updates:
  48. msg.add_rrset(SECTION_UPDATE, u)
  49. renderer = MessageRenderer()
  50. if tsig_key is not None:
  51. msg.to_wire(renderer, TSIGContext(tsig_key))
  52. else:
  53. msg.to_wire(renderer)
  54. # re-read the created data in the parse mode
  55. msg.clear(Message.PARSE)
  56. msg.from_wire(renderer.get_data(), Message.PRESERVE_ORDER)
  57. return msg
  58. def add_rdata(rrset, rdata):
  59. '''
  60. Helper function for easily adding Rdata fields to RRsets.
  61. This function assumes the given rdata is of type string or bytes,
  62. and corresponds to the given rrset
  63. '''
  64. rrset.add_rdata(isc.dns.Rdata(rrset.get_type(),
  65. rrset.get_class(),
  66. rdata))
  67. def create_rrset(name, rrclass, rrtype, ttl, rdatas = []):
  68. '''
  69. Helper method to easily create RRsets, auto-converts
  70. name, rrclass, rrtype, and ttl (if possibly through their
  71. respective constructors)
  72. rdatas is a list of rr data strings, or bytestrings, which
  73. should match the RRType of the rrset to create
  74. '''
  75. if type(name) != Name:
  76. name = Name(name)
  77. if type(rrclass) != RRClass:
  78. rrclass = RRClass(rrclass)
  79. if type(rrtype) != RRType:
  80. rrtype = RRType(rrtype)
  81. if type(ttl) != RRTTL:
  82. ttl = RRTTL(ttl)
  83. rrset = isc.dns.RRset(name, rrclass, rrtype, ttl)
  84. for rdata in rdatas:
  85. add_rdata(rrset, rdata)
  86. return rrset
  87. class SessionModuleTests(unittest.TestCase):
  88. '''Tests for module-level functions in the session.py module'''
  89. def test_foreach_rr_in_rrset(self):
  90. rrset = create_rrset("www.example.org", TEST_RRCLASS,
  91. RRType.A, 3600, [ "192.0.2.1" ])
  92. l = []
  93. for rr in foreach_rr(rrset):
  94. l.append(str(rr))
  95. self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n"], l)
  96. add_rdata(rrset, "192.0.2.2")
  97. add_rdata(rrset, "192.0.2.3")
  98. # but through the generator, there should be several 1-line entries
  99. l = []
  100. for rr in foreach_rr(rrset):
  101. l.append(str(rr))
  102. self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n",
  103. "www.example.org. 3600 IN A 192.0.2.2\n",
  104. "www.example.org. 3600 IN A 192.0.2.3\n",
  105. ], l)
  106. def test_convert_rrset_class(self):
  107. # Converting an RRSET to a different class should work
  108. # if the rdata types can be converted
  109. rrset = create_rrset("www.example.org", RRClass.NONE, RRType.A,
  110. 3600, [ b'\xc0\x00\x02\x01', b'\xc0\x00\x02\x02'])
  111. rrset2 = convert_rrset_class(rrset, RRClass.IN)
  112. self.assertEqual("www.example.org. 3600 IN A 192.0.2.1\n" +
  113. "www.example.org. 3600 IN A 192.0.2.2\n",
  114. str(rrset2))
  115. rrset3 = convert_rrset_class(rrset2, RRClass.NONE)
  116. self.assertEqual("www.example.org. 3600 NONE A \\# 4 " +
  117. "c0000201\nwww.example.org. 3600 NONE " +
  118. "A \\# 4 c0000202\n",
  119. str(rrset3))
  120. # depending on what type of bad data is given, a number
  121. # of different exceptions could be raised (TODO: i recall
  122. # there was a ticket about making a better hierarchy for
  123. # dns/parsing related exceptions)
  124. self.assertRaises(InvalidRdataLength, convert_rrset_class,
  125. rrset, RRClass.CH)
  126. add_rdata(rrset, b'\xc0\x00')
  127. self.assertRaises(DNSMessageFORMERR, convert_rrset_class,
  128. rrset, RRClass.IN)
  129. def test_collect_rrsets(self):
  130. '''
  131. Tests the 'rrset collector' method, which collects rrsets
  132. with the same name and type
  133. '''
  134. collected = []
  135. collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN,
  136. RRType.A, 0, [ "192.0.2.1" ]))
  137. # Same name and class, different type
  138. collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN,
  139. RRType.TXT, 0, [ "one" ]))
  140. collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN,
  141. RRType.A, 0, [ "192.0.2.2" ]))
  142. collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN,
  143. RRType.TXT, 0, [ "two" ]))
  144. # Same class and type as an existing one, different name
  145. collect_rrsets(collected, create_rrset("b.example.org", RRClass.IN,
  146. RRType.A, 0, [ "192.0.2.3" ]))
  147. # Same name and type as an existing one, different class
  148. collect_rrsets(collected, create_rrset("a.example.org", RRClass.CH,
  149. RRType.TXT, 0, [ "one" ]))
  150. collect_rrsets(collected, create_rrset("b.example.org", RRClass.IN,
  151. RRType.A, 0, [ "192.0.2.4" ]))
  152. collect_rrsets(collected, create_rrset("a.example.org", RRClass.CH,
  153. RRType.TXT, 0, [ "two" ]))
  154. strings = [ rrset.to_text() for rrset in collected ]
  155. # note + vs , in this list
  156. expected = ['a.example.org. 0 IN A 192.0.2.1\n' +
  157. 'a.example.org. 0 IN A 192.0.2.2\n',
  158. 'a.example.org. 0 IN TXT "one"\n' +
  159. 'a.example.org. 0 IN TXT "two"\n',
  160. 'b.example.org. 0 IN A 192.0.2.3\n' +
  161. 'b.example.org. 0 IN A 192.0.2.4\n',
  162. 'a.example.org. 0 CH TXT "one"\n' +
  163. 'a.example.org. 0 CH TXT "two"\n']
  164. self.assertEqual(expected, strings)
  165. class SessionTestBase(unittest.TestCase):
  166. '''Base class for all sesion related tests.
  167. It just initializes common test parameters in its setUp() and defines
  168. some common utility method(s).
  169. '''
  170. def setUp(self):
  171. shutil.copyfile(READ_ZONE_DB_FILE, WRITE_ZONE_DB_FILE)
  172. self._datasrc_client = DataSourceClient("sqlite3",
  173. WRITE_ZONE_DB_CONFIG)
  174. self._update_msg = create_update_msg()
  175. self._acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
  176. REQUEST_LOADER.load([{"action": "ACCEPT"}])}
  177. self._session = UpdateSession(self._update_msg, TEST_CLIENT4,
  178. ZoneConfig(set(), TEST_RRCLASS,
  179. self._datasrc_client,
  180. self._acl_map))
  181. self._session._get_update_zone()
  182. self._session._create_diff()
  183. def tearDown(self):
  184. # With the Updater created in _get_update_zone, and tests
  185. # doing all kinds of crazy stuff, one might get database locked
  186. # errors if it doesn't clean up explicitly after each test
  187. self._session = None
  188. def check_response(self, msg, expected_rcode):
  189. '''Perform common checks on update resposne message.'''
  190. self.assertTrue(msg.get_header_flag(Message.HEADERFLAG_QR))
  191. # note: we convert opcode to text it'd be more helpful on failure.
  192. self.assertEqual(Opcode.UPDATE.to_text(), msg.get_opcode().to_text())
  193. self.assertEqual(expected_rcode.to_text(), msg.get_rcode().to_text())
  194. # All sections should be cleared
  195. self.assertEqual(0, msg.get_rr_count(SECTION_ZONE))
  196. self.assertEqual(0, msg.get_rr_count(SECTION_PREREQUISITE))
  197. self.assertEqual(0, msg.get_rr_count(SECTION_UPDATE))
  198. self.assertEqual(0, msg.get_rr_count(Message.SECTION_ADDITIONAL))
  199. class TestDDNSSOA(unittest.TestCase):
  200. '''unittest for the DDNS_SOA'''
  201. def test_update_soa(self):
  202. '''unittest for update_soa function'''
  203. soa_update = DDNS_SOA()
  204. soa_rr = create_rrset("example.org", TEST_RRCLASS,
  205. RRType.SOA, 3600, ["ns1.example.org. " +
  206. "admin.example.org. " +
  207. "1233 3600 1800 2419200 7200"])
  208. expected_soa_rr = create_rrset("example.org", TEST_RRCLASS,
  209. RRType.SOA, 3600, ["ns1.example.org. "
  210. + "admin.example.org. " +
  211. "1234 3600 1800 2419200 7200"])
  212. self.assertEqual(soa_update.update_soa(soa_rr).get_rdata()[0].to_text(),
  213. expected_soa_rr.get_rdata()[0].to_text())
  214. max_serial = 2 ** 32 - 1
  215. soa_rdata = "%d %s"%(max_serial,"3600 1800 2419200 7200")
  216. soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA, 3600,
  217. ["ns1.example.org. " + "admin.example.org. " +
  218. soa_rdata])
  219. expected_soa_rr = create_rrset("example.org", TEST_RRCLASS,
  220. RRType.SOA, 3600, ["ns1.example.org. "
  221. + "admin.example.org. " +
  222. "1 3600 1800 2419200 7200"])
  223. self.assertEqual(soa_update.update_soa(soa_rr).get_rdata()[0].to_text(),
  224. expected_soa_rr.get_rdata()[0].to_text())
  225. def test_soa_update_check(self):
  226. '''unittest for soa_update_check function'''
  227. small_soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA,
  228. 3600, ["ns1.example.org. " +
  229. "admin.example.org. " +
  230. "1233 3600 1800 2419200 7200"])
  231. large_soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA,
  232. 3600, ["ns1.example.org. " +
  233. "admin.example.org. " +
  234. "1234 3600 1800 2419200 7200"])
  235. soa_update = DDNS_SOA()
  236. # The case of (i1 < i2 and i2 - i1 < 2^(SERIAL_BITS - 1)) in rfc 1982
  237. self.assertTrue(soa_update.soa_update_check(small_soa_rr,
  238. large_soa_rr))
  239. self.assertFalse(soa_update.soa_update_check(large_soa_rr,
  240. small_soa_rr))
  241. small_serial = 1235 + 2 ** 31
  242. soa_rdata = "%d %s"%(small_serial,"3600 1800 2419200 7200")
  243. small_soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA,
  244. 3600, ["ns1.example.org. " +
  245. "admin.example.org. " +
  246. soa_rdata])
  247. large_soa_rr = create_rrset("example.org", TEST_RRCLASS, RRType.SOA,
  248. 3600, ["ns1.example.org. " +
  249. "admin.example.org. " +
  250. "1234 3600 1800 2419200 7200"])
  251. # The case of (i1 > i2 and i1 - i2 > 2^(SERIAL_BITS - 1)) in rfc 1982
  252. self.assertTrue(soa_update.soa_update_check(small_soa_rr,
  253. large_soa_rr))
  254. self.assertFalse(soa_update.soa_update_check(large_soa_rr,
  255. small_soa_rr))
  256. class SessionTest(SessionTestBase):
  257. '''Basic session tests'''
  258. def test_handle(self):
  259. '''Basic update case'''
  260. result, zname, zclass = self._session.handle()
  261. self.assertEqual(UPDATE_SUCCESS, result)
  262. self.assertEqual(TEST_ZONE_NAME, zname)
  263. self.assertEqual(TEST_RRCLASS, zclass)
  264. # Just checking these are different from the success code.
  265. self.assertNotEqual(UPDATE_ERROR, result)
  266. self.assertNotEqual(UPDATE_DROP, result)
  267. def test_broken_request(self):
  268. # Zone section is empty
  269. msg = create_update_msg(zones=[])
  270. session = UpdateSession(msg, TEST_CLIENT6, None)
  271. result, zname, zclass = session.handle()
  272. self.assertEqual(UPDATE_ERROR, result)
  273. self.assertEqual(None, zname)
  274. self.assertEqual(None, zclass)
  275. self.check_response(session.get_message(), Rcode.FORMERR)
  276. # Zone section contains multiple records
  277. msg = create_update_msg(zones=[TEST_ZONE_RECORD, TEST_ZONE_RECORD])
  278. session = UpdateSession(msg, TEST_CLIENT4, None)
  279. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  280. self.check_response(session.get_message(), Rcode.FORMERR)
  281. # Zone section's type is not SOA
  282. msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
  283. RRType.A)])
  284. session = UpdateSession(msg, TEST_CLIENT4, None)
  285. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  286. self.check_response(session.get_message(), Rcode.FORMERR)
  287. def test_update_secondary(self):
  288. # specified zone is configured as a secondary. Since this
  289. # implementation doesn't support update forwarding, the result
  290. # should be NOTIMP.
  291. msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
  292. RRType.SOA)])
  293. session = UpdateSession(msg, TEST_CLIENT4,
  294. ZoneConfig({(TEST_ZONE_NAME, TEST_RRCLASS)},
  295. TEST_RRCLASS, self._datasrc_client))
  296. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  297. self.check_response(session.get_message(), Rcode.NOTIMP)
  298. def check_notauth(self, zname, zclass=TEST_RRCLASS):
  299. '''Common test sequence for the 'notauth' test'''
  300. msg = create_update_msg(zones=[Question(zname, zclass, RRType.SOA)])
  301. session = UpdateSession(msg, TEST_CLIENT4,
  302. ZoneConfig({(TEST_ZONE_NAME, TEST_RRCLASS)},
  303. TEST_RRCLASS, self._datasrc_client))
  304. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  305. self.check_response(session.get_message(), Rcode.NOTAUTH)
  306. def test_update_notauth(self):
  307. '''Update attempt for non authoritative zones'''
  308. # zone name doesn't match
  309. self.check_notauth(Name('example.com'))
  310. # zone name is a subdomain of the actual authoritative zone
  311. # (match must be exact)
  312. self.check_notauth(Name('sub.example.org'))
  313. # zone class doesn't match
  314. self.check_notauth(Name('example.org'), RRClass.CH)
  315. def test_update_datasrc_error(self):
  316. # if the data source client raises an exception, it should result in
  317. # a SERVFAIL.
  318. class BadDataSourceClient:
  319. def find_zone(self, name):
  320. raise isc.datasrc.Error('faked exception')
  321. msg = create_update_msg(zones=[Question(TEST_ZONE_NAME, TEST_RRCLASS,
  322. RRType.SOA)])
  323. session = UpdateSession(msg, TEST_CLIENT4,
  324. ZoneConfig({(TEST_ZONE_NAME, TEST_RRCLASS)},
  325. TEST_RRCLASS,
  326. BadDataSourceClient()))
  327. self.assertEqual(UPDATE_ERROR, session.handle()[0])
  328. self.check_response(session.get_message(), Rcode.SERVFAIL)
  329. def test_foreach_rr_in_rrset(self):
  330. rrset = create_rrset("www.example.org", TEST_RRCLASS,
  331. RRType.A, 3600, [ "192.0.2.1" ])
  332. l = []
  333. for rr in foreach_rr(rrset):
  334. l.append(str(rr))
  335. self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n"], l)
  336. add_rdata(rrset, "192.0.2.2")
  337. add_rdata(rrset, "192.0.2.3")
  338. # but through the generator, there should be several 1-line entries
  339. l = []
  340. for rr in foreach_rr(rrset):
  341. l.append(str(rr))
  342. self.assertEqual(["www.example.org. 3600 IN A 192.0.2.1\n",
  343. "www.example.org. 3600 IN A 192.0.2.2\n",
  344. "www.example.org. 3600 IN A 192.0.2.3\n",
  345. ], l)
  346. def test_convert_rrset_class(self):
  347. # Converting an RRSET to a different class should work
  348. # if the rdata types can be converted
  349. rrset = create_rrset("www.example.org", RRClass.NONE, RRType.A,
  350. 3600, [ b'\xc0\x00\x02\x01', b'\xc0\x00\x02\x02'])
  351. rrset2 = convert_rrset_class(rrset, RRClass.IN)
  352. self.assertEqual("www.example.org. 3600 IN A 192.0.2.1\n" +
  353. "www.example.org. 3600 IN A 192.0.2.2\n",
  354. str(rrset2))
  355. rrset3 = convert_rrset_class(rrset2, RRClass.NONE)
  356. self.assertEqual("www.example.org. 3600 NONE A \\# 4 " +
  357. "c0000201\nwww.example.org. 3600 NONE " +
  358. "A \\# 4 c0000202\n",
  359. str(rrset3))
  360. # depending on what type of bad data is given, a number
  361. # of different exceptions could be raised (TODO: i recall
  362. # there was a ticket about making a better hierarchy for
  363. # dns/parsing related exceptions)
  364. self.assertRaises(InvalidRdataLength, convert_rrset_class,
  365. rrset, RRClass.CH)
  366. add_rdata(rrset, b'\xc0\x00')
  367. self.assertRaises(DNSMessageFORMERR, convert_rrset_class,
  368. rrset, RRClass.IN)
  369. def test_collect_rrsets(self):
  370. '''
  371. Tests the 'rrset collector' method, which collects rrsets
  372. with the same name and type
  373. '''
  374. collected = []
  375. collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN,
  376. RRType.A, 0, [ "192.0.2.1" ]))
  377. # Same name and class, different type
  378. collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN,
  379. RRType.TXT, 0, [ "one" ]))
  380. collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN,
  381. RRType.A, 0, [ "192.0.2.2" ]))
  382. collect_rrsets(collected, create_rrset("a.example.org", RRClass.IN,
  383. RRType.TXT, 0, [ "two" ]))
  384. # Same class and type as an existing one, different name
  385. collect_rrsets(collected, create_rrset("b.example.org", RRClass.IN,
  386. RRType.A, 0, [ "192.0.2.3" ]))
  387. # Same name and type as an existing one, different class
  388. collect_rrsets(collected, create_rrset("a.example.org", RRClass.CH,
  389. RRType.TXT, 0, [ "one" ]))
  390. collect_rrsets(collected, create_rrset("b.example.org", RRClass.IN,
  391. RRType.A, 0, [ "192.0.2.4" ]))
  392. collect_rrsets(collected, create_rrset("a.example.org", RRClass.CH,
  393. RRType.TXT, 0, [ "two" ]))
  394. strings = [ rrset.to_text() for rrset in collected ]
  395. # note + vs , in this list
  396. expected = ['a.example.org. 0 IN A 192.0.2.1\n' +
  397. 'a.example.org. 0 IN A 192.0.2.2\n',
  398. 'a.example.org. 0 IN TXT "one"\n' +
  399. 'a.example.org. 0 IN TXT "two"\n',
  400. 'b.example.org. 0 IN A 192.0.2.3\n' +
  401. 'b.example.org. 0 IN A 192.0.2.4\n',
  402. 'a.example.org. 0 CH TXT "one"\n' +
  403. 'a.example.org. 0 CH TXT "two"\n']
  404. self.assertEqual(expected, strings)
  405. def __prereq_helper(self, method, expected, rrset):
  406. '''Calls the given method with self._datasrc_client
  407. and the given rrset, and compares the return value.
  408. Function does not do much but makes the code look nicer'''
  409. self.assertEqual(expected, method(rrset))
  410. def __check_prerequisite_exists_combined(self, method, rrclass, expected):
  411. '''shared code for the checks for the very similar (but reversed
  412. in behaviour) methods __prereq_rrset_exists and
  413. __prereq_rrset_does_not_exist.
  414. For rrset_exists, rrclass should be ANY, for rrset_does_not_exist,
  415. it should be NONE.
  416. '''
  417. # Basic existence checks
  418. # www.example.org should have an A, but not an MX
  419. rrset = create_rrset("www.example.org", rrclass, RRType.A, 0)
  420. self.__prereq_helper(method, expected, rrset)
  421. rrset = create_rrset("www.example.org", rrclass, RRType.MX, 0)
  422. self.__prereq_helper(method, not expected, rrset)
  423. # example.org should have an MX, but not an A
  424. rrset = create_rrset("example.org", rrclass, RRType.MX, 0)
  425. self.__prereq_helper(method, expected, rrset)
  426. rrset = create_rrset("example.org", rrclass, RRType.A, 0)
  427. self.__prereq_helper(method, not expected, rrset)
  428. # Also check the case where the name does not even exist
  429. rrset = create_rrset("doesnotexist.example.org", rrclass, RRType.A, 0)
  430. self.__prereq_helper(method, not expected, rrset)
  431. # Wildcard expansion should not be applied, but literal matches
  432. # should work
  433. rrset = create_rrset("foo.wildcard.example.org", rrclass, RRType.A, 0)
  434. self.__prereq_helper(method, not expected, rrset)
  435. rrset = create_rrset("*.wildcard.example.org", rrclass, RRType.A, 0)
  436. self.__prereq_helper(method, expected, rrset)
  437. # Likewise, CNAME directly should match, but what it points to should
  438. # not
  439. rrset = create_rrset("cname.example.org", rrclass, RRType.A, 0)
  440. self.__prereq_helper(method, not expected, rrset)
  441. rrset = create_rrset("cname.example.org", rrclass, RRType.CNAME, 0)
  442. self.__prereq_helper(method, expected, rrset)
  443. # And also make sure a delegation (itself) is not treated as existing
  444. # data
  445. rrset = create_rrset("foo.sub.example.org", rrclass, RRType.A, 0)
  446. self.__prereq_helper(method, not expected, rrset)
  447. # But the delegation data itself should match
  448. rrset = create_rrset("sub.example.org", rrclass, RRType.NS, 0)
  449. self.__prereq_helper(method, expected, rrset)
  450. # As should glue
  451. rrset = create_rrset("ns.sub.example.org", rrclass, RRType.A, 0)
  452. self.__prereq_helper(method, expected, rrset)
  453. def test_check_prerequisite_exists(self):
  454. method = self._session._UpdateSession__prereq_rrset_exists
  455. self.__check_prerequisite_exists_combined(method,
  456. RRClass.ANY,
  457. True)
  458. def test_check_prerequisite_does_not_exist(self):
  459. method = self._session._UpdateSession__prereq_rrset_does_not_exist
  460. self.__check_prerequisite_exists_combined(method,
  461. RRClass.NONE,
  462. False)
  463. def test_check_prerequisite_exists_value(self):
  464. method = self._session._UpdateSession__prereq_rrset_exists_value
  465. rrset = create_rrset("www.example.org", RRClass.IN, RRType.A, 0)
  466. # empty one should not match
  467. self.__prereq_helper(method, False, rrset)
  468. # When the rdata is added, it should match
  469. add_rdata(rrset, "192.0.2.1")
  470. self.__prereq_helper(method, True, rrset)
  471. # But adding more should not
  472. add_rdata(rrset, "192.0.2.2")
  473. self.__prereq_helper(method, False, rrset)
  474. # Also test one with more than one RR
  475. rrset = create_rrset("example.org", RRClass.IN, RRType.NS, 0)
  476. self.__prereq_helper(method, False, rrset)
  477. add_rdata(rrset, "ns1.example.org.")
  478. self.__prereq_helper(method, False, rrset)
  479. add_rdata(rrset, "ns2.example.org.")
  480. self.__prereq_helper(method, False, rrset)
  481. add_rdata(rrset, "ns3.example.org.")
  482. self.__prereq_helper(method, True, rrset)
  483. add_rdata(rrset, "ns4.example.org.")
  484. self.__prereq_helper(method, False, rrset)
  485. # Repeat that, but try a different order of Rdata addition
  486. rrset = create_rrset("example.org", RRClass.IN, RRType.NS, 0)
  487. self.__prereq_helper(method, False, rrset)
  488. add_rdata(rrset, "ns3.example.org.")
  489. self.__prereq_helper(method, False, rrset)
  490. add_rdata(rrset, "ns2.example.org.")
  491. self.__prereq_helper(method, False, rrset)
  492. add_rdata(rrset, "ns1.example.org.")
  493. self.__prereq_helper(method, True, rrset)
  494. add_rdata(rrset, "ns4.example.org.")
  495. self.__prereq_helper(method, False, rrset)
  496. # and test one where the name does not even exist
  497. rrset = create_rrset("doesnotexist.example.org", RRClass.IN,
  498. RRType.A, 0, [ "192.0.2.1" ])
  499. self.__prereq_helper(method, False, rrset)
  500. def __check_prerequisite_name_in_use_combined(self, method, rrclass,
  501. expected):
  502. '''shared code for the checks for the very similar (but reversed
  503. in behaviour) methods __prereq_name_in_use and
  504. __prereq_name_not_in_use
  505. '''
  506. rrset = create_rrset("example.org", rrclass, RRType.ANY, 0)
  507. self.__prereq_helper(method, expected, rrset)
  508. rrset = create_rrset("www.example.org", rrclass, RRType.ANY, 0)
  509. self.__prereq_helper(method, expected, rrset)
  510. rrset = create_rrset("doesnotexist.example.org", rrclass,
  511. RRType.ANY, 0)
  512. self.__prereq_helper(method, not expected, rrset)
  513. rrset = create_rrset("belowdelegation.sub.example.org", rrclass,
  514. RRType.ANY, 0)
  515. self.__prereq_helper(method, not expected, rrset)
  516. rrset = create_rrset("foo.wildcard.example.org", rrclass,
  517. RRType.ANY, 0)
  518. self.__prereq_helper(method, not expected, rrset)
  519. # empty nonterminal should not match
  520. rrset = create_rrset("nonterminal.example.org", rrclass,
  521. RRType.ANY, 0)
  522. self.__prereq_helper(method, not expected, rrset)
  523. rrset = create_rrset("empty.nonterminal.example.org", rrclass,
  524. RRType.ANY, 0)
  525. self.__prereq_helper(method, expected, rrset)
  526. def test_check_prerequisite_name_in_use(self):
  527. method = self._session._UpdateSession__prereq_name_in_use
  528. self.__check_prerequisite_name_in_use_combined(method,
  529. RRClass.ANY,
  530. True)
  531. def test_check_prerequisite_name_not_in_use(self):
  532. method = self._session._UpdateSession__prereq_name_not_in_use
  533. self.__check_prerequisite_name_in_use_combined(method,
  534. RRClass.NONE,
  535. False)
  536. def check_prerequisite_result(self, expected, prerequisites):
  537. '''Helper method for checking the result of a prerequisite check;
  538. creates an update session, and fills it with the list of rrsets
  539. from 'prerequisites'. Then checks if __check_prerequisites()
  540. returns the Rcode specified in 'expected'.'''
  541. msg = create_update_msg([TEST_ZONE_RECORD], prerequisites)
  542. zconfig = ZoneConfig(set(), TEST_RRCLASS, self._datasrc_client,
  543. self._acl_map)
  544. session = UpdateSession(msg, TEST_CLIENT4, zconfig)
  545. session._get_update_zone()
  546. session._create_diff()
  547. # compare the to_text output of the rcodes (nicer error messages)
  548. # This call itself should also be done by handle(),
  549. # but just for better failures, it is first called on its own
  550. self.assertEqual(expected.to_text(),
  551. session._UpdateSession__check_prerequisites().to_text())
  552. # Now see if handle finds the same result
  553. (result, _, _) = session.handle()
  554. self.assertEqual(expected.to_text(),
  555. session._UpdateSession__message.get_rcode().to_text())
  556. # And that the result looks right
  557. if expected == Rcode.NOERROR:
  558. self.assertEqual(UPDATE_SUCCESS, result)
  559. else:
  560. self.assertEqual(UPDATE_ERROR, result)
  561. def check_prescan_result(self, expected, updates, expected_soa = None):
  562. '''Helper method for checking the result of a prerequisite check;
  563. creates an update session, and fills it with the list of rrsets
  564. from 'updates'. Then checks if __do_prescan()
  565. returns the Rcode specified in 'expected'.'''
  566. msg = create_update_msg([TEST_ZONE_RECORD], [], updates)
  567. zconfig = ZoneConfig(set(), TEST_RRCLASS, self._datasrc_client,
  568. self._acl_map)
  569. session = UpdateSession(msg, TEST_CLIENT4, zconfig)
  570. session._get_update_zone()
  571. session._create_diff()
  572. # compare the to_text output of the rcodes (nicer error messages)
  573. # This call itself should also be done by handle(),
  574. # but just for better failures, it is first called on its own
  575. self.assertEqual(expected.to_text(),
  576. session._UpdateSession__do_prescan().to_text())
  577. # If there is an expected soa, check it
  578. self.assertEqual(str(expected_soa),
  579. str(session._UpdateSession__added_soa))
  580. def check_full_handle_result(self, expected, updates, prerequisites=[]):
  581. '''Helper method for checking the result of a full handle;
  582. creates an update session, and fills it with the list of rrsets
  583. from 'updates'. Then checks if __handle()
  584. results in a response with rcode 'expected'.'''
  585. msg = create_update_msg([TEST_ZONE_RECORD], prerequisites, updates)
  586. zconfig = ZoneConfig(set(), TEST_RRCLASS, self._datasrc_client,
  587. self._acl_map)
  588. session = UpdateSession(msg, TEST_CLIENT4, zconfig)
  589. # Now see if handle finds the same result
  590. (result, _, _) = session.handle()
  591. self.assertEqual(expected.to_text(),
  592. session._UpdateSession__message.get_rcode().to_text())
  593. # And that the result looks right
  594. if expected == Rcode.NOERROR:
  595. self.assertEqual(UPDATE_SUCCESS, result)
  596. else:
  597. self.assertEqual(UPDATE_ERROR, result)
  598. def test_check_prerequisites(self):
  599. # This test checks if the actual prerequisite-type-specific
  600. # methods are called.
  601. # It does test all types of prerequisites, but it does not test
  602. # every possible result for those types (those are tested above,
  603. # in the specific prerequisite type tests)
  604. # Let's first define a number of prereq's that should succeed
  605. rrset_exists_yes = create_rrset("example.org", RRClass.ANY,
  606. RRType.SOA, 0)
  607. rrset_exists_value_yes = create_rrset("www.example.org", RRClass.IN,
  608. RRType.A, 0, [ "192.0.2.1" ])
  609. rrset_does_not_exist_yes = create_rrset("foo.example.org",
  610. RRClass.NONE, RRType.SOA,
  611. 0)
  612. name_in_use_yes = create_rrset("www.example.org", RRClass.ANY,
  613. RRType.ANY, 0)
  614. name_not_in_use_yes = create_rrset("foo.example.org", RRClass.NONE,
  615. RRType.ANY, 0)
  616. rrset_exists_value_1 = create_rrset("example.org", RRClass.IN,
  617. RRType.NS, 0, ["ns1.example.org."])
  618. rrset_exists_value_2 = create_rrset("example.org", RRClass.IN,
  619. RRType.NS, 0, ["ns2.example.org."])
  620. rrset_exists_value_3 = create_rrset("example.org", RRClass.IN,
  621. RRType.NS, 0, ["ns3.example.org."])
  622. # and a number that should not
  623. rrset_exists_no = create_rrset("foo.example.org", RRClass.ANY,
  624. RRType.SOA, 0)
  625. rrset_exists_value_no = create_rrset("www.example.org", RRClass.IN,
  626. RRType.A, 0, [ "192.0.2.2" ])
  627. rrset_does_not_exist_no = create_rrset("example.org", RRClass.NONE,
  628. RRType.SOA, 0)
  629. name_in_use_no = create_rrset("foo.example.org", RRClass.ANY,
  630. RRType.ANY, 0)
  631. name_not_in_use_no = create_rrset("www.example.org", RRClass.NONE,
  632. RRType.ANY, 0)
  633. # check 'no' result codes
  634. self.check_prerequisite_result(Rcode.NXRRSET,
  635. [ rrset_exists_no ])
  636. self.check_prerequisite_result(Rcode.NXRRSET,
  637. [ rrset_exists_value_no ])
  638. self.check_prerequisite_result(Rcode.YXRRSET,
  639. [ rrset_does_not_exist_no ])
  640. self.check_prerequisite_result(Rcode.NXDOMAIN,
  641. [ name_in_use_no ])
  642. self.check_prerequisite_result(Rcode.YXDOMAIN,
  643. [ name_not_in_use_no ])
  644. # the 'yes' codes should result in ok
  645. # individually
  646. self.check_prerequisite_result(Rcode.NOERROR,
  647. [ rrset_exists_yes ] )
  648. self.check_prerequisite_result(Rcode.NOERROR,
  649. [ rrset_exists_value_yes ])
  650. self.check_prerequisite_result(Rcode.NOERROR,
  651. [ rrset_does_not_exist_yes ])
  652. self.check_prerequisite_result(Rcode.NOERROR,
  653. [ name_in_use_yes ])
  654. self.check_prerequisite_result(Rcode.NOERROR,
  655. [ name_not_in_use_yes ])
  656. self.check_prerequisite_result(Rcode.NOERROR,
  657. [ rrset_exists_value_1,
  658. rrset_exists_value_2,
  659. rrset_exists_value_3])
  660. # and together
  661. self.check_prerequisite_result(Rcode.NOERROR,
  662. [ rrset_exists_yes,
  663. rrset_exists_value_yes,
  664. rrset_does_not_exist_yes,
  665. name_in_use_yes,
  666. name_not_in_use_yes,
  667. rrset_exists_value_1,
  668. rrset_exists_value_2,
  669. rrset_exists_value_3])
  670. # try out a permutation, note that one rrset is split up,
  671. # and the order of the RRs should not matter
  672. self.check_prerequisite_result(Rcode.NOERROR,
  673. [ rrset_exists_value_3,
  674. rrset_exists_yes,
  675. rrset_exists_value_2,
  676. name_in_use_yes,
  677. rrset_exists_value_1])
  678. # Should fail on the first error, even if most of the
  679. # prerequisites are ok
  680. self.check_prerequisite_result(Rcode.NXDOMAIN,
  681. [ rrset_exists_value_3,
  682. rrset_exists_yes,
  683. rrset_exists_value_2,
  684. name_in_use_yes,
  685. name_in_use_no,
  686. rrset_exists_value_1])
  687. def test_prerequisite_notzone(self):
  688. rrset = create_rrset("some.other.zone.", RRClass.ANY, RRType.SOA, 0)
  689. self.check_prerequisite_result(Rcode.NOTZONE, [ rrset ])
  690. def test_prerequisites_formerr(self):
  691. # test for form errors in the prerequisite section
  692. # Class ANY, non-zero TTL
  693. rrset = create_rrset("example.org", RRClass.ANY, RRType.SOA, 1)
  694. self.check_prerequisite_result(Rcode.FORMERR, [ rrset ])
  695. # Class ANY, but with rdata
  696. rrset = create_rrset("example.org", RRClass.ANY, RRType.A, 0,
  697. [ b'\x00\x00\x00\x00' ])
  698. self.check_prerequisite_result(Rcode.FORMERR, [ rrset ])
  699. # Class NONE, non-zero TTL
  700. rrset = create_rrset("example.org", RRClass.NONE, RRType.SOA, 1)
  701. self.check_prerequisite_result(Rcode.FORMERR, [ rrset ])
  702. # Class NONE, but with rdata
  703. rrset = create_rrset("example.org", RRClass.NONE, RRType.A, 0,
  704. [ b'\x00\x00\x00\x00' ])
  705. self.check_prerequisite_result(Rcode.FORMERR, [ rrset ])
  706. # Matching class and type, but non-zero TTL
  707. rrset = create_rrset("www.example.org", RRClass.IN, RRType.A, 1,
  708. [ "192.0.2.1" ])
  709. self.check_prerequisite_result(Rcode.FORMERR, [ rrset ])
  710. # Completely different class
  711. rrset = create_rrset("example.org", RRClass.CH, RRType.TXT, 0,
  712. [ "foo" ])
  713. self.check_prerequisite_result(Rcode.FORMERR, [ rrset ])
  714. def __prereq_helper(self, method, expected, rrset):
  715. '''Calls the given method with self._datasrc_client
  716. and the given rrset, and compares the return value.
  717. Function does not do much but makes the code look nicer'''
  718. self.assertEqual(expected, method(rrset))
  719. def __initialize_update_rrsets(self):
  720. '''Prepare a number of RRsets to be used in several update tests
  721. The rrsets are stored in self'''
  722. orig_a_rrset = create_rrset("www.example.org", TEST_RRCLASS,
  723. RRType.A, 3600, [ "192.0.2.1" ])
  724. self.orig_a_rrset = orig_a_rrset
  725. rrset_update_a = create_rrset("www.example.org", TEST_RRCLASS,
  726. RRType.A, 3600,
  727. [ "192.0.2.2", "192.0.2.3" ])
  728. self.rrset_update_a = rrset_update_a
  729. rrset_update_soa = create_rrset("example.org", TEST_RRCLASS,
  730. RRType.SOA, 3600,
  731. [ "ns1.example.org. " +
  732. "admin.example.org. " +
  733. "1233 3600 1800 2419200 7200" ])
  734. self.rrset_update_soa = rrset_update_soa
  735. rrset_update_soa_del = create_rrset("example.org", RRClass.NONE,
  736. RRType.SOA, 0,
  737. [ "ns1.example.org. " +
  738. "admin.example.org. " +
  739. "1233 3600 1800 2419200 7200" ])
  740. self.rrset_update_soa_del = rrset_update_soa_del
  741. rrset_update_soa2 = create_rrset("example.org", TEST_RRCLASS,
  742. RRType.SOA, 3600,
  743. [ "ns1.example.org. " +
  744. "admin.example.org. " +
  745. "4000 3600 1800 2419200 7200" ])
  746. self.rrset_update_soa2 = rrset_update_soa2
  747. rrset_update_del_name = create_rrset("www.example.org", RRClass.ANY,
  748. RRType.ANY, 0)
  749. self.rrset_update_del_name = rrset_update_del_name
  750. rrset_update_del_name_apex = create_rrset("example.org", RRClass.ANY,
  751. RRType.ANY, 0)
  752. self.rrset_update_del_name_apex = rrset_update_del_name_apex
  753. rrset_update_del_rrset = create_rrset("www.example.org", RRClass.ANY,
  754. RRType.A, 0)
  755. self.rrset_update_del_rrset = rrset_update_del_rrset
  756. rrset_update_del_mx_apex = create_rrset("example.org", RRClass.ANY,
  757. RRType.MX, 0)
  758. self.rrset_update_del_mx_apex = rrset_update_del_mx_apex
  759. rrset_update_del_soa_apex = create_rrset("example.org", RRClass.ANY,
  760. RRType.SOA, 0)
  761. self.rrset_update_del_soa_apex = rrset_update_del_soa_apex
  762. rrset_update_del_ns_apex = create_rrset("example.org", RRClass.ANY,
  763. RRType.NS, 0)
  764. self.rrset_update_del_ns_apex = rrset_update_del_ns_apex
  765. rrset_update_del_rrset_part = create_rrset("www.example.org",
  766. RRClass.NONE, RRType.A,
  767. 0,
  768. [ b'\xc0\x00\x02\x02',
  769. b'\xc0\x00\x02\x03' ])
  770. self.rrset_update_del_rrset_part = rrset_update_del_rrset_part
  771. rrset_update_del_rrset_ns = create_rrset("example.org", RRClass.NONE,
  772. RRType.NS, 0,
  773. [ b'\x03ns1\x07example\x03org\x00',
  774. b'\x03ns2\x07example\x03org\x00',
  775. b'\x03ns3\x07example\x03org\x00' ])
  776. self.rrset_update_del_rrset_ns = rrset_update_del_rrset_ns
  777. rrset_update_del_rrset_mx = create_rrset("example.org", RRClass.NONE,
  778. RRType.MX, 0,
  779. [ b'\x00\x0a\x04mail\x07example\x03org\x00' ])
  780. self.rrset_update_del_rrset_mx = rrset_update_del_rrset_mx
  781. def test_acl_before_prereq(self):
  782. name_in_use_no = create_rrset("foo.example.org", RRClass.ANY,
  783. RRType.ANY, 0)
  784. # Test a prerequisite that would fail
  785. self.check_full_handle_result(Rcode.NXDOMAIN, [], [ name_in_use_no ])
  786. # Change ACL so that it would be denied
  787. self._acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
  788. REQUEST_LOADER.load([{"action": "REJECT"}])}
  789. # The prerequisite should now not be reached; it should fail on the
  790. # ACL
  791. self.check_full_handle_result(Rcode.REFUSED, [], [ name_in_use_no ])
  792. def test_prescan(self):
  793. '''Test whether the prescan succeeds on data that is ok, and whether
  794. if notices the SOA if present'''
  795. # prepare a set of correct update statements
  796. self.__initialize_update_rrsets()
  797. self.check_prescan_result(Rcode.NOERROR, [ self.rrset_update_a ])
  798. # check if soa is noticed
  799. self.check_prescan_result(Rcode.NOERROR, [ self.rrset_update_soa ],
  800. self.rrset_update_soa)
  801. # Other types of succesful prechecks
  802. self.check_prescan_result(Rcode.NOERROR, [ self.rrset_update_soa2 ],
  803. self.rrset_update_soa2)
  804. self.check_prescan_result(Rcode.NOERROR,
  805. [ self.rrset_update_del_name ])
  806. self.check_prescan_result(Rcode.NOERROR,
  807. [ self.rrset_update_del_name_apex ])
  808. self.check_prescan_result(Rcode.NOERROR,
  809. [ self.rrset_update_del_rrset ])
  810. self.check_prescan_result(Rcode.NOERROR,
  811. [ self.rrset_update_del_mx_apex ])
  812. self.check_prescan_result(Rcode.NOERROR,
  813. [ self.rrset_update_del_rrset_part ])
  814. # and check a few permutations of the above
  815. # all of them (with one of the soas)
  816. self.check_prescan_result(Rcode.NOERROR,
  817. [
  818. self.rrset_update_a,
  819. self.rrset_update_soa,
  820. self.rrset_update_del_name,
  821. self.rrset_update_del_name_apex,
  822. self.rrset_update_del_rrset,
  823. self.rrset_update_del_mx_apex,
  824. self.rrset_update_del_rrset_part
  825. ],
  826. self.rrset_update_soa)
  827. # Two soas. Should we reject or simply use the last?
  828. # (RFC is not really explicit on this, but between the lines I read
  829. # use the last)
  830. self.check_prescan_result(Rcode.NOERROR,
  831. [ self.rrset_update_soa,
  832. self.rrset_update_soa2 ],
  833. self.rrset_update_soa2)
  834. self.check_prescan_result(Rcode.NOERROR,
  835. [ self.rrset_update_soa2,
  836. self.rrset_update_soa ],
  837. self.rrset_update_soa)
  838. self.check_prescan_result(Rcode.NOERROR,
  839. [
  840. self.rrset_update_del_mx_apex,
  841. self.rrset_update_del_name,
  842. self.rrset_update_del_name_apex,
  843. self.rrset_update_del_rrset_part,
  844. self.rrset_update_a,
  845. self.rrset_update_del_rrset,
  846. self.rrset_update_soa
  847. ],
  848. self.rrset_update_soa)
  849. def test_prescan_failures(self):
  850. '''Test whether prescan fails on bad data'''
  851. # out of zone data
  852. rrset = create_rrset("different.zone", RRClass.ANY, RRType.TXT, 0)
  853. self.check_prescan_result(Rcode.NOTZONE, [ rrset ])
  854. # forbidden type, zone class
  855. rrset = create_rrset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.ANY, 0,
  856. [ b'\x00' ])
  857. self.check_prescan_result(Rcode.FORMERR, [ rrset ])
  858. # non-zero TTL, class ANY
  859. rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY, RRType.TXT, 1)
  860. self.check_prescan_result(Rcode.FORMERR, [ rrset ])
  861. # non-zero Rdata, class ANY
  862. rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY, RRType.TXT, 0,
  863. [ "foo" ])
  864. self.check_prescan_result(Rcode.FORMERR, [ rrset ])
  865. # forbidden type, class ANY
  866. rrset = create_rrset(TEST_ZONE_NAME, RRClass.ANY, RRType.AXFR, 0,
  867. [ b'\x00' ])
  868. self.check_prescan_result(Rcode.FORMERR, [ rrset ])
  869. # non-zero TTL, class NONE
  870. rrset = create_rrset(TEST_ZONE_NAME, RRClass.NONE, RRType.TXT, 1)
  871. self.check_prescan_result(Rcode.FORMERR, [ rrset ])
  872. # forbidden type, class NONE
  873. rrset = create_rrset(TEST_ZONE_NAME, RRClass.NONE, RRType.AXFR, 0,
  874. [ b'\x00' ])
  875. self.check_prescan_result(Rcode.FORMERR, [ rrset ])
  876. def __check_inzone_data(self, expected_result, name, rrtype,
  877. expected_rrset = None):
  878. '''Does a find on TEST_ZONE for the given rrset's name and type,
  879. then checks if the result matches the expected result.
  880. If so, and if expected_rrset is given, they are compared as
  881. well.'''
  882. _, finder = self._datasrc_client.find_zone(TEST_ZONE_NAME)
  883. result, found_rrset, _ = finder.find(name, rrtype,
  884. finder.NO_WILDCARD |
  885. finder.FIND_GLUE_OK)
  886. self.assertEqual(expected_result, result)
  887. # Sigh. Need rrsets.compare() again.
  888. # To be sure, compare name, class, type, and ttl
  889. if expected_rrset is not None:
  890. self.assertEqual(expected_rrset.get_name(), found_rrset.get_name())
  891. self.assertEqual(expected_rrset.get_class(), found_rrset.get_class())
  892. self.assertEqual(expected_rrset.get_type(), found_rrset.get_type())
  893. self.assertEqual(expected_rrset.get_ttl().to_text(),
  894. found_rrset.get_ttl().to_text())
  895. expected_rdata =\
  896. [ rdata.to_text() for rdata in expected_rrset.get_rdata() ]
  897. found_rdata =\
  898. [ rdata.to_text() for rdata in found_rrset.get_rdata() ]
  899. expected_rdata.sort()
  900. found_rdata.sort()
  901. self.assertEqual(expected_rdata, found_rdata)
  902. def test_update_add_delete_rrset(self):
  903. '''
  904. Tests a sequence of related add and delete updates. Some other
  905. cases are tested by later tests.
  906. '''
  907. self.__initialize_update_rrsets()
  908. # initially, the www should only contain one rr
  909. # (set to self.orig_a_rrset)
  910. # during this test, we will extend it at some point
  911. extended_a_rrset = create_rrset("www.example.org", TEST_RRCLASS,
  912. RRType.A, 3600,
  913. [ "192.0.2.1",
  914. "192.0.2.2",
  915. "192.0.2.3" ])
  916. # Sanity check, make sure original data is really there before updates
  917. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  918. isc.dns.Name("www.example.org"),
  919. RRType.A,
  920. self.orig_a_rrset)
  921. # Add two rrs
  922. self.check_full_handle_result(Rcode.NOERROR, [ self.rrset_update_a ])
  923. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  924. isc.dns.Name("www.example.org"),
  925. RRType.A,
  926. extended_a_rrset)
  927. # Adding the same RRsets should not make a difference.
  928. self.check_full_handle_result(Rcode.NOERROR, [ self.rrset_update_a ])
  929. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  930. isc.dns.Name("www.example.org"),
  931. RRType.A,
  932. extended_a_rrset)
  933. # Now delete those two, and we should end up with the original RRset
  934. self.check_full_handle_result(Rcode.NOERROR,
  935. [ self.rrset_update_del_rrset_part ])
  936. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  937. isc.dns.Name("www.example.org"),
  938. RRType.A,
  939. self.orig_a_rrset)
  940. # 'Deleting' them again should make no difference
  941. self.check_full_handle_result(Rcode.NOERROR,
  942. [ self.rrset_update_del_rrset_part ])
  943. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  944. isc.dns.Name("www.example.org"),
  945. RRType.A,
  946. self.orig_a_rrset)
  947. # But deleting the entire rrset, independent of its contents, should
  948. # work
  949. self.check_full_handle_result(Rcode.NOERROR,
  950. [ self.rrset_update_del_rrset ])
  951. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  952. isc.dns.Name("www.example.org"),
  953. RRType.A)
  954. # Check that if we update the SOA, it is updated to our value
  955. self.check_full_handle_result(Rcode.NOERROR,
  956. [ self.rrset_update_soa2 ])
  957. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  958. isc.dns.Name("example.org"),
  959. RRType.SOA,
  960. self.rrset_update_soa2)
  961. def test_glue_deletions(self):
  962. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  963. isc.dns.Name("sub.example.org."),
  964. RRType.NS)
  965. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  966. isc.dns.Name("ns.sub.example.org."),
  967. RRType.A)
  968. # See that we can delete glue
  969. rrset_delete_glue = create_rrset("ns.sub.example.org.",
  970. RRClass.ANY,
  971. RRType.A,
  972. 0)
  973. self.check_full_handle_result(Rcode.NOERROR,
  974. [ rrset_delete_glue ])
  975. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  976. isc.dns.Name("sub.example.org."),
  977. RRType.NS)
  978. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  979. isc.dns.Name("ns.sub.example.org."),
  980. RRType.A)
  981. # Check that we don't accidentally delete a delegation if we
  982. # try to delete non-existent glue
  983. rrset_delete_nonexistent_glue = create_rrset("foo.sub.example.org.",
  984. RRClass.ANY,
  985. RRType.A,
  986. 0)
  987. self.check_full_handle_result(Rcode.NOERROR,
  988. [ rrset_delete_nonexistent_glue ])
  989. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  990. isc.dns.Name("sub.example.org."),
  991. RRType.NS)
  992. def test_update_add_new_data(self):
  993. '''
  994. This tests adds data where none is present
  995. '''
  996. # Add data at a completely new name
  997. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  998. isc.dns.Name("new.example.org"),
  999. RRType.A)
  1000. rrset = create_rrset("new.example.org", TEST_RRCLASS, RRType.A,
  1001. 3600, [ "192.0.2.1", "192.0.2.2" ])
  1002. self.check_full_handle_result(Rcode.NOERROR, [ rrset ])
  1003. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1004. isc.dns.Name("new.example.org"),
  1005. RRType.A,
  1006. rrset)
  1007. # Also try a name where data is present, but none of this
  1008. # specific type
  1009. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
  1010. isc.dns.Name("new.example.org"),
  1011. RRType.TXT)
  1012. rrset = create_rrset("new.example.org", TEST_RRCLASS, RRType.TXT,
  1013. 3600, [ "foo" ])
  1014. self.check_full_handle_result(Rcode.NOERROR, [ rrset ])
  1015. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1016. isc.dns.Name("new.example.org"),
  1017. RRType.TXT,
  1018. rrset)
  1019. def test_update_add_new_data_interspersed(self):
  1020. '''
  1021. This tests adds data where none is present, similar to
  1022. test_update_add_new_data, but this time the second RRset
  1023. is put into the record between the two RRs of the first
  1024. RRset.
  1025. '''
  1026. # Add data at a completely new name
  1027. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  1028. isc.dns.Name("new_a.example.org"),
  1029. RRType.A)
  1030. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  1031. isc.dns.Name("new_txt.example.org"),
  1032. RRType.TXT)
  1033. rrset1 = create_rrset("new_a.example.org", TEST_RRCLASS, RRType.A,
  1034. 3600, [ "192.0.2.1" ])
  1035. rrset2 = create_rrset("new_txt.example.org", TEST_RRCLASS, RRType.TXT,
  1036. 3600, [ "foo" ])
  1037. rrset3 = create_rrset("new_a.example.org", TEST_RRCLASS, RRType.A,
  1038. 3600, [ "192.0.2.2" ])
  1039. self.check_full_handle_result(Rcode.NOERROR,
  1040. [ rrset1, rrset2, rrset3 ])
  1041. # The update should have merged rrset1 and rrset3
  1042. rrset_merged = create_rrset("new_a.example.org", TEST_RRCLASS,
  1043. RRType.A, 3600,
  1044. [ "192.0.2.1", "192.0.2.2" ])
  1045. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1046. isc.dns.Name("new_a.example.org"),
  1047. RRType.A,
  1048. rrset_merged)
  1049. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1050. isc.dns.Name("new_txt.example.org"),
  1051. RRType.TXT,
  1052. rrset2)
  1053. def test_update_delete_name(self):
  1054. '''
  1055. Tests whether deletion of every RR for a name works
  1056. '''
  1057. self.__initialize_update_rrsets()
  1058. # First check it is there
  1059. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1060. isc.dns.Name("www.example.org"),
  1061. RRType.A)
  1062. # Delete the entire name
  1063. self.check_full_handle_result(Rcode.NOERROR,
  1064. [ self.rrset_update_del_name ])
  1065. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  1066. isc.dns.Name("www.example.org"),
  1067. RRType.A)
  1068. # Should still be gone after pointless second delete
  1069. self.check_full_handle_result(Rcode.NOERROR,
  1070. [ self.rrset_update_del_name ])
  1071. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  1072. isc.dns.Name("www.example.org"),
  1073. RRType.A)
  1074. def test_update_apex_special_cases(self):
  1075. '''
  1076. Tests a few special cases when deleting data from the apex
  1077. '''
  1078. self.__initialize_update_rrsets()
  1079. # the original SOA
  1080. orig_soa_rrset = create_rrset("example.org", TEST_RRCLASS,
  1081. RRType.SOA, 3600,
  1082. [ "ns1.example.org. " +
  1083. "admin.example.org. " +
  1084. "1234 3600 1800 2419200 7200" ])
  1085. # At some point, the SOA SERIAL will be auto-incremented
  1086. incremented_soa_rrset_01 = create_rrset("example.org", TEST_RRCLASS,
  1087. RRType.SOA, 3600, ["ns1.example.org. " +
  1088. "admin.example.org. " +
  1089. "1235 3600 1800 2419200 7200" ])
  1090. incremented_soa_rrset_02 = create_rrset("example.org", TEST_RRCLASS,
  1091. RRType.SOA, 3600, ["ns1.example.org. " +
  1092. "admin.example.org. " +
  1093. "1236 3600 1800 2419200 7200" ])
  1094. # We will delete some of the NS records
  1095. orig_ns_rrset = create_rrset("example.org", TEST_RRCLASS,
  1096. RRType.NS, 3600,
  1097. [ "ns1.example.org.",
  1098. "ns2.example.org.",
  1099. "ns3.example.org." ])
  1100. # Sanity check, make sure original data is really there before updates
  1101. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1102. isc.dns.Name("example.org"),
  1103. RRType.NS,
  1104. orig_ns_rrset)
  1105. # We will delete the MX record later in this test, so let's make
  1106. # sure that it exists (we do not care about its value)
  1107. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1108. isc.dns.Name("example.org"),
  1109. RRType.MX)
  1110. # Check that we cannot delete the SOA record by direct deletion
  1111. # both by name+type and by full rrset
  1112. self.check_full_handle_result(Rcode.NOERROR,
  1113. [ self.rrset_update_del_soa_apex,
  1114. self.rrset_update_soa_del ])
  1115. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1116. isc.dns.Name("example.org"),
  1117. RRType.SOA,
  1118. incremented_soa_rrset_01)
  1119. # If we delete everything at the apex, the SOA and NS rrsets should be
  1120. # untouched (but serial will be incremented)
  1121. self.check_full_handle_result(Rcode.NOERROR,
  1122. [ self.rrset_update_del_name_apex ])
  1123. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1124. isc.dns.Name("example.org"),
  1125. RRType.SOA,
  1126. incremented_soa_rrset_02)
  1127. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1128. isc.dns.Name("example.org"),
  1129. RRType.NS,
  1130. orig_ns_rrset)
  1131. # but the MX should be gone
  1132. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
  1133. isc.dns.Name("example.org"),
  1134. RRType.MX)
  1135. # Deleting the NS rrset by name and type only, it should also be left
  1136. # untouched
  1137. self.check_full_handle_result(Rcode.NOERROR,
  1138. [ self.rrset_update_del_ns_apex ])
  1139. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1140. isc.dns.Name("example.org"),
  1141. RRType.NS,
  1142. orig_ns_rrset)
  1143. def test_update_apex_special_case_ns_rrset(self):
  1144. # If we delete the NS at the apex specifically, it should still
  1145. # keep one record
  1146. self.__initialize_update_rrsets()
  1147. # When we are done, we should have a reduced NS rrset
  1148. short_ns_rrset = create_rrset("example.org", TEST_RRCLASS,
  1149. RRType.NS, 3600,
  1150. [ "ns3.example.org." ])
  1151. self.check_full_handle_result(Rcode.NOERROR,
  1152. [ self.rrset_update_del_rrset_ns ])
  1153. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1154. isc.dns.Name("example.org"),
  1155. RRType.NS,
  1156. short_ns_rrset)
  1157. def test_update_apex_special_case_ns_rrset2(self):
  1158. # If we add new NS records, then delete all existing ones, it
  1159. # should not keep any
  1160. self.__initialize_update_rrsets()
  1161. new_ns = create_rrset("example.org", TEST_RRCLASS, RRType.NS, 3600,
  1162. [ "newns1.example.org.", "newns2.example.org." ])
  1163. self.check_full_handle_result(Rcode.NOERROR,
  1164. [ new_ns,
  1165. self.rrset_update_del_rrset_ns ])
  1166. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1167. isc.dns.Name("example.org"),
  1168. RRType.NS,
  1169. new_ns)
  1170. def test_update_delete_normal_rrset_at_apex(self):
  1171. '''
  1172. Tests a number of 'normal rrset' deletes at the apex
  1173. '''
  1174. # MX should simply be deleted
  1175. self.__initialize_update_rrsets()
  1176. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1177. isc.dns.Name("example.org"),
  1178. RRType.MX)
  1179. self.check_full_handle_result(Rcode.NOERROR,
  1180. [ self.rrset_update_del_rrset_mx ])
  1181. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXRRSET,
  1182. isc.dns.Name("example.org"),
  1183. RRType.MX)
  1184. def test_update_add_then_delete_rrset(self):
  1185. # If we add data, then delete the whole rrset, added data should
  1186. # be gone as well
  1187. self.__initialize_update_rrsets()
  1188. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1189. isc.dns.Name("www.example.org"),
  1190. RRType.A)
  1191. self.check_full_handle_result(Rcode.NOERROR,
  1192. [ self.rrset_update_a,
  1193. self.rrset_update_del_rrset ])
  1194. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  1195. isc.dns.Name("www.example.org"),
  1196. RRType.A)
  1197. def test_update_add_then_delete_name(self):
  1198. # If we add data, then delete the entire name, added data should
  1199. # be gone as well
  1200. self.__initialize_update_rrsets()
  1201. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1202. isc.dns.Name("www.example.org"),
  1203. RRType.A)
  1204. self.check_full_handle_result(Rcode.NOERROR,
  1205. [ self.rrset_update_a,
  1206. self.rrset_update_del_name ])
  1207. self.__check_inzone_data(isc.datasrc.ZoneFinder.NXDOMAIN,
  1208. isc.dns.Name("www.example.org"),
  1209. RRType.A)
  1210. def test_update_delete_then_add_rrset(self):
  1211. # If we delete an entire rrset, then add something there again,
  1212. # the addition should be done
  1213. self.__initialize_update_rrsets()
  1214. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1215. isc.dns.Name("www.example.org"),
  1216. RRType.A)
  1217. self.check_full_handle_result(Rcode.NOERROR,
  1218. [ self.rrset_update_del_rrset,
  1219. self.rrset_update_a ])
  1220. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1221. isc.dns.Name("www.example.org"),
  1222. RRType.A,
  1223. self.rrset_update_a)
  1224. def test_update_delete_then_add_rrset(self):
  1225. # If we delete an entire name, then add something there again,
  1226. # the addition should be done
  1227. self.__initialize_update_rrsets()
  1228. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1229. isc.dns.Name("www.example.org"),
  1230. RRType.A)
  1231. self.check_full_handle_result(Rcode.NOERROR,
  1232. [ self.rrset_update_del_name,
  1233. self.rrset_update_a ])
  1234. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1235. isc.dns.Name("www.example.org"),
  1236. RRType.A,
  1237. self.rrset_update_a)
  1238. def test_update_cname_special_cases(self):
  1239. self.__initialize_update_rrsets()
  1240. # Sanity check
  1241. orig_cname_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
  1242. RRType.CNAME, 3600,
  1243. [ "www.example.org." ])
  1244. self.__check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
  1245. isc.dns.Name("cname.example.org"),
  1246. RRType.A,
  1247. orig_cname_rrset)
  1248. # If we try to add data where a cname is preset
  1249. rrset = create_rrset("cname.example.org", TEST_RRCLASS, RRType.A,
  1250. 3600, [ "192.0.2.1" ])
  1251. self.check_full_handle_result(Rcode.NOERROR, [ rrset ])
  1252. self.__check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
  1253. isc.dns.Name("cname.example.org"),
  1254. RRType.A,
  1255. orig_cname_rrset)
  1256. # But updating the cname itself should work
  1257. new_cname_rrset = create_rrset("cname.example.org", TEST_RRCLASS,
  1258. RRType.CNAME, 3600,
  1259. [ "mail.example.org." ])
  1260. self.check_full_handle_result(Rcode.NOERROR, [ new_cname_rrset ])
  1261. self.__check_inzone_data(isc.datasrc.ZoneFinder.CNAME,
  1262. isc.dns.Name("cname.example.org"),
  1263. RRType.A,
  1264. new_cname_rrset)
  1265. self.__initialize_update_rrsets()
  1266. # Likewise, adding a cname where other data is
  1267. # present should do nothing either
  1268. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1269. isc.dns.Name("www.example.org"),
  1270. RRType.A,
  1271. self.orig_a_rrset)
  1272. new_cname_rrset = create_rrset("www.example.org", TEST_RRCLASS,
  1273. RRType.CNAME, 3600,
  1274. [ "mail.example.org." ])
  1275. self.check_full_handle_result(Rcode.NOERROR, [ new_cname_rrset ])
  1276. self.__check_inzone_data(isc.datasrc.ZoneFinder.SUCCESS,
  1277. isc.dns.Name("www.example.org"),
  1278. RRType.A,
  1279. self.orig_a_rrset)
  1280. def test_update_bad_class(self):
  1281. rrset = create_rrset("example.org.", RRClass.CH, RRType.TXT, 0,
  1282. [ "foo" ])
  1283. self.check_full_handle_result(Rcode.FORMERR, [ rrset ])
  1284. def test_uncaught_exception(self):
  1285. def my_exc():
  1286. raise Exception("foo")
  1287. self._session._UpdateSession__update_soa = my_exc
  1288. self.assertEqual(Rcode.SERVFAIL.to_text(),
  1289. self._session._UpdateSession__do_update().to_text())
  1290. class SessionACLTest(SessionTestBase):
  1291. '''ACL related tests for update session.'''
  1292. def test_update_acl_check(self):
  1293. '''Test for various ACL checks.
  1294. Note that accepted cases are covered in the basic tests.
  1295. '''
  1296. # create a separate session, with default (empty) ACL map.
  1297. session = UpdateSession(self._update_msg,
  1298. TEST_CLIENT4, ZoneConfig(set(), TEST_RRCLASS,
  1299. self._datasrc_client))
  1300. # then the request should be rejected.
  1301. self.assertEqual((UPDATE_ERROR, None, None), session.handle())
  1302. # recreate the request message, and test with an ACL that would result
  1303. # in 'DROP'. get_message() should return None.
  1304. msg = create_update_msg()
  1305. acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
  1306. REQUEST_LOADER.load([{"action": "DROP", "from":
  1307. TEST_CLIENT4[0]}])}
  1308. session = UpdateSession(msg, TEST_CLIENT4,
  1309. ZoneConfig([], TEST_RRCLASS,
  1310. self._datasrc_client, acl_map))
  1311. self.assertEqual((UPDATE_DROP, None, None), session.handle())
  1312. self.assertEqual(None, session.get_message())
  1313. def test_update_tsigacl_check(self):
  1314. '''Test for various ACL checks using TSIG.'''
  1315. # This ACL will accept requests from TEST_CLIENT4 (any port) *and*
  1316. # has TSIG signed by TEST_ZONE_NAME; all others will be rejected.
  1317. acl_map = {(TEST_ZONE_NAME, TEST_RRCLASS):
  1318. REQUEST_LOADER.load([{"action": "ACCEPT",
  1319. "from": TEST_CLIENT4[0],
  1320. "key": TEST_ZONE_NAME.to_text()}])}
  1321. # If the message doesn't contain TSIG, it doesn't match the ACCEPT
  1322. # ACL entry, and the request should be rejected.
  1323. session = UpdateSession(self._update_msg,
  1324. TEST_CLIENT4, ZoneConfig(set(), TEST_RRCLASS,
  1325. self._datasrc_client,
  1326. acl_map))
  1327. self.assertEqual((UPDATE_ERROR, None, None), session.handle())
  1328. self.check_response(session.get_message(), Rcode.REFUSED)
  1329. # If the message contains TSIG, it should match the ACCEPT
  1330. # ACL entry, and the request should be granted.
  1331. session = UpdateSession(create_update_msg(tsig_key=TEST_TSIG_KEY),
  1332. TEST_CLIENT4, ZoneConfig(set(), TEST_RRCLASS,
  1333. self._datasrc_client,
  1334. acl_map))
  1335. self.assertEqual((UPDATE_SUCCESS, TEST_ZONE_NAME, TEST_RRCLASS),
  1336. session.handle())
  1337. if __name__ == "__main__":
  1338. isc.log.init("bind10")
  1339. isc.log.resetUnitTestRootLogger()
  1340. unittest.main()