dns_test.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. # Copyright (C) 2011 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import unittest
  16. import socket
  17. from isc.acl.acl import LoaderError, Error, ACCEPT, REJECT, DROP
  18. from isc.acl.dns import *
  19. def get_sockaddr(address, port):
  20. '''This is a simple shortcut wrapper for getaddrinfo'''
  21. ai = socket.getaddrinfo(address, port, 0, 0, 0, socket.AI_NUMERICHOST)[0]
  22. return ai[4]
  23. def get_acl(prefix):
  24. '''This is a simple shortcut for creating an ACL containing single rule
  25. that accepts addresses for the given IP prefix (and reject any others
  26. by default)
  27. '''
  28. return load_request_acl('[{"action": "ACCEPT", "from": "' + prefix + '"}]')
  29. def get_context(address):
  30. '''This is a simple shortcut wrapper for creating a RequestContext
  31. object with a given IP address. Port number doesn't matter in the test
  32. (as of the initial implementation), so it's fixed for simplicity.
  33. '''
  34. return RequestContext(get_sockaddr(address, 53000))
  35. # These are commonly used RequestContext object
  36. CONTEXT4 = get_context('192.0.2.1')
  37. CONTEXT6 = get_context('2001:db8::1')
  38. class RequestContextTest(unittest.TestCase):
  39. def test_construct(self):
  40. # Construct the context from IPv4/IPv6 addresses, check the object
  41. # by printing it.
  42. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  43. 'remote_addr=[192.0.2.1]:53001>',
  44. RequestContext(('192.0.2.1', 53001)).__str__())
  45. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  46. 'remote_addr=[2001:db8::1234]:53006>',
  47. RequestContext(('2001:db8::1234', 53006,
  48. 0, 0)).__str__())
  49. # Unusual case: port number overflows (this constructor allows that,
  50. # although it should be rare anyway; the socket address should
  51. # normally come from the Python socket module.
  52. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  53. 'remote_addr=[192.0.2.1]:0>',
  54. RequestContext(('192.0.2.1', 65536)).__str__())
  55. # same test using socket.getaddrinfo() to ensure it accepts the sock
  56. # address representation used in the Python socket module.
  57. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  58. 'remote_addr=[192.0.2.1]:53001>',
  59. RequestContext(get_sockaddr('192.0.2.1',
  60. 53001)).__str__())
  61. self.assertEqual('<isc.acl.dns.RequestContext object, ' + \
  62. 'remote_addr=[2001:db8::1234]:53006>',
  63. RequestContext(get_sockaddr('2001:db8::1234',
  64. 53006)).__str__())
  65. #
  66. # Invalid parameters (in our expected usage this should not happen
  67. # because the sockaddr would come from the Python socket module, but
  68. # validation should still be performed correctly)
  69. #
  70. # not a tuple
  71. self.assertRaises(TypeError, RequestContext, 1)
  72. # invalid number of parameters
  73. self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 53), 0)
  74. # tuple is not in the form of sockaddr
  75. self.assertRaises(TypeError, RequestContext, (0, 53))
  76. self.assertRaises(TypeError, RequestContext, ('192.0.2.1', 'http'))
  77. self.assertRaises(TypeError, RequestContext, ('::', 0, 'flow', 0))
  78. # invalid address
  79. self.assertRaises(Error, RequestContext, ('example.com', 5300))
  80. self.assertRaises(Error, RequestContext, ('192.0.2.1.1', 5300))
  81. self.assertRaises(Error, RequestContext, ('2001:db8:::1', 5300))
  82. class RequestACLTest(unittest.TestCase):
  83. def test_direct_construct(self):
  84. self.assertRaises(TypeError, RequestACL)
  85. def test_request_loader(self):
  86. # these shouldn't raise an exception
  87. load_request_acl('[{"action": "DROP"}]')
  88. load_request_acl('[{"action": "DROP", "from": "192.0.2.1"}]')
  89. # Invalid types
  90. self.assertRaises(TypeError, load_request_acl, 1)
  91. self.assertRaises(TypeError, load_request_acl, [])
  92. # Incorrect number of arguments
  93. self.assertRaises(TypeError, load_request_acl,
  94. '[{"action": "DROP"}]', 0)
  95. def test_bad_acl_syntax(self):
  96. # the following are derived from loader_test.cc
  97. self.assertRaises(LoaderError, load_request_acl, '{}');
  98. self.assertRaises(LoaderError, load_request_acl, '42');
  99. self.assertRaises(LoaderError, load_request_acl, 'true');
  100. self.assertRaises(LoaderError, load_request_acl, 'null');
  101. self.assertRaises(LoaderError, load_request_acl, '"hello"');
  102. self.assertRaises(LoaderError, load_request_acl, '[42]');
  103. self.assertRaises(LoaderError, load_request_acl, '["hello"]');
  104. self.assertRaises(LoaderError, load_request_acl, '[[]]');
  105. self.assertRaises(LoaderError, load_request_acl, '[true]');
  106. self.assertRaises(LoaderError, load_request_acl, '[null]');
  107. self.assertRaises(LoaderError, load_request_acl, '[{}]');
  108. # the following are derived from dns_test.cc
  109. self.assertRaises(LoaderError, load_request_acl,
  110. '[{"action": "ACCEPT", "bad": "192.0.2.1"}]')
  111. self.assertRaises(LoaderError, load_request_acl,
  112. '[{"action": "ACCEPT", "from": 4}]')
  113. self.assertRaises(LoaderError, load_request_acl,
  114. '[{"action": "ACCEPT", "from": []}]')
  115. self.assertRaises(LoaderError, load_request_acl,
  116. '[{"action": "ACCEPT", "from": "bad"}]')
  117. self.assertRaises(LoaderError, load_request_acl,
  118. '[{"action": "ACCEPT", "from": null}]')
  119. def test_bad_acl_ipsyntax(self):
  120. # this test is derived from ip_check_unittest.cc
  121. self.assertRaises(LoaderError, load_request_acl,
  122. '[{"action": "DROP", "from": "192.0.2.43/-1"}]')
  123. self.assertRaises(LoaderError, load_request_acl,
  124. '[{"action": "DROP", "from": "192.0.2.43//1"')
  125. self.assertRaises(LoaderError, load_request_acl,
  126. '[{"action": "DROP", "from": "192.0.2.43/1/"')
  127. self.assertRaises(LoaderError, load_request_acl,
  128. '[{"action": "DROP", "from": "/192.0.2.43/1"')
  129. self.assertRaises(LoaderError, load_request_acl,
  130. '[{"action": "DROP", "from": "2001:db8::/xxxx"')
  131. self.assertRaises(LoaderError, load_request_acl,
  132. '[{"action": "DROP", "from": "2001:db8::/32/s"')
  133. self.assertRaises(LoaderError, load_request_acl,
  134. '[{"action": "DROP", "from": "1/"')
  135. self.assertRaises(LoaderError, load_request_acl,
  136. '[{"action": "DROP", "from": "/1"')
  137. self.assertRaises(LoaderError, load_request_acl,
  138. '[{"action": "DROP", "from": "192.0.2.0/33"')
  139. self.assertRaises(LoaderError, load_request_acl,
  140. '[{"action": "DROP", "from": "::1/129"')
  141. def test_execute(self):
  142. # tests derived from dns_test.cc. We don't directly expose checks
  143. # in the python wrapper, so we test it via execute().
  144. self.assertEqual(ACCEPT, get_acl('192.0.2.1').execute(CONTEXT4))
  145. self.assertEqual(REJECT, get_acl('192.0.2.53').execute(CONTEXT4))
  146. self.assertEqual(ACCEPT, get_acl('192.0.2.0/24').execute(CONTEXT4))
  147. self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
  148. self.assertEqual(REJECT, get_acl('192.0.1.0/24').execute(CONTEXT4))
  149. self.assertEqual(ACCEPT, get_acl('2001:db8::1').execute(CONTEXT6))
  150. self.assertEqual(REJECT, get_acl('2001:db8::53').execute(CONTEXT6))
  151. self.assertEqual(ACCEPT, get_acl('2001:db8::/64').execute(CONTEXT6))
  152. self.assertEqual(REJECT, get_acl('2001:db8:1::/64').execute(CONTEXT6))
  153. self.assertEqual(REJECT, get_acl('32.1.13.184').execute(CONTEXT6))
  154. # A bit more complicated example, derived from resolver_config_unittest
  155. acl = load_request_acl('[ {"action": "ACCEPT", ' +
  156. ' "from": "192.0.2.1"},' +
  157. ' {"action": "REJECT",' +
  158. ' "from": "192.0.2.0/24"},' +
  159. ' {"action": "DROP",' +
  160. ' "from": "2001:db8::1"},' +
  161. '] }')
  162. self.assertEqual(ACCEPT, acl.execute(CONTEXT4))
  163. self.assertEqual(REJECT, acl.execute(get_context('192.0.2.2')))
  164. self.assertEqual(DROP, acl.execute(get_context('2001:db8::1')))
  165. self.assertEqual(REJECT, acl.execute(get_context('2001:db8::2')))
  166. def test_bad_execute(self):
  167. acl = get_acl('192.0.2.1')
  168. # missing parameter
  169. self.assertRaises(TypeError, acl.execute)
  170. # too many parameters
  171. self.assertRaises(TypeError, acl.execute, get_context('192.0.2.2'), 0)
  172. # type mismatch
  173. self.assertRaises(TypeError, acl.execute, 'bad parameter')
  174. if __name__ == '__main__':
  175. unittest.main()