dns_test.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. # Copyright (C) 2011 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import unittest
  16. import socket
  17. from pydnspp import *
  18. from isc.acl.acl import LoaderError, Error, ACCEPT, REJECT, DROP
  19. from isc.acl.dns import *
  20. def get_sockaddr(address, port):
  21. '''This is a simple shortcut wrapper for getaddrinfo'''
  22. ai = socket.getaddrinfo(address, port, 0, socket.SOCK_DGRAM,
  23. socket.IPPROTO_UDP, socket.AI_NUMERICHOST)[0]
  24. return ai[4]
  25. def get_acl(prefix):
  26. '''This is a simple shortcut for creating an ACL containing single rule
  27. that accepts addresses for the given IP prefix (and reject any others
  28. by default)
  29. '''
  30. return REQUEST_LOADER.load('[{"action": "ACCEPT", "from": "' + \
  31. prefix + '"}]')
  32. def get_acl_json(prefix):
  33. '''Same as get_acl, but this function passes a Python representation of
  34. JSON to the loader, not a string.'''
  35. json = [{"action": "ACCEPT"}]
  36. json[0]["from"] = prefix
  37. return REQUEST_LOADER.load(json)
  38. # The following two are similar to the previous two, but use a TSIG key name
  39. # instead of IP prefix.
  40. def get_tsig_acl(key):
  41. return REQUEST_LOADER.load('[{"action": "ACCEPT", "key": "' + \
  42. key + '"}]')
  43. def get_tsig_acl_json(key):
  44. json = [{"action": "ACCEPT"}]
  45. json[0]["key"] = key
  46. return REQUEST_LOADER.load(json)
  47. # commonly used TSIG RDATA. For the purpose of ACL checks only the key name
  48. # matters; other parrameters are simply borrowed from some other tests, which
  49. # can be anything for the purpose of the tests here.
  50. TSIG_RDATA = TSIG("hmac-md5.sig-alg.reg.int. 1302890362 " + \
  51. "300 16 2tra2tra2tra2tra2tra2g== " + \
  52. "11621 0 0")
  53. def get_context(address, key_name=None):
  54. '''This is a simple shortcut wrapper for creating a RequestContext
  55. object with a given IP address and optionally TSIG key name.
  56. Port number doesn't matter in the test (as of the initial implementation),
  57. so it's fixed for simplicity.
  58. If key_name is not None, it internally creates a (faked) TSIG record
  59. and constructs a context with that key. Note that only the key name
  60. matters for the purpose of ACL checks.
  61. '''
  62. tsig_record = None
  63. if key_name is not None:
  64. tsig_record = TSIGRecord(Name(key_name), TSIG_RDATA)
  65. return RequestContext(get_sockaddr(address, 53000), tsig_record)
  66. # These are commonly used RequestContext object
  67. CONTEXT4 = get_context('192.0.2.1')
  68. CONTEXT6 = get_context('2001:db8::1')
  69. class RequestContextTest(unittest.TestCase):
  70. def test_construct(self):
  71. # Construct the context from IPv4/IPv6 addresses, check the object
  72. # by printing it.
  73. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  74. 'remote_addr=[192.0.2.1]:53001>',
  75. RequestContext(('192.0.2.1', 53001)).__str__())
  76. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  77. 'remote_addr=[2001:db8::1234]:53006>',
  78. RequestContext(('2001:db8::1234', 53006,
  79. 0, 0)).__str__())
  80. # Construct the context from IP address and a TSIG record.
  81. tsig_record = TSIGRecord(Name("key.example.com"), TSIG_RDATA)
  82. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  83. 'remote_addr=[192.0.2.1]:53001, ' + \
  84. 'key=key.example.com.>',
  85. RequestContext(('192.0.2.1', 53001),
  86. tsig_record).__str__())
  87. # same with IPv6 address, just in case.
  88. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  89. 'remote_addr=[2001:db8::1234]:53006, ' + \
  90. 'key=key.example.com.>',
  91. RequestContext(('2001:db8::1234', 53006,
  92. 0, 0), tsig_record).__str__())
  93. # Unusual case: port number overflows (this constructor allows that,
  94. # although it should be rare anyway; the socket address should
  95. # normally come from the Python socket module.
  96. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  97. 'remote_addr=[192.0.2.1]:0>',
  98. RequestContext(('192.0.2.1', 65536)).__str__())
  99. # same test using socket.getaddrinfo() to ensure it accepts the sock
  100. # address representation used in the Python socket module.
  101. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  102. 'remote_addr=[192.0.2.1]:53001>',
  103. RequestContext(get_sockaddr('192.0.2.1',
  104. 53001)).__str__())
  105. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  106. 'remote_addr=[2001:db8::1234]:53006>',
  107. RequestContext(get_sockaddr('2001:db8::1234',
  108. 53006)).__str__())
  109. #
  110. # Invalid parameters (in our expected usage this should not happen
  111. # because the sockaddr would come from the Python socket module, but
  112. # validation should still be performed correctly)
  113. #
  114. # not a tuple
  115. self.assertRaises(TypeError, RequestContext, 1)
  116. # invalid number of parameters
  117. self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 53), 0, 1)
  118. # type error for TSIG
  119. self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 53), tsig=1)
  120. # tuple is not in the form of sockaddr
  121. self.assertRaises(TypeError, RequestContext, (0, 53))
  122. self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 'http'))
  123. self.assertRaises(TypeError, RequestContext, ('::', 0, 'flow', 0))
  124. # invalid address
  125. self.assertRaises(Error, RequestContext, ('example.com', 5300))
  126. self.assertRaises(Error, RequestContext, ('192.0.2.1.1', 5300))
  127. self.assertRaises(Error, RequestContext, ('2001:db8:::1', 5300))
  128. class RequestACLTest(unittest.TestCase):
  129. def test_direct_construct(self):
  130. self.assertRaises(Error, RequestACL)
  131. def test_request_loader(self):
  132. # these shouldn't raise an exception
  133. REQUEST_LOADER.load('[{"action": "DROP"}]')
  134. REQUEST_LOADER.load([{"action": "DROP"}])
  135. REQUEST_LOADER.load('[{"action": "DROP", "from": "192.0.2.1"}]')
  136. REQUEST_LOADER.load([{"action": "DROP", "from": "192.0.2.1"}])
  137. # Invalid types (note that arguments like '1' or '[]' is of valid
  138. # 'type' (but syntax error at a higher level)). So we need to use
  139. # something that is not really JSON nor string.
  140. self.assertRaises(TypeError, REQUEST_LOADER.load, b'')
  141. # Incorrect number of arguments
  142. self.assertRaises(TypeError, REQUEST_LOADER.load,
  143. '[{"action": "DROP"}]', 0)
  144. def test_bad_acl_syntax(self):
  145. # the following are derived from loader_test.cc
  146. self.assertRaises(LoaderError, REQUEST_LOADER.load, '{}');
  147. self.assertRaises(LoaderError, REQUEST_LOADER.load, {});
  148. self.assertRaises(LoaderError, REQUEST_LOADER.load, '42');
  149. self.assertRaises(LoaderError, REQUEST_LOADER.load, 42);
  150. self.assertRaises(LoaderError, REQUEST_LOADER.load, 'true');
  151. self.assertRaises(LoaderError, REQUEST_LOADER.load, True);
  152. self.assertRaises(LoaderError, REQUEST_LOADER.load, 'null');
  153. self.assertRaises(LoaderError, REQUEST_LOADER.load, None);
  154. self.assertRaises(LoaderError, REQUEST_LOADER.load, '"hello"');
  155. self.assertRaises(LoaderError, REQUEST_LOADER.load, "hello");
  156. self.assertRaises(LoaderError, REQUEST_LOADER.load, '[42]');
  157. self.assertRaises(LoaderError, REQUEST_LOADER.load, [42]);
  158. self.assertRaises(LoaderError, REQUEST_LOADER.load, '["hello"]');
  159. self.assertRaises(LoaderError, REQUEST_LOADER.load, ["hello"]);
  160. self.assertRaises(LoaderError, REQUEST_LOADER.load, '[[]]');
  161. self.assertRaises(LoaderError, REQUEST_LOADER.load, [[]]);
  162. self.assertRaises(LoaderError, REQUEST_LOADER.load, '[true]');
  163. self.assertRaises(LoaderError, REQUEST_LOADER.load, [True]);
  164. self.assertRaises(LoaderError, REQUEST_LOADER.load, '[null]');
  165. self.assertRaises(LoaderError, REQUEST_LOADER.load, [None]);
  166. self.assertRaises(LoaderError, REQUEST_LOADER.load, '[{}]');
  167. self.assertRaises(LoaderError, REQUEST_LOADER.load, [{}]);
  168. # the following are derived from dns_test.cc
  169. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  170. '[{"action": "ACCEPT", "bad": "192.0.2.1"}]')
  171. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  172. [{"action": "ACCEPT", "bad": "192.0.2.1"}])
  173. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  174. '[{"action": "ACCEPT", "from": 4}]')
  175. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  176. [{"action": "ACCEPT", "from": 4}])
  177. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  178. '[{"action": "ACCEPT", "key": 1}]')
  179. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  180. [{"action": "ACCEPT", "key": 1}])
  181. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  182. '[{"action": "ACCEPT", "key": {}}]')
  183. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  184. [{"action": "ACCEPT", "key": {}}])
  185. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  186. '[{"action": "ACCEPT", "from": "bad"}]')
  187. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  188. [{"action": "ACCEPT", "from": "bad"}])
  189. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  190. [{"action": "ACCEPT", "key": "bad..name"}])
  191. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  192. [{"action": "ACCEPT", "key": "bad..name"}])
  193. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  194. '[{"action": "ACCEPT", "from": null}]')
  195. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  196. [{"action": "ACCEPT", "from": None}])
  197. def test_bad_acl_ipsyntax(self):
  198. # this test is derived from ip_check_unittest.cc
  199. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  200. '[{"action": "DROP", "from": "192.0.2.43/-1"}]')
  201. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  202. [{"action": "DROP", "from": "192.0.2.43/-1"}])
  203. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  204. '[{"action": "DROP", "from": "192.0.2.43//1"}]')
  205. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  206. [{"action": "DROP", "from": "192.0.2.43//1"}])
  207. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  208. '[{"action": "DROP", "from": "192.0.2.43/1/"}]')
  209. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  210. [{"action": "DROP", "from": "192.0.2.43/1/"}])
  211. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  212. '[{"action": "DROP", "from": "/192.0.2.43/1"}]')
  213. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  214. [{"action": "DROP", "from": "/192.0.2.43/1"}])
  215. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  216. '[{"action": "DROP", "from": "2001:db8::/xxxx"}]')
  217. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  218. [{"action": "DROP", "from": "2001:db8::/xxxx"}])
  219. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  220. '[{"action": "DROP", "from": "2001:db8::/32/s"}]')
  221. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  222. [{"action": "DROP", "from": "2001:db8::/32/s"}])
  223. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  224. '[{"action": "DROP", "from": "1/"}]')
  225. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  226. [{"action": "DROP", "from": "1/"}])
  227. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  228. '[{"action": "DROP", "from": "/1"}]')
  229. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  230. [{"action": "DROP", "from": "/1"}])
  231. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  232. '[{"action": "DROP", "from": "192.0.2.0/33"}]')
  233. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  234. [{"action": "DROP", "from": "192.0.2.0/33"}])
  235. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  236. '[{"action": "DROP", "from": "::1/129"}]')
  237. self.assertRaises(LoaderError, REQUEST_LOADER.load,
  238. [{"action": "DROP", "from": "::1/129"}])
  239. def test_execute(self):
  240. # tests derived from dns_test.cc. We don't directly expose checks
  241. # in the python wrapper, so we test it via execute().
  242. self.assertEqual(ACCEPT, get_acl('192.0.2.1').execute(CONTEXT4))
  243. self.assertEqual(ACCEPT, get_acl_json('192.0.2.1').execute(CONTEXT4))
  244. self.assertEqual(REJECT, get_acl('192.0.2.53').execute(CONTEXT4))
  245. self.assertEqual(REJECT, get_acl_json('192.0.2.53').execute(CONTEXT4))
  246. self.assertEqual(ACCEPT, get_acl('192.0.2.0/24').execute(CONTEXT4))
  247. self.assertEqual(ACCEPT, get_acl_json('192.0.2.0/24').execute(CONTEXT4))
  248. self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
  249. self.assertEqual(REJECT, get_acl_json('192.0.1.0/24').execute(CONTEXT4))
  250. self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
  251. self.assertEqual(REJECT, get_acl_json('192.0.1.0/24').execute(CONTEXT4))
  252. self.assertEqual(ACCEPT, get_acl('2001:db8::1').execute(CONTEXT6))
  253. self.assertEqual(ACCEPT, get_acl_json('2001:db8::1').execute(CONTEXT6))
  254. self.assertEqual(REJECT, get_acl('2001:db8::53').execute(CONTEXT6))
  255. self.assertEqual(REJECT, get_acl_json('2001:db8::53').execute(CONTEXT6))
  256. self.assertEqual(ACCEPT, get_acl('2001:db8::/64').execute(CONTEXT6))
  257. self.assertEqual(ACCEPT,
  258. get_acl_json('2001:db8::/64').execute(CONTEXT6))
  259. self.assertEqual(REJECT, get_acl('2001:db8:1::/64').execute(CONTEXT6))
  260. self.assertEqual(REJECT,
  261. get_acl_json('2001:db8:1::/64').execute(CONTEXT6))
  262. self.assertEqual(REJECT, get_acl('32.1.13.184').execute(CONTEXT6))
  263. self.assertEqual(REJECT, get_acl_json('32.1.13.184').execute(CONTEXT6))
  264. # TSIG checks, derived from dns_test.cc
  265. self.assertEqual(ACCEPT, get_tsig_acl('key.example.com').\
  266. execute(get_context('192.0.2.1',
  267. 'key.example.com')))
  268. self.assertEqual(REJECT, get_tsig_acl_json('key.example.com').\
  269. execute(get_context('192.0.2.1',
  270. 'badkey.example.com')))
  271. self.assertEqual(ACCEPT, get_tsig_acl('key.example.com').\
  272. execute(get_context('2001:db8::1',
  273. 'key.example.com')))
  274. self.assertEqual(REJECT, get_tsig_acl_json('key.example.com').\
  275. execute(get_context('2001:db8::1',
  276. 'badkey.example.com')))
  277. self.assertEqual(REJECT, get_tsig_acl('key.example.com').\
  278. execute(CONTEXT4))
  279. self.assertEqual(REJECT, get_tsig_acl_json('key.example.com').\
  280. execute(CONTEXT4))
  281. self.assertEqual(REJECT, get_tsig_acl('key.example.com').\
  282. execute(CONTEXT6))
  283. self.assertEqual(REJECT, get_tsig_acl_json('key.example.com').\
  284. execute(CONTEXT6))
  285. # A bit more complicated example, derived from resolver_config_unittest
  286. acl = REQUEST_LOADER.load('[ {"action": "ACCEPT", ' +
  287. ' "from": "192.0.2.1"},' +
  288. ' {"action": "REJECT",' +
  289. ' "from": "192.0.2.0/24"},' +
  290. ' {"action": "DROP",' +
  291. ' "from": "2001:db8::1"},' +
  292. ']')
  293. self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
  294. self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
  295. self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
  296. self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
  297. # same test using the JSON representation
  298. acl = REQUEST_LOADER.load([{"action": "ACCEPT", "from": "192.0.2.1"},
  299. {"action": "REJECT",
  300. "from": "192.0.2.0/24"},
  301. {"action": "DROP", "from": "2001:db8::1"}])
  302. self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
  303. self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
  304. self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
  305. self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
  306. def test_bad_execute(self):
  307. acl = get_acl('192.0.2.1')
  308. # missing parameter
  309. self.assertRaises(TypeError, acl.execute)
  310. # too many parameters
  311. self.assertRaises(TypeError, acl.execute, get_context('192.0.2.2'), 0)
  312. # type mismatch
  313. self.assertRaises(TypeError, acl.execute, 'bad parameter')
  314. class RequestLoaderTest(unittest.TestCase):
  315. # Note: loading ACLs is tested in other test cases.
  316. def test_construct(self):
  317. # at least for now, we don't allow direct construction.
  318. self.assertRaises(Error, RequestLoader)
  319. if __name__ == '__main__':
  320. unittest.main()