# message_test.py

  1. from bind10_dns import *
  2. from struct import *
  3. id = ["10","35"]
  4. flags = ["85", "00"]
  5. sections = ["00","01","00","02","00","00","00","00"]
  6. query_raw_name = ["04","74","65","73","74","07","65","78","61","6d","70","6c","65","03","63","6f","6d","00"]
  7. query_type_class = ["00","01","00","01"]
  8. answer_compress = ["c0","0c"]
  9. answer1_ttl_type_class_rdlen_rdata = ["00","01","00","01","00","00","0e","10","00","04","c0","00","02","01"]
  10. answer2_ttl_type_class_rdlen_rdata = ["00","01","00","01","00","00","1c","20","00","04","c0","00","02","02"]
  11. bytes = pack("B" * len(query_raw_name), *[int(i,16) for i in query_raw_name])
  12. query_name = name(input_buffer(bytes))
  13. print("query name is ", query_name.to_text())
  14. message_raw_data = id + flags + sections + query_raw_name + query_type_class + answer_compress + answer1_ttl_type_class_rdlen_rdata + answer_compress + \
  15. answer2_ttl_type_class_rdlen_rdata
  16. m = message(message_mode.PARSE)
  17. message_bytes = pack("B" * len(message_raw_data), *[int(i,16) for i in message_raw_data])
  18. m.from_wire(input_buffer(message_bytes))
  19. if m.get_qid() == int("1035", 16):
  20. print("id is correct")
  21. if m.get_opcode() == op_code.QUERY():
  22. print("opcode is correct")
  23. if m.get_rcode() == rcode.NOERROR():
  24. print("rcode is correct")
  25. if m.get_header_flag(message_flag.QR()):
  26. print("qr is correct")
  27. if m.get_header_flag(message_flag.RD()):
  28. print("rd is correct")
  29. if m.get_header_flag(message_flag.AA()):
  30. print("aa is correct")
  31. if m.get_rr_count(section.QUESTION()) == 1:
  32. print("qustion rr count is correct")
  33. if m.get_rr_count(section.ANSWER()) == 2:
  34. print("answer rr count is ok")
  35. if m.get_rr_count(section.AUTHORITY()) == 0 and m.get_rr_count(section.ADDITIONAL()) == 0:
  36. print("authority and additional rr count is ok")
  37. question_iter = question_iter(m)
  38. question = question_iter.get_question()
  39. print("question name is ", question.get_name().to_text())
  40. if question.get_type() == rr_type.A():
  41. print("question rr type is A")
  42. if question.get_class() == rr_class.IN():
  43. print("question rr class is IN")
  44. answer_rrset_iter = section_iter(m, section.ANSWER())
  45. answer = answer_rrset_iter.get_rrset()
  46. print("answer part name is ", answer.get_name().to_text())
  47. if answer.get_type() == rr_type.A() and answer.get_class() == rr_class.IN():
  48. print("answer part is A record and class IN")
  49. rdata_iter = answer.get_rdata_iterator()
  50. rdata_iter.first()
  51. print("first part of rdata is ", rdata_iter.get_current().to_text())
  52. rdata_iter.next()
  53. print("second part of rdata is", rdata_iter.get_current().to_text())
  54. rdata_iter.next()
  55. if rdata_iter.is_last():
  56. print("answer part has two rr")