|
@@ -64,44 +64,55 @@ class QueryResult(object):
|
|
|
for out in dig_process.stdout:
|
|
|
self.line_handler(out)
|
|
|
|
|
|
+ def _check_next_header(self, line):
|
|
|
+ """Returns true if we found a next header, and sets the internal
|
|
|
+ line handler to the appropriate value.
|
|
|
+ """
|
|
|
+ if line == ";; ANSWER SECTION:\n":
|
|
|
+ self.line_handler = self.parse_answer
|
|
|
+ elif line == ";; AUTHORITY SECTION:\n":
|
|
|
+ self.line_handler = self.parse_authority
|
|
|
+ elif line == ";; ADDITIONAL SECTION:\n":
|
|
|
+ self.line_handler = self.parse_additional
|
|
|
+ elif line.startswith(";; Query time"):
|
|
|
+ self.line_handler = self.parse_footer
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
def parse_header(self, line):
|
|
|
- status_match = self.status_re.search(line)
|
|
|
- flags_match = self.flags_re.search(line)
|
|
|
- if status_match is not None:
|
|
|
- self.opcode = status_match.group(1)
|
|
|
- self.rcode = status_match.group(2)
|
|
|
- elif flags_match is not None:
|
|
|
- self.flags = flags_match.group(1)
|
|
|
- self.qdcount = flags_match.group(2)
|
|
|
- self.ancount = flags_match.group(3)
|
|
|
- self.nscount = flags_match.group(4)
|
|
|
- self.adcount = flags_match.group(5)
|
|
|
- elif line == ";; QUESTION SECTION:\n":
|
|
|
- self.line_handler = self.parse_question
|
|
|
+ if not self._check_next_header(line):
|
|
|
+ status_match = self.status_re.search(line)
|
|
|
+ flags_match = self.flags_re.search(line)
|
|
|
+ if status_match is not None:
|
|
|
+ self.opcode = status_match.group(1)
|
|
|
+ self.rcode = status_match.group(2)
|
|
|
+ elif flags_match is not None:
|
|
|
+ self.flags = flags_match.group(1)
|
|
|
+ self.qdcount = flags_match.group(2)
|
|
|
+ self.ancount = flags_match.group(3)
|
|
|
+ self.nscount = flags_match.group(4)
|
|
|
+ self.adcount = flags_match.group(5)
|
|
|
|
|
|
def parse_question(self, line):
|
|
|
- if line == ";; ANSWER SECTION:\n":
|
|
|
- self.line_handler = self.parse_answer
|
|
|
- elif line != "\n":
|
|
|
- self.question_section.append(line)
|
|
|
+ if not self._check_next_header(line):
|
|
|
+ if line != "\n":
|
|
|
+ self.question_section.append(line.strip())
|
|
|
|
|
|
def parse_answer(self, line):
|
|
|
- if line == ";; AUTHORITY SECTION:\n":
|
|
|
- self.line_handler = self.parse_authority
|
|
|
- elif line != "\n":
|
|
|
- self.answer_section.append(line)
|
|
|
+ if not self._check_next_header(line):
|
|
|
+ if line != "\n":
|
|
|
+ self.answer_section.append(line.strip())
|
|
|
|
|
|
def parse_authority(self, line):
|
|
|
- if line == ";; ADDITIONAL SECTION:\n":
|
|
|
- self.line_handler = self.parse_additional
|
|
|
- elif line != "\n":
|
|
|
- self.additional_section.append(line)
|
|
|
+ if not self._check_next_header(line):
|
|
|
+ if line != "\n":
|
|
|
+ self.authority_section.append(line.strip())
|
|
|
|
|
|
def parse_authority(self, line):
|
|
|
- if line.startswith(";; Query time"):
|
|
|
- self.line_handler = self.parse_footer
|
|
|
- elif line != "\n":
|
|
|
- self.additional_section.append(line)
|
|
|
+ if not self._check_next_header(line):
|
|
|
+ if line != "\n":
|
|
|
+ self.additional_section.append(line.strip())
|
|
|
|
|
|
def parse_footer(self, line):
|
|
|
pass
|
|
@@ -133,10 +144,31 @@ def query_soa(step, query_name, serial):
|
|
|
assert serial == soa_parts[6],\
|
|
|
"Got SOA serial " + soa_parts[6] + ", expected " + serial
|
|
|
|
|
|
-@step('last query should have (\S+) (.+)')
|
|
|
+@step('last query response should have (\S+) (.+)')
|
|
|
def check_last_query(step, item, value):
|
|
|
assert world.last_query_result is not None
|
|
|
assert item in world.last_query_result.__dict__
|
|
|
lq_val = world.last_query_result.__dict__[item]
|
|
|
assert str(value) == str(lq_val),\
|
|
|
"Got: " + str(lq_val) + ", expected: " + str(value)
|
|
|
+
|
|
|
+@step('([a-zA-Z]+) section of the last query response should be')
|
|
|
+def check_last_query_section(step, section):
|
|
|
+ response_string = None
|
|
|
+ if section.lower() == 'question':
|
|
|
+ response_string = "\n".join(world.last_query_result.question_section)
|
|
|
+ elif section.lower() == 'answer':
|
|
|
+ response_string = "\n".join(world.last_query_result.answer_section)
|
|
|
+ elif section.lower() == 'authority':
|
|
|
+ response_string = "\n".join(world.last_query_result.answer_section)
|
|
|
+ elif section.lower() == 'additional':
|
|
|
+ response_string = "\n".join(world.last_query_result.answer_section)
|
|
|
+ else:
|
|
|
+ assert False, "Unknown section " + section
|
|
|
+ # replace whitespace of any length by one space
|
|
|
+ response_string = re.sub("[ \t]+", " ", response_string)
|
|
|
+ expect = re.sub("[ \t]+", " ", step.multiline)
|
|
|
+ assert response_string.strip() == expect.strip(),\
|
|
|
+ "Got:\n'" + response_string + "'\nExpected:\n'" + step.multiline +"'"
|
|
|
+
|
|
|
+
|