xfrin_test.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. # $Id$
  16. import unittest
  17. import socket
  18. from xfrin import *
  19. #
  20. # Commonly used (mostly constant) test parameters
  21. #
  22. TEST_ZONE_NAME = "example.com"
  23. TEST_RRCLASS = RRClass.IN()
  24. TEST_DB_FILE = 'db_file'
  25. TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
  26. TEST_MASTER_IPV4_ADDRINFO = (socket.AF_INET, socket.SOCK_STREAM,
  27. socket.IPPROTO_TCP, '',
  28. (TEST_MASTER_IPV4_ADDRESS, 53))
  29. TEST_MASTER_IPV6_ADDRESS = '::1'
  30. TEST_MASTER_IPV6_ADDRINFO = (socket.AF_INET6, socket.SOCK_STREAM,
  31. socket.IPPROTO_TCP, '',
  32. (TEST_MASTER_IPV6_ADDRESS, 53))
  33. # XXX: This should be a non priviledge port that is unlikely to be used.
  34. # If some other process uses this port test will fail.
  35. TEST_MASTER_PORT = '53535'
  36. soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
  37. 'master.example.com. admin.example.com ' +
  38. '1234 3600 1800 2419200 7200')
  39. soa_rrset = RRset(Name(TEST_ZONE_NAME), TEST_RRCLASS, RRType.SOA(),
  40. RRTTL(3600))
  41. soa_rrset.add_rdata(soa_rdata)
  42. example_axfr_question = Question(Name(TEST_ZONE_NAME), TEST_RRCLASS,
  43. RRType.AXFR())
  44. example_soa_question = Question(Name(TEST_ZONE_NAME), TEST_RRCLASS,
  45. RRType.SOA())
  46. default_questions = [example_axfr_question]
  47. default_answers = [soa_rrset]
  48. class XfrinTestException(Exception):
  49. pass
  50. class MockXfrin(Xfrin):
  51. # This is a class attribute of a callable object that specifies a non
  52. # default behavior triggered in _cc_check_command(). Specific test methods
  53. # are expected to explicitly set this attribute before creating a
  54. # MockXfrin object (when it needs a non default behavior).
  55. # See the TestMain class.
  56. check_command_hook = None
  57. def _cc_setup(self):
  58. pass
  59. def _get_db_file(self):
  60. pass
  61. def _cc_check_command(self):
  62. self._shutdown_event.set()
  63. if MockXfrin.check_command_hook:
  64. MockXfrin.check_command_hook()
  65. class MockXfrinConnection(XfrinConnection):
  66. def __init__(self, sock_map, zone_name, rrclass, db_file, shutdown_event,
  67. master_addr):
  68. super().__init__(sock_map, zone_name, rrclass, db_file, shutdown_event,
  69. master_addr)
  70. self.query_data = b''
  71. self.reply_data = b''
  72. self.force_time_out = False
  73. self.force_close = False
  74. self.qlen = None
  75. self.qid = None
  76. self.response_generator = None
  77. def _asyncore_loop(self):
  78. if self.force_close:
  79. self.handle_close()
  80. elif not self.force_time_out:
  81. self.handle_read()
  82. def connect_to_master(self):
  83. return True
  84. def recv(self, size):
  85. data = self.reply_data[:size]
  86. self.reply_data = self.reply_data[size:]
  87. if len(data) < size:
  88. raise XfrinTestException('cannot get reply data')
  89. return data
  90. def send(self, data):
  91. if self.qlen != None and len(self.query_data) >= self.qlen:
  92. # This is a new query. reset the internal state.
  93. self.qlen = None
  94. self.qid = None
  95. self.query_data = b''
  96. self.query_data += data
  97. # when the outgoing data is sufficiently large to contain the length
  98. # and the QID fields (4 octets or more), extract these fields.
  99. # The length will be reset the internal query data to support multiple
  100. # queries in a single test.
  101. # The QID will be used to construct a matching response.
  102. if len(self.query_data) >= 4 and self.qid == None:
  103. self.qlen = socket.htons(struct.unpack('H',
  104. self.query_data[0:2])[0])
  105. self.qid = socket.htons(struct.unpack('H', self.query_data[2:4])[0])
  106. # if the response generator method is specified, invoke it now.
  107. if self.response_generator != None:
  108. self.response_generator()
  109. return len(data)
  110. def create_response_data(self, response = True, bad_qid = False,
  111. rcode = Rcode.NOERROR(),
  112. questions = default_questions,
  113. answers = default_answers):
  114. resp = Message(Message.RENDER)
  115. qid = self.qid
  116. if bad_qid:
  117. qid += 1
  118. resp.set_qid(qid)
  119. resp.set_opcode(Opcode.QUERY())
  120. resp.set_rcode(rcode)
  121. if response:
  122. resp.set_header_flag(MessageFlag.QR())
  123. [resp.add_question(q) for q in questions]
  124. [resp.add_rrset(Section.ANSWER(), a) for a in answers]
  125. renderer = MessageRenderer()
  126. resp.to_wire(renderer)
  127. reply_data = struct.pack('H', socket.htons(renderer.get_length()))
  128. reply_data += renderer.get_data()
  129. return reply_data
  130. class TestXfrinConnection(unittest.TestCase):
  131. def setUp(self):
  132. if os.path.exists(TEST_DB_FILE):
  133. os.remove(TEST_DB_FILE)
  134. self.sock_map = {}
  135. self.conn = MockXfrinConnection(self.sock_map, 'example.com.',
  136. TEST_RRCLASS, TEST_DB_FILE,
  137. threading.Event(),
  138. TEST_MASTER_IPV4_ADDRINFO)
  139. self.axfr_after_soa = False
  140. self.soa_response_params = {
  141. 'questions': [example_soa_question],
  142. 'bad_qid': False,
  143. 'response': True,
  144. 'rcode': Rcode.NOERROR(),
  145. 'axfr_after_soa': self._create_normal_response_data
  146. }
  147. def tearDown(self):
  148. self.conn.close()
  149. if os.path.exists(TEST_DB_FILE):
  150. os.remove(TEST_DB_FILE)
  151. def test_close(self):
  152. # we shouldn't be using the global asyncore map.
  153. self.assertEqual(len(asyncore.socket_map), 0)
  154. # there should be exactly one entry in our local map
  155. self.assertEqual(len(self.sock_map), 1)
  156. # once closing the dispatch the map should become empty
  157. self.conn.close()
  158. self.assertEqual(len(self.sock_map), 0)
  159. def test_init_ip6(self):
  160. # This test simply creates a new XfrinConnection object with an
  161. # IPv6 address, tries to bind it to an IPv6 wildcard address/port
  162. # to confirm an AF_INET6 socket has been created. A naive application
  163. # tends to assume it's IPv4 only and hardcode AF_INET. This test
  164. # uncovers such a bug.
  165. c = MockXfrinConnection({}, 'example.com.', TEST_RRCLASS, TEST_DB_FILE,
  166. threading.Event(),
  167. TEST_MASTER_IPV6_ADDRINFO)
  168. c.bind(('::', 0))
  169. c.close()
  170. def test_init_chclass(self):
  171. c = XfrinConnection({}, 'example.com.', RRClass.CH(), TEST_DB_FILE,
  172. threading.Event(), TEST_MASTER_IPV4_ADDRINFO)
  173. axfrmsg = c._create_query(RRType.AXFR())
  174. self.assertEqual(axfrmsg.get_question()[0].get_class(),
  175. RRClass.CH())
  176. c.close()
  177. def test_response_with_invalid_msg(self):
  178. self.conn.reply_data = b'aaaxxxx'
  179. self.assertRaises(XfrinTestException, self._handle_xfrin_response)
  180. def test_response_without_end_soa(self):
  181. self.conn._send_query(RRType.AXFR())
  182. self.conn.reply_data = self.conn.create_response_data()
  183. self.assertRaises(XfrinTestException, self._handle_xfrin_response)
  184. def test_response_bad_qid(self):
  185. self.conn._send_query(RRType.AXFR())
  186. self.conn.reply_data = self.conn.create_response_data(bad_qid = True)
  187. self.assertRaises(XfrinException, self._handle_xfrin_response)
  188. def test_response_non_response(self):
  189. self.conn._send_query(RRType.AXFR())
  190. self.conn.reply_data = self.conn.create_response_data(response = False)
  191. self.assertRaises(XfrinException, self._handle_xfrin_response)
  192. def test_response_error_code(self):
  193. self.conn._send_query(RRType.AXFR())
  194. self.conn.reply_data = self.conn.create_response_data(
  195. rcode=Rcode.SERVFAIL())
  196. self.assertRaises(XfrinException, self._handle_xfrin_response)
  197. def test_response_multi_question(self):
  198. self.conn._send_query(RRType.AXFR())
  199. self.conn.reply_data = self.conn.create_response_data(
  200. questions=[example_axfr_question, example_axfr_question])
  201. self.assertRaises(XfrinException, self._handle_xfrin_response)
  202. def test_response_empty_answer(self):
  203. self.conn._send_query(RRType.AXFR())
  204. self.conn.reply_data = self.conn.create_response_data(answers=[])
  205. # Should an empty answer trigger an exception? Even though it's very
  206. # unusual it's not necessarily invalid. Need to revisit.
  207. self.assertRaises(XfrinException, self._handle_xfrin_response)
  208. def test_response_non_response(self):
  209. self.conn._send_query(RRType.AXFR())
  210. self.conn.reply_data = self.conn.create_response_data(response = False)
  211. self.assertRaises(XfrinException, self._handle_xfrin_response)
  212. def test_soacheck(self):
  213. # we need to defer the creation until we know the QID, which is
  214. # determined in _check_soa_serial(), so we use response_generator.
  215. self.conn.response_generator = self._create_soa_response_data
  216. self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
  217. def test_soacheck_with_bad_response(self):
  218. self.conn.response_generator = self._create_broken_response_data
  219. self.assertRaises(MessageTooShort, self.conn._check_soa_serial)
  220. def test_soacheck_badqid(self):
  221. self.soa_response_params['bad_qid'] = True
  222. self.conn.response_generator = self._create_soa_response_data
  223. self.assertRaises(XfrinException, self.conn._check_soa_serial)
  224. def test_soacheck_non_response(self):
  225. self.soa_response_params['response'] = False
  226. self.conn.response_generator = self._create_soa_response_data
  227. self.assertRaises(XfrinException, self.conn._check_soa_serial)
  228. def test_soacheck_error_code(self):
  229. self.soa_response_params['rcode'] = Rcode.SERVFAIL()
  230. self.conn.response_generator = self._create_soa_response_data
  231. self.assertRaises(XfrinException, self.conn._check_soa_serial)
  232. def test_response_shutdown(self):
  233. self.conn.response_generator = self._create_normal_response_data
  234. self.conn._shutdown_event.set()
  235. self.conn._send_query(RRType.AXFR())
  236. self.assertRaises(XfrinException, self._handle_xfrin_response)
  237. def test_response_timeout(self):
  238. self.conn.response_generator = self._create_normal_response_data
  239. self.conn.force_time_out = True
  240. self.assertRaises(XfrinException, self._handle_xfrin_response)
  241. def test_response_remote_close(self):
  242. self.conn.response_generator = self._create_normal_response_data
  243. self.conn.force_close = True
  244. self.assertRaises(XfrinException, self._handle_xfrin_response)
  245. def test_response_bad_message(self):
  246. self.conn.response_generator = self._create_broken_response_data
  247. self.conn._send_query(RRType.AXFR())
  248. self.assertRaises(Exception, self._handle_xfrin_response)
  249. def test_response(self):
  250. # normal case.
  251. self.conn.response_generator = self._create_normal_response_data
  252. self.conn._send_query(RRType.AXFR())
  253. # two SOAs, and only these have been transfered. the 2nd SOA is just
  254. # a marker, so only 1 RR has been provided in the iteration.
  255. self.assertEqual(self._handle_xfrin_response(), 1)
  256. def test_do_xfrin(self):
  257. self.conn.response_generator = self._create_normal_response_data
  258. self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
  259. def test_do_xfrin_empty_response(self):
  260. # skipping the creation of response data, so the transfer will fail.
  261. self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
  262. def test_do_xfrin_bad_response(self):
  263. self.conn.response_generator = self._create_broken_response_data
  264. self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
  265. def test_do_xfrin_dberror(self):
  266. # DB file is under a non existent directory, so its creation will fail,
  267. # which will make the transfer fail.
  268. self.conn._db_file = "not_existent/" + TEST_DB_FILE
  269. self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
  270. def test_do_soacheck_and_xfrin(self):
  271. self.conn.response_generator = self._create_soa_response_data
  272. self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
  273. def test_do_soacheck_broken_response(self):
  274. self.conn.response_generator = self._create_broken_response_data
  275. # XXX: TODO: this test failed here, should xfr not raise an
  276. # exception but simply drop and return FAIL?
  277. #self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
  278. self.assertRaises(MessageTooShort, self.conn.do_xfrin, True)
  279. def test_do_soacheck_badqid(self):
  280. # the QID mismatch would internally trigger a XfrinException exception,
  281. # and covers part of the code that other tests can't.
  282. self.soa_response_params['bad_qid'] = True
  283. self.conn.response_generator = self._create_soa_response_data
  284. self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
  285. def _handle_xfrin_response(self):
  286. # This helper methods iterates over all RRs (excluding the ending SOA)
  287. # transferred, and simply returns the number of RRs. The return value
  288. # may be used an assertion value for test cases.
  289. rrs = 0
  290. for rr in self.conn._handle_xfrin_response():
  291. rrs += 1
  292. return rrs
  293. def _create_normal_response_data(self):
  294. # This helper method creates a simple sequence of DNS messages that
  295. # forms a valid XFR transaction. It consists of two messages, each
  296. # containing just a single SOA RR.
  297. self.conn.reply_data = self.conn.create_response_data()
  298. self.conn.reply_data += self.conn.create_response_data()
  299. def _create_soa_response_data(self):
  300. # This helper method creates a DNS message that is supposed to be
  301. # used a valid response to SOA queries prior to XFR.
  302. # If axfr_after_soa is True, it resets the response_generator so that
  303. # a valid XFR messages will follow.
  304. self.conn.reply_data = self.conn.create_response_data(
  305. bad_qid=self.soa_response_params['bad_qid'],
  306. response=self.soa_response_params['response'],
  307. rcode=self.soa_response_params['rcode'],
  308. questions=self.soa_response_params['questions'])
  309. if self.soa_response_params['axfr_after_soa'] != None:
  310. self.conn.response_generator = self.soa_response_params['axfr_after_soa']
  311. def _create_broken_response_data(self):
  312. # This helper method creates a bogus "DNS message" that only contains
  313. # 4 octets of data. The DNS message parser will raise an exception.
  314. bogus_data = b'xxxx'
  315. self.conn.reply_data = struct.pack('H', socket.htons(len(bogus_data)))
  316. self.conn.reply_data += bogus_data
  317. class TestXfrinRecorder(unittest.TestCase):
  318. def setUp(self):
  319. self.recorder = XfrinRecorder()
  320. def test_increment(self):
  321. self.assertEqual(self.recorder.count(), 0)
  322. self.recorder.increment(TEST_ZONE_NAME)
  323. self.assertEqual(self.recorder.count(), 1)
  324. # duplicate "increment" should probably be rejected. but it's not
  325. # checked at this moment
  326. self.recorder.increment(TEST_ZONE_NAME)
  327. self.assertEqual(self.recorder.count(), 2)
  328. def test_decrement(self):
  329. self.assertEqual(self.recorder.count(), 0)
  330. self.recorder.increment(TEST_ZONE_NAME)
  331. self.assertEqual(self.recorder.count(), 1)
  332. self.recorder.decrement(TEST_ZONE_NAME)
  333. self.assertEqual(self.recorder.count(), 0)
  334. def test_decrement_from_empty(self):
  335. self.assertEqual(self.recorder.count(), 0)
  336. self.recorder.decrement(TEST_ZONE_NAME)
  337. self.assertEqual(self.recorder.count(), 0)
  338. def test_inprogress(self):
  339. self.assertEqual(self.recorder.count(), 0)
  340. self.recorder.increment(TEST_ZONE_NAME)
  341. self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), True)
  342. self.recorder.decrement(TEST_ZONE_NAME)
  343. self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), False)
  344. class TestXfrin(unittest.TestCase):
  345. def setUp(self):
  346. self.xfr = MockXfrin()
  347. self.args = {}
  348. self.args['zone_name'] = TEST_ZONE_NAME
  349. self.args['port'] = TEST_MASTER_PORT
  350. self.args['master'] = TEST_MASTER_IPV4_ADDRESS
  351. self.args['db_file'] = TEST_DB_FILE
  352. def tearDown(self):
  353. self.xfr.shutdown()
  354. def _do_parse_zone_name_class(self):
  355. return self.xfr._parse_zone_name_and_class(self.args)
  356. def _do_parse_master_port(self):
  357. return self.xfr._parse_master_and_port(self.args)
  358. def test_parse_cmd_params(self):
  359. name, rrclass = self._do_parse_zone_name_class()
  360. master_addrinfo = self._do_parse_master_port()
  361. db_file = self.args.get('db_file')
  362. self.assertEqual(master_addrinfo[4][1], int(TEST_MASTER_PORT))
  363. self.assertEqual(name, TEST_ZONE_NAME)
  364. self.assertEqual(rrclass, TEST_RRCLASS)
  365. self.assertEqual(master_addrinfo[4][0], TEST_MASTER_IPV4_ADDRESS)
  366. self.assertEqual(db_file, TEST_DB_FILE)
  367. def test_parse_cmd_params_default_port(self):
  368. del self.args['port']
  369. master_addrinfo = self._do_parse_master_port()
  370. self.assertEqual(master_addrinfo[4][1], 53)
  371. def test_parse_cmd_params_ip6master(self):
  372. self.args['master'] = TEST_MASTER_IPV6_ADDRESS
  373. master_addrinfo = self._do_parse_master_port()
  374. self.assertEqual(master_addrinfo[4][0], TEST_MASTER_IPV6_ADDRESS)
  375. def test_parse_cmd_params_chclass(self):
  376. self.args['zone_class'] = 'CH'
  377. self.assertEqual(self._do_parse_zone_name_class()[1], RRClass.CH())
  378. def test_parse_cmd_params_bogusclass(self):
  379. self.args['zone_class'] = 'XXX'
  380. self.assertRaises(XfrinException, self._do_parse_zone_name_class)
  381. def test_parse_cmd_params_nozone(self):
  382. # zone name is mandatory.
  383. del self.args['zone_name']
  384. self.assertRaises(XfrinException, self._do_parse_zone_name_class)
  385. def test_parse_cmd_params_nomaster(self):
  386. # master address is mandatory.
  387. del self.args['master']
  388. master_addrinfo = self._do_parse_master_port()
  389. self.assertEqual(master_addrinfo[4][0], DEFAULT_MASTER)
  390. def test_parse_cmd_params_bad_ip4(self):
  391. self.args['master'] = '3.3.3.3.3'
  392. self.assertRaises(XfrinException, self._do_parse_master_port)
  393. def test_parse_cmd_params_bad_ip6(self):
  394. self.args['master'] = '1::1::1'
  395. self.assertRaises(XfrinException, self._do_parse_master_port)
  396. def test_parse_cmd_params_bad_port(self):
  397. self.args['port'] = '-1'
  398. self.assertRaises(XfrinException, self._do_parse_master_port)
  399. self.args['port'] = '65536'
  400. self.assertRaises(XfrinException, self._do_parse_master_port)
  401. self.args['port'] = 'http'
  402. self.assertRaises(XfrinException, self._do_parse_master_port)
  403. def test_command_handler_shutdown(self):
  404. self.assertEqual(self.xfr.command_handler("shutdown",
  405. None)['result'][0], 0)
  406. # shutdown command doesn't expect an argument, but accepts it if any.
  407. self.assertEqual(self.xfr.command_handler("shutdown",
  408. "unused")['result'][0], 0)
  409. def test_command_handler_retransfer(self):
  410. self.assertEqual(self.xfr.command_handler("retransfer",
  411. self.args)['result'][0], 0)
  412. def test_command_handler_retransfer_badcommand(self):
  413. self.args['master'] = 'invalid'
  414. self.assertEqual(self.xfr.command_handler("retransfer",
  415. self.args)['result'][0], 1)
  416. def test_command_handler_retransfer_quota(self):
  417. for i in range(self.xfr._max_transfers_in - 1):
  418. self.xfr.recorder.increment(str(i) + TEST_ZONE_NAME)
  419. # there can be one more outstanding transfer.
  420. self.assertEqual(self.xfr.command_handler("retransfer",
  421. self.args)['result'][0], 0)
  422. # make sure the # xfrs would excceed the quota
  423. self.xfr.recorder.increment(str(self.xfr._max_transfers_in) + TEST_ZONE_NAME)
  424. # this one should fail
  425. self.assertEqual(self.xfr.command_handler("retransfer",
  426. self.args)['result'][0], 1)
  427. def test_command_handler_retransfer_inprogress(self):
  428. self.xfr.recorder.increment(TEST_ZONE_NAME)
  429. self.assertEqual(self.xfr.command_handler("retransfer",
  430. self.args)['result'][0], 1)
  431. def test_command_handler_retransfer_nomodule(self):
  432. dns_module = sys.modules['pydnspp'] # this must exist
  433. del sys.modules['pydnspp']
  434. self.assertEqual(self.xfr.command_handler("retransfer",
  435. self.args)['result'][0], 1)
  436. # sys.modules is global, so we must recover it
  437. sys.modules['pydnspp'] = dns_module
  438. def test_command_handler_refresh(self):
  439. # at this level, refresh is no different than retransfer.
  440. # just confirm the successful case with a different family of address.
  441. self.args['master'] = TEST_MASTER_IPV6_ADDRESS
  442. self.assertEqual(self.xfr.command_handler("refresh",
  443. self.args)['result'][0], 0)
  444. def test_command_handler_notify(self):
  445. # at this level, refresh is no different than retransfer.
  446. self.args['master'] = TEST_MASTER_IPV6_ADDRESS
  447. # ...but right now we disable the feature due to security concerns.
  448. self.assertEqual(self.xfr.command_handler("notify",
  449. self.args)['result'][0], 0)
  450. def test_command_handler_unknown(self):
  451. self.assertEqual(self.xfr.command_handler("xxx", None)['result'][0], 1)
  452. def test_command_handler_transfers_in(self):
  453. self.assertEqual(self.xfr.config_handler({})['result'][0], 0)
  454. self.assertEqual(self.xfr.config_handler({'transfers_in': 3})['result'][0], 0)
  455. self.assertEqual(self.xfr._max_transfers_in, 3)
  456. def test_command_handler_masters(self):
  457. master_info = {'master_addr': '1.1.1.1', 'master_port':53}
  458. self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 0)
  459. master_info = {'master_addr': '1111.1.1.1', 'master_port':53 }
  460. self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 1)
  461. master_info = {'master_addr': '2.2.2.2', 'master_port':530000 }
  462. self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 1)
  463. master_info = {'master_addr': '2.2.2.2', 'master_port':53 }
  464. self.xfr.config_handler(master_info)
  465. self.assertEqual(self.xfr._master_addr, '2.2.2.2')
  466. self.assertEqual(self.xfr._master_port, 53)
  467. def raise_interrupt():
  468. raise KeyboardInterrupt()
  469. def raise_ccerror():
  470. raise isc.cc.session.SessionError('test error')
  471. def raise_exception():
  472. raise Exception('test exception')
  473. class TestMain(unittest.TestCase):
  474. def setUp(self):
  475. MockXfrin.check_command_hook = None
  476. def tearDown(self):
  477. MockXfrin.check_command_hook = None
  478. def test_startup(self):
  479. main(MockXfrin, False)
  480. def test_startup_interrupt(self):
  481. MockXfrin.check_command_hook = raise_interrupt
  482. main(MockXfrin, False)
  483. def test_startup_ccerror(self):
  484. MockXfrin.check_command_hook = raise_ccerror
  485. main(MockXfrin, False)
  486. def test_startup_generalerror(self):
  487. MockXfrin.check_command_hook = raise_exception
  488. main(MockXfrin, False)
  489. if __name__== "__main__":
  490. try:
  491. unittest.main()
  492. except KeyboardInterrupt as e:
  493. print(e)