xfrin_test.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. # Copyright (C) 2009 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import unittest
  16. import socket
  17. from xfrin import *
  18. #
  19. # Commonly used (mostly constant) test parameters
  20. #
  21. TEST_ZONE_NAME = "example.com"
  22. TEST_RRCLASS = RRClass.IN()
  23. TEST_DB_FILE = 'db_file'
  24. TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
  25. TEST_MASTER_IPV4_ADDRINFO = (socket.AF_INET, socket.SOCK_STREAM,
  26. socket.IPPROTO_TCP, '',
  27. (TEST_MASTER_IPV4_ADDRESS, 53))
  28. TEST_MASTER_IPV6_ADDRESS = '::1'
  29. TEST_MASTER_IPV6_ADDRINFO = (socket.AF_INET6, socket.SOCK_STREAM,
  30. socket.IPPROTO_TCP, '',
  31. (TEST_MASTER_IPV6_ADDRESS, 53))
  32. # XXX: This should be a non priviledge port that is unlikely to be used.
  33. # If some other process uses this port test will fail.
  34. TEST_MASTER_PORT = '53535'
  35. TSIG_KEY = TSIGKey("example.com:SFuWd/q99SzF8Yzd1QbB9g==")
  36. soa_rdata = Rdata(RRType.SOA(), TEST_RRCLASS,
  37. 'master.example.com. admin.example.com ' +
  38. '1234 3600 1800 2419200 7200')
  39. soa_rrset = RRset(Name(TEST_ZONE_NAME), TEST_RRCLASS, RRType.SOA(),
  40. RRTTL(3600))
  41. soa_rrset.add_rdata(soa_rdata)
  42. example_axfr_question = Question(Name(TEST_ZONE_NAME), TEST_RRCLASS,
  43. RRType.AXFR())
  44. example_soa_question = Question(Name(TEST_ZONE_NAME), TEST_RRCLASS,
  45. RRType.SOA())
  46. default_questions = [example_axfr_question]
  47. default_answers = [soa_rrset]
  48. class XfrinTestException(Exception):
  49. pass
  50. def strip_mutable_tsig_data(data):
  51. # Unfortunately we cannot easily compare TSIG RR because we can't tweak
  52. # current time. As a work around this helper function strips off the time
  53. # dependent part of TSIG RDATA, i.e., the MAC (assuming HMAC-MD5) and
  54. # Time Signed.
  55. return data[0:-32] + data[-26:-22] + data[-6:]
  56. class MockXfrin(Xfrin):
  57. # This is a class attribute of a callable object that specifies a non
  58. # default behavior triggered in _cc_check_command(). Specific test methods
  59. # are expected to explicitly set this attribute before creating a
  60. # MockXfrin object (when it needs a non default behavior).
  61. # See the TestMain class.
  62. check_command_hook = None
  63. def _cc_setup(self):
  64. self._tsig_key_str = None
  65. pass
  66. def _get_db_file(self):
  67. pass
  68. def _cc_check_command(self):
  69. self._shutdown_event.set()
  70. if MockXfrin.check_command_hook:
  71. MockXfrin.check_command_hook()
  72. class MockXfrinConnection(XfrinConnection):
  73. def __init__(self, sock_map, zone_name, rrclass, db_file, shutdown_event,
  74. master_addr):
  75. super().__init__(sock_map, zone_name, rrclass, db_file, shutdown_event,
  76. master_addr)
  77. self.query_data = b''
  78. self.reply_data = b''
  79. self.force_time_out = False
  80. self.force_close = False
  81. self.qlen = None
  82. self.qid = None
  83. self.response_generator = None
  84. def _asyncore_loop(self):
  85. if self.force_close:
  86. self.handle_close()
  87. elif not self.force_time_out:
  88. self.handle_read()
  89. def connect_to_master(self):
  90. return True
  91. def recv(self, size):
  92. data = self.reply_data[:size]
  93. self.reply_data = self.reply_data[size:]
  94. if len(data) < size:
  95. raise XfrinTestException('cannot get reply data')
  96. return data
  97. def send(self, data):
  98. if self.qlen != None and len(self.query_data) >= self.qlen:
  99. # This is a new query. reset the internal state.
  100. self.qlen = None
  101. self.qid = None
  102. self.query_data = b''
  103. self.query_data += data
  104. # when the outgoing data is sufficiently large to contain the length
  105. # and the QID fields (4 octets or more), extract these fields.
  106. # The length will be reset the internal query data to support multiple
  107. # queries in a single test.
  108. # The QID will be used to construct a matching response.
  109. if len(self.query_data) >= 4 and self.qid == None:
  110. self.qlen = socket.htons(struct.unpack('H',
  111. self.query_data[0:2])[0])
  112. self.qid = socket.htons(struct.unpack('H', self.query_data[2:4])[0])
  113. # if the response generator method is specified, invoke it now.
  114. if self.response_generator != None:
  115. self.response_generator()
  116. return len(data)
  117. def create_response_data(self, response = True, bad_qid = False,
  118. rcode = Rcode.NOERROR(),
  119. questions = default_questions,
  120. answers = default_answers):
  121. resp = Message(Message.RENDER)
  122. qid = self.qid
  123. if bad_qid:
  124. qid += 1
  125. resp.set_qid(qid)
  126. resp.set_opcode(Opcode.QUERY())
  127. resp.set_rcode(rcode)
  128. if response:
  129. resp.set_header_flag(Message.HEADERFLAG_QR)
  130. [resp.add_question(q) for q in questions]
  131. [resp.add_rrset(Message.SECTION_ANSWER, a) for a in answers]
  132. renderer = MessageRenderer()
  133. resp.to_wire(renderer)
  134. reply_data = struct.pack('H', socket.htons(renderer.get_length()))
  135. reply_data += renderer.get_data()
  136. return reply_data
  137. class TestXfrinConnection(unittest.TestCase):
  138. def setUp(self):
  139. if os.path.exists(TEST_DB_FILE):
  140. os.remove(TEST_DB_FILE)
  141. self.sock_map = {}
  142. self.conn = MockXfrinConnection(self.sock_map, 'example.com.',
  143. TEST_RRCLASS, TEST_DB_FILE,
  144. threading.Event(),
  145. TEST_MASTER_IPV4_ADDRINFO)
  146. self.axfr_after_soa = False
  147. self.soa_response_params = {
  148. 'questions': [example_soa_question],
  149. 'bad_qid': False,
  150. 'response': True,
  151. 'rcode': Rcode.NOERROR(),
  152. 'axfr_after_soa': self._create_normal_response_data
  153. }
  154. def tearDown(self):
  155. self.conn.close()
  156. if os.path.exists(TEST_DB_FILE):
  157. os.remove(TEST_DB_FILE)
  158. def test_close(self):
  159. # we shouldn't be using the global asyncore map.
  160. self.assertEqual(len(asyncore.socket_map), 0)
  161. # there should be exactly one entry in our local map
  162. self.assertEqual(len(self.sock_map), 1)
  163. # once closing the dispatch the map should become empty
  164. self.conn.close()
  165. self.assertEqual(len(self.sock_map), 0)
  166. def test_init_ip6(self):
  167. # This test simply creates a new XfrinConnection object with an
  168. # IPv6 address, tries to bind it to an IPv6 wildcard address/port
  169. # to confirm an AF_INET6 socket has been created. A naive application
  170. # tends to assume it's IPv4 only and hardcode AF_INET. This test
  171. # uncovers such a bug.
  172. c = MockXfrinConnection({}, 'example.com.', TEST_RRCLASS, TEST_DB_FILE,
  173. threading.Event(),
  174. TEST_MASTER_IPV6_ADDRINFO)
  175. c.bind(('::', 0))
  176. c.close()
  177. def test_init_chclass(self):
  178. c = XfrinConnection({}, 'example.com.', RRClass.CH(), TEST_DB_FILE,
  179. threading.Event(), TEST_MASTER_IPV4_ADDRINFO)
  180. axfrmsg = c._create_query(RRType.AXFR())
  181. self.assertEqual(axfrmsg.get_question()[0].get_class(),
  182. RRClass.CH())
  183. c.close()
  184. def test_send_query(self):
  185. def create_msg(query_type):
  186. msg = Message(Message.RENDER)
  187. query_id = 0x1035
  188. msg.set_qid(query_id)
  189. msg.set_opcode(Opcode.QUERY())
  190. msg.set_rcode(Rcode.NOERROR())
  191. query_question = Question(Name("example.com."), RRClass.IN(), query_type)
  192. msg.add_question(query_question)
  193. return msg
  194. self.conn._create_query = create_msg
  195. # soa request
  196. self.conn._send_query(RRType.SOA())
  197. self.assertEqual(self.conn.query_data, b'\x00\x1d\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\x06\x00\x01')
  198. # axfr request
  199. self.conn._send_query(RRType.AXFR())
  200. self.assertEqual(self.conn.query_data, b'\x00\x1d\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
  201. # soa request with tsig
  202. self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
  203. self.conn._send_query(RRType.SOA())
  204. tsig_soa_data = strip_mutable_tsig_data(self.conn.query_data)
  205. self.assertEqual(tsig_soa_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\x06\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
  206. # axfr request with tsig
  207. self.conn._send_query(RRType.AXFR())
  208. tsig_axfr_data = strip_mutable_tsig_data(self.conn.query_data)
  209. self.assertEqual(tsig_axfr_data, b'\x00n\x105\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x07example\x03com\x00\x00\xfc\x00\x01\x07example\x03com\x00\x00\xfa\x00\xff\x00\x00\x00\x00\x00:\x08hmac-md5\x07sig-alg\x03reg\x03int\x00\x01,\x00\x10\x105\x00\x00\x00\x00')
  210. def test_response_with_invalid_msg(self):
  211. self.conn.reply_data = b'aaaxxxx'
  212. self.assertRaises(XfrinTestException, self._handle_xfrin_response)
  213. def test_response_with_tsig(self):
  214. self.conn._tsig_ctx = TSIGContext(TSIG_KEY)
  215. # server tsig check fail, return with RCODE 9 (NOTAUTH)
  216. self.conn._send_query(RRType.SOA())
  217. self.conn.reply_data = self.conn.create_response_data(rcode=Rcode.NOTAUTH())
  218. self.assertRaises(XfrinException, self._handle_xfrin_response)
  219. def test_response_without_end_soa(self):
  220. self.conn._send_query(RRType.AXFR())
  221. self.conn.reply_data = self.conn.create_response_data()
  222. self.assertRaises(XfrinTestException, self._handle_xfrin_response)
  223. def test_response_bad_qid(self):
  224. self.conn._send_query(RRType.AXFR())
  225. self.conn.reply_data = self.conn.create_response_data(bad_qid = True)
  226. self.assertRaises(XfrinException, self._handle_xfrin_response)
  227. def test_response_non_response(self):
  228. self.conn._send_query(RRType.AXFR())
  229. self.conn.reply_data = self.conn.create_response_data(response = False)
  230. self.assertRaises(XfrinException, self._handle_xfrin_response)
  231. def test_response_error_code(self):
  232. self.conn._send_query(RRType.AXFR())
  233. self.conn.reply_data = self.conn.create_response_data(
  234. rcode=Rcode.SERVFAIL())
  235. self.assertRaises(XfrinException, self._handle_xfrin_response)
  236. def test_response_multi_question(self):
  237. self.conn._send_query(RRType.AXFR())
  238. self.conn.reply_data = self.conn.create_response_data(
  239. questions=[example_axfr_question, example_axfr_question])
  240. self.assertRaises(XfrinException, self._handle_xfrin_response)
  241. def test_response_empty_answer(self):
  242. self.conn._send_query(RRType.AXFR())
  243. self.conn.reply_data = self.conn.create_response_data(answers=[])
  244. # Should an empty answer trigger an exception? Even though it's very
  245. # unusual it's not necessarily invalid. Need to revisit.
  246. self.assertRaises(XfrinException, self._handle_xfrin_response)
  247. def test_response_non_response(self):
  248. self.conn._send_query(RRType.AXFR())
  249. self.conn.reply_data = self.conn.create_response_data(response = False)
  250. self.assertRaises(XfrinException, self._handle_xfrin_response)
  251. def test_soacheck(self):
  252. # we need to defer the creation until we know the QID, which is
  253. # determined in _check_soa_serial(), so we use response_generator.
  254. self.conn.response_generator = self._create_soa_response_data
  255. self.assertEqual(self.conn._check_soa_serial(), XFRIN_OK)
  256. def test_soacheck_with_bad_response(self):
  257. self.conn.response_generator = self._create_broken_response_data
  258. self.assertRaises(MessageTooShort, self.conn._check_soa_serial)
  259. def test_soacheck_badqid(self):
  260. self.soa_response_params['bad_qid'] = True
  261. self.conn.response_generator = self._create_soa_response_data
  262. self.assertRaises(XfrinException, self.conn._check_soa_serial)
  263. def test_soacheck_non_response(self):
  264. self.soa_response_params['response'] = False
  265. self.conn.response_generator = self._create_soa_response_data
  266. self.assertRaises(XfrinException, self.conn._check_soa_serial)
  267. def test_soacheck_error_code(self):
  268. self.soa_response_params['rcode'] = Rcode.SERVFAIL()
  269. self.conn.response_generator = self._create_soa_response_data
  270. self.assertRaises(XfrinException, self.conn._check_soa_serial)
  271. def test_response_shutdown(self):
  272. self.conn.response_generator = self._create_normal_response_data
  273. self.conn._shutdown_event.set()
  274. self.conn._send_query(RRType.AXFR())
  275. self.assertRaises(XfrinException, self._handle_xfrin_response)
  276. def test_response_timeout(self):
  277. self.conn.response_generator = self._create_normal_response_data
  278. self.conn.force_time_out = True
  279. self.assertRaises(XfrinException, self._handle_xfrin_response)
  280. def test_response_remote_close(self):
  281. self.conn.response_generator = self._create_normal_response_data
  282. self.conn.force_close = True
  283. self.assertRaises(XfrinException, self._handle_xfrin_response)
  284. def test_response_bad_message(self):
  285. self.conn.response_generator = self._create_broken_response_data
  286. self.conn._send_query(RRType.AXFR())
  287. self.assertRaises(Exception, self._handle_xfrin_response)
  288. def test_response(self):
  289. # normal case.
  290. self.conn.response_generator = self._create_normal_response_data
  291. self.conn._send_query(RRType.AXFR())
  292. # two SOAs, and only these have been transfered. the 2nd SOA is just
  293. # a marker, so only 1 RR has been provided in the iteration.
  294. self.assertEqual(self._handle_xfrin_response(), 1)
  295. def test_do_xfrin(self):
  296. self.conn.response_generator = self._create_normal_response_data
  297. self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
  298. def test_do_xfrin_empty_response(self):
  299. # skipping the creation of response data, so the transfer will fail.
  300. self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
  301. def test_do_xfrin_bad_response(self):
  302. self.conn.response_generator = self._create_broken_response_data
  303. self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
  304. def test_do_xfrin_dberror(self):
  305. # DB file is under a non existent directory, so its creation will fail,
  306. # which will make the transfer fail.
  307. self.conn._db_file = "not_existent/" + TEST_DB_FILE
  308. self.assertEqual(self.conn.do_xfrin(False), XFRIN_FAIL)
  309. def test_do_soacheck_and_xfrin(self):
  310. self.conn.response_generator = self._create_soa_response_data
  311. self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
  312. def test_do_soacheck_broken_response(self):
  313. self.conn.response_generator = self._create_broken_response_data
  314. # XXX: TODO: this test failed here, should xfr not raise an
  315. # exception but simply drop and return FAIL?
  316. #self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
  317. self.assertRaises(MessageTooShort, self.conn.do_xfrin, True)
  318. def test_do_soacheck_badqid(self):
  319. # the QID mismatch would internally trigger a XfrinException exception,
  320. # and covers part of the code that other tests can't.
  321. self.soa_response_params['bad_qid'] = True
  322. self.conn.response_generator = self._create_soa_response_data
  323. self.assertEqual(self.conn.do_xfrin(True), XFRIN_FAIL)
  324. def _handle_xfrin_response(self):
  325. # This helper methods iterates over all RRs (excluding the ending SOA)
  326. # transferred, and simply returns the number of RRs. The return value
  327. # may be used an assertion value for test cases.
  328. rrs = 0
  329. for rr in self.conn._handle_xfrin_response():
  330. rrs += 1
  331. return rrs
  332. def _create_normal_response_data(self):
  333. # This helper method creates a simple sequence of DNS messages that
  334. # forms a valid XFR transaction. It consists of two messages, each
  335. # containing just a single SOA RR.
  336. self.conn.reply_data = self.conn.create_response_data()
  337. self.conn.reply_data += self.conn.create_response_data()
  338. def _create_soa_response_data(self):
  339. # This helper method creates a DNS message that is supposed to be
  340. # used a valid response to SOA queries prior to XFR.
  341. # If axfr_after_soa is True, it resets the response_generator so that
  342. # a valid XFR messages will follow.
  343. self.conn.reply_data = self.conn.create_response_data(
  344. bad_qid=self.soa_response_params['bad_qid'],
  345. response=self.soa_response_params['response'],
  346. rcode=self.soa_response_params['rcode'],
  347. questions=self.soa_response_params['questions'])
  348. if self.soa_response_params['axfr_after_soa'] != None:
  349. self.conn.response_generator = self.soa_response_params['axfr_after_soa']
  350. def _create_broken_response_data(self):
  351. # This helper method creates a bogus "DNS message" that only contains
  352. # 4 octets of data. The DNS message parser will raise an exception.
  353. bogus_data = b'xxxx'
  354. self.conn.reply_data = struct.pack('H', socket.htons(len(bogus_data)))
  355. self.conn.reply_data += bogus_data
  356. class TestXfrinRecorder(unittest.TestCase):
  357. def setUp(self):
  358. self.recorder = XfrinRecorder()
  359. def test_increment(self):
  360. self.assertEqual(self.recorder.count(), 0)
  361. self.recorder.increment(TEST_ZONE_NAME)
  362. self.assertEqual(self.recorder.count(), 1)
  363. # duplicate "increment" should probably be rejected. but it's not
  364. # checked at this moment
  365. self.recorder.increment(TEST_ZONE_NAME)
  366. self.assertEqual(self.recorder.count(), 2)
  367. def test_decrement(self):
  368. self.assertEqual(self.recorder.count(), 0)
  369. self.recorder.increment(TEST_ZONE_NAME)
  370. self.assertEqual(self.recorder.count(), 1)
  371. self.recorder.decrement(TEST_ZONE_NAME)
  372. self.assertEqual(self.recorder.count(), 0)
  373. def test_decrement_from_empty(self):
  374. self.assertEqual(self.recorder.count(), 0)
  375. self.recorder.decrement(TEST_ZONE_NAME)
  376. self.assertEqual(self.recorder.count(), 0)
  377. def test_inprogress(self):
  378. self.assertEqual(self.recorder.count(), 0)
  379. self.recorder.increment(TEST_ZONE_NAME)
  380. self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), True)
  381. self.recorder.decrement(TEST_ZONE_NAME)
  382. self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), False)
  383. class TestXfrin(unittest.TestCase):
  384. def setUp(self):
  385. # redirect output
  386. self.stderr_backup = sys.stderr
  387. sys.stderr = open(os.devnull, 'w')
  388. self.xfr = MockXfrin()
  389. self.args = {}
  390. self.args['zone_name'] = TEST_ZONE_NAME
  391. self.args['port'] = TEST_MASTER_PORT
  392. self.args['master'] = TEST_MASTER_IPV4_ADDRESS
  393. self.args['db_file'] = TEST_DB_FILE
  394. self.args['tsig_key'] = ''
  395. def tearDown(self):
  396. self.xfr.shutdown()
  397. sys.stderr= self.stderr_backup
  398. def _do_parse_zone_name_class(self):
  399. return self.xfr._parse_zone_name_and_class(self.args)
  400. def _do_parse_master_port(self):
  401. return self.xfr._parse_master_and_port(self.args)
  402. def test_parse_cmd_params(self):
  403. name, rrclass = self._do_parse_zone_name_class()
  404. master_addrinfo = self._do_parse_master_port()
  405. db_file = self.args.get('db_file')
  406. self.assertEqual(master_addrinfo[2][1], int(TEST_MASTER_PORT))
  407. self.assertEqual(name, TEST_ZONE_NAME)
  408. self.assertEqual(rrclass, TEST_RRCLASS)
  409. self.assertEqual(master_addrinfo[2][0], TEST_MASTER_IPV4_ADDRESS)
  410. self.assertEqual(db_file, TEST_DB_FILE)
  411. def test_parse_cmd_params_default_port(self):
  412. del self.args['port']
  413. master_addrinfo = self._do_parse_master_port()
  414. self.assertEqual(master_addrinfo[2][1], 53)
  415. def test_parse_cmd_params_ip6master(self):
  416. self.args['master'] = TEST_MASTER_IPV6_ADDRESS
  417. master_addrinfo = self._do_parse_master_port()
  418. self.assertEqual(master_addrinfo[2][0], TEST_MASTER_IPV6_ADDRESS)
  419. def test_parse_cmd_params_chclass(self):
  420. self.args['zone_class'] = 'CH'
  421. self.assertEqual(self._do_parse_zone_name_class()[1], RRClass.CH())
  422. def test_parse_cmd_params_bogusclass(self):
  423. self.args['zone_class'] = 'XXX'
  424. self.assertRaises(XfrinException, self._do_parse_zone_name_class)
  425. def test_parse_cmd_params_nozone(self):
  426. # zone name is mandatory.
  427. del self.args['zone_name']
  428. self.assertRaises(XfrinException, self._do_parse_zone_name_class)
  429. def test_parse_cmd_params_nomaster(self):
  430. # master address is mandatory.
  431. del self.args['master']
  432. master_addrinfo = self._do_parse_master_port()
  433. self.assertEqual(master_addrinfo[2][0], DEFAULT_MASTER)
  434. def test_parse_cmd_params_bad_ip4(self):
  435. self.args['master'] = '3.3.3.3.3'
  436. self.assertRaises(XfrinException, self._do_parse_master_port)
  437. def test_parse_cmd_params_bad_ip6(self):
  438. self.args['master'] = '1::1::1'
  439. self.assertRaises(XfrinException, self._do_parse_master_port)
  440. def test_parse_cmd_params_bad_port(self):
  441. self.args['port'] = '-1'
  442. self.assertRaises(XfrinException, self._do_parse_master_port)
  443. self.args['port'] = '65536'
  444. self.assertRaises(XfrinException, self._do_parse_master_port)
  445. self.args['port'] = 'http'
  446. self.assertRaises(XfrinException, self._do_parse_master_port)
  447. def test_command_handler_shutdown(self):
  448. self.assertEqual(self.xfr.command_handler("shutdown",
  449. None)['result'][0], 0)
  450. # shutdown command doesn't expect an argument, but accepts it if any.
  451. self.assertEqual(self.xfr.command_handler("shutdown",
  452. "unused")['result'][0], 0)
  453. def test_command_handler_retransfer(self):
  454. self.assertEqual(self.xfr.command_handler("retransfer",
  455. self.args)['result'][0], 0)
  456. def test_command_handler_retransfer_badcommand(self):
  457. self.args['master'] = 'invalid'
  458. self.assertEqual(self.xfr.command_handler("retransfer",
  459. self.args)['result'][0], 1)
  460. def test_command_handler_retransfer_quota(self):
  461. for i in range(self.xfr._max_transfers_in - 1):
  462. self.xfr.recorder.increment(str(i) + TEST_ZONE_NAME)
  463. # there can be one more outstanding transfer.
  464. self.assertEqual(self.xfr.command_handler("retransfer",
  465. self.args)['result'][0], 0)
  466. # make sure the # xfrs would excceed the quota
  467. self.xfr.recorder.increment(str(self.xfr._max_transfers_in) + TEST_ZONE_NAME)
  468. # this one should fail
  469. self.assertEqual(self.xfr.command_handler("retransfer",
  470. self.args)['result'][0], 1)
  471. def test_command_handler_retransfer_inprogress(self):
  472. self.xfr.recorder.increment(TEST_ZONE_NAME)
  473. self.assertEqual(self.xfr.command_handler("retransfer",
  474. self.args)['result'][0], 1)
  475. def test_command_handler_retransfer_nomodule(self):
  476. dns_module = sys.modules['pydnspp'] # this must exist
  477. del sys.modules['pydnspp']
  478. self.assertEqual(self.xfr.command_handler("retransfer",
  479. self.args)['result'][0], 1)
  480. # sys.modules is global, so we must recover it
  481. sys.modules['pydnspp'] = dns_module
  482. def test_command_handler_refresh(self):
  483. # at this level, refresh is no different than retransfer.
  484. # just confirm the successful case with a different family of address.
  485. self.args['master'] = TEST_MASTER_IPV6_ADDRESS
  486. self.assertEqual(self.xfr.command_handler("refresh",
  487. self.args)['result'][0], 0)
  488. def test_command_handler_notify(self):
  489. # at this level, refresh is no different than retransfer.
  490. self.args['master'] = TEST_MASTER_IPV6_ADDRESS
  491. # ...but the zone is unknown so this would return an error
  492. self.assertEqual(self.xfr.command_handler("notify",
  493. self.args)['result'][0], 1)
  494. def test_command_handler_unknown(self):
  495. self.assertEqual(self.xfr.command_handler("xxx", None)['result'][0], 1)
  496. def test_command_handler_transfers_in(self):
  497. self.assertEqual(self.xfr.config_handler({})['result'][0], 0)
  498. self.assertEqual(self.xfr.config_handler({'transfers_in': 3})['result'][0], 0)
  499. self.assertEqual(self.xfr._max_transfers_in, 3)
  500. def test_command_handler_zones(self):
  501. zones = { 'zones': [
  502. { 'name': 'test.com',
  503. 'master_addr': '1.1.1.1',
  504. 'master_port': 53
  505. }
  506. ]}
  507. self.assertEqual(self.xfr.config_handler(zones)['result'][0], 0)
  508. zones = { 'zones': [
  509. { 'master_addr': '1.1.1.1',
  510. 'master_port': 53
  511. }
  512. ]}
  513. self.assertEqual(self.xfr.config_handler(zones)['result'][0], 1)
  514. def raise_interrupt():
  515. raise KeyboardInterrupt()
  516. def raise_ccerror():
  517. raise isc.cc.session.SessionError('test error')
  518. def raise_exception():
  519. raise Exception('test exception')
  520. class TestMain(unittest.TestCase):
  521. def setUp(self):
  522. MockXfrin.check_command_hook = None
  523. def tearDown(self):
  524. MockXfrin.check_command_hook = None
  525. def test_startup(self):
  526. main(MockXfrin, False)
  527. def test_startup_interrupt(self):
  528. MockXfrin.check_command_hook = raise_interrupt
  529. main(MockXfrin, False)
  530. def test_startup_ccerror(self):
  531. MockXfrin.check_command_hook = raise_ccerror
  532. main(MockXfrin, False)
  533. def test_startup_generalerror(self):
  534. MockXfrin.check_command_hook = raise_exception
  535. main(MockXfrin, False)
  536. if __name__== "__main__":
  537. try:
  538. unittest.main()
  539. except KeyboardInterrupt as e:
  540. print(e)