transfer.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # Copyright (C) 2011 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. # This script provides transfer (ixfr/axfr) test functionality
  16. # It provides steps to perform the client side of a transfer,
  17. # and inspect the results.
  18. #
  19. # Like querying.py, it uses dig to do the transfers, and
  20. # places its output in a result structure
  21. #
  22. # This is done in a different file with different steps than
  23. # querying, because the format of dig's output is
  24. # very different than that of normal queries
  25. from lettuce import *
  26. import subprocess
  27. import re
  28. class TransferResult(object):
  29. """This object stores transfer results, which is essentially simply
  30. a list of RR strings. These are stored, as read from dig's output,
  31. in the list 'records'. So for an IXFR transfer it contains
  32. the exact result as returned by the server.
  33. If this list is empty, the transfer failed for some reason (dig
  34. does not really show error results well, unfortunately).
  35. We may add some smarter inspection functionality to this class
  36. later.
  37. """
  38. def __init__(self, args):
  39. """Perform the transfer by calling dig, and store the results.
  40. args is the array of arguments to pass to Popen(), this
  41. is passed as is since for IXFR and AXFR there can be very
  42. different options"""
  43. self.records = []
  44. dig_process = subprocess.Popen(args, 1, None, None, subprocess.PIPE,
  45. None)
  46. result = dig_process.wait()
  47. assert result == 0
  48. print("[XX]")
  49. print("[XX]")
  50. print("[XX]")
  51. print("[XX]")
  52. for l in dig_process.stdout:
  53. line = l.strip()
  54. if len(line) > 0 and line[0] != ';':
  55. self.records.append(line)
  56. print(line)
  57. print("[XX]")
  58. print("[XX]")
  59. print("[XX]")
  60. print("[XX]")
  61. @step('An AXFR transfer of ([\w.]+)(?: from ([^:]+)(?::([0-9]+))?)?')
  62. def perform_axfr(step, zone_name, address, port):
  63. """Perform an AXFR transfer, and store the result in world.transfer_result
  64. """
  65. if address is None:
  66. address = "127.0.0.1"
  67. if port is None:
  68. port = 47806
  69. args = [ 'dig', 'AXFR', '@' + str(address), '-p', str(port), zone_name ]
  70. world.transfer_result = TransferResult(args)
  71. @step('An IXFR transfer of ([\w.]+) (\d+)(?: over (tcp|udp))?(?: from ([^:]+)(?::([0-9]+))?)?')
  72. def perform_ixfr(step, zone_name, serial, protocol, address, port):
  73. """Perform an AXFR transfer, and store the result in world.transfer_result
  74. """
  75. if address is None:
  76. address = "127.0.0.1"
  77. if port is None:
  78. port = 47806
  79. args = [ 'dig', 'IXFR=' + str(serial), '@' + str(address), '-p', str(port), zone_name ]
  80. if protocol is not None:
  81. assert protocol == 'tcp' or protocol == 'udp', "Unknown protocol: " + protocol
  82. if protocol == 'tcp':
  83. args.append('+tcp')
  84. elif protocol == 'udp':
  85. args.append('+notcp')
  86. world.transfer_result = TransferResult(args)
  87. @step('transfer result should have (\d+) rrs?')
  88. def check_transfer_result_count(step, number_of_rrs):
  89. """Check the number of rrs in the transfer result object created by
  90. the AXFR transfer or IXFR transfer step."""
  91. assert int(number_of_rrs) == len(world.transfer_result.records),\
  92. "Got " + str(len(world.transfer_result.records)) +\
  93. " records, expected " + str(number_of_rrs)
  94. @step('full result of the last transfer should be')
  95. def check_full_transfer_result(step, ):
  96. records_string = "\n".join(world.transfer_result.records)
  97. records_string = re.sub("[ \t]+", " ", records_string)
  98. expect = re.sub("[ \t]+", " ", step.multiline)
  99. assert records_string.strip() == expect.strip(),\
  100. "Got:\n'" + records_string + "'\nExpected:\n'" + expect + "'"