#!@PYTHON@

# Copyright (C) 2010 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

'''Generate textual (hex) DNS wire-format test data from a config file.

The input is an INI-style file read with configparser.  Each section
(e.g. 'header', 'question', 'edns', 'a', 'tsig', ...) is mapped to one of
the classes below; the options of the section override the attributes of
an instance of that class, and the instance's dump() method writes
commented hex data to the output file.
'''

import configparser
import re
import socket
import sys
import time
from datetime import datetime
from optparse import OptionParser

re_hex = re.compile(r'^0x[0-9a-fA-F]+')
re_decimal = re.compile(r'^\d+$')
re_string = re.compile(r"\'(.*)\'$")

dnssec_timefmt = '%Y%m%d%H%M%S'

# Mnemonic -> code tables (keys are lower case) and their reverse tables
# (code -> upper-case mnemonic) used when rendering comments.
dict_qr = {'query': 0, 'response': 1}
dict_opcode = {'query': 0, 'iquery': 1, 'status': 2, 'notify': 4,
               'update': 5}
rdict_opcode = {code: text.upper() for text, code in dict_opcode.items()}
dict_rcode = {'noerror': 0, 'formerr': 1, 'servfail': 2, 'nxdomain': 3,
              'notimp': 4, 'refused': 5, 'yxdomain': 6, 'yxrrset': 7,
              'nxrrset': 8, 'notauth': 9, 'notzone': 10}
rdict_rcode = {code: text.upper() for text, code in dict_rcode.items()}
dict_rrtype = {'none': 0, 'a': 1, 'ns': 2, 'md': 3, 'mf': 4, 'cname': 5,
               'soa': 6, 'mb': 7, 'mg': 8, 'mr': 9, 'null': 10,
               'wks': 11, 'ptr': 12, 'hinfo': 13, 'minfo': 14, 'mx': 15,
               'txt': 16, 'rp': 17, 'afsdb': 18, 'x25': 19, 'isdn': 20,
               'rt': 21, 'nsap': 22, 'nsap_tr': 23, 'sig': 24, 'key': 25,
               'px': 26, 'gpos': 27, 'aaaa': 28, 'loc': 29, 'nxt': 30,
               'srv': 33, 'naptr': 35, 'kx': 36, 'cert': 37, 'a6': 38,
               'dname': 39, 'opt': 41, 'apl': 42, 'ds': 43, 'sshfp': 44,
               'ipseckey': 45, 'rrsig': 46, 'nsec': 47, 'dnskey': 48,
               'dhcid': 49, 'nsec3': 50, 'nsec3param': 51, 'hip': 55,
               'spf': 99, 'unspec': 103, 'tkey': 249, 'tsig': 250,
               'dlv': 32769, 'ixfr': 251, 'axfr': 252, 'mailb': 253,
               'maila': 254, 'any': 255}
rdict_rrtype = {code: text.upper() for text, code in dict_rrtype.items()}
dict_rrclass = {'in': 1, 'ch': 3, 'hs': 4, 'any': 255}
rdict_rrclass = {code: text.upper() for text, code in dict_rrclass.items()}
dict_algorithm = {'rsamd5': 1, 'dh': 2, 'dsa': 3, 'ecc': 4, 'rsasha1': 5}
dict_nsec3_algorithm = {'reserved': 0, 'sha1': 1}
rdict_algorithm = {code: text.upper()
                   for text, code in dict_algorithm.items()}
rdict_nsec3_algorithm = {code: text.upper()
                         for text, code in dict_nsec3_algorithm.items()}

# Per-section translation tables: option name -> mnemonic table.
header_xtables = {'qr': dict_qr, 'opcode': dict_opcode,
                  'rcode': dict_rcode}
question_xtables = {'rrtype': dict_rrtype, 'rrclass': dict_rrclass}
rrsig_xtables = {'algorithm': dict_algorithm}


def parse_value(value, xtable=None):
    '''Convert a textual config value to the value used by the dumpers.

    In order of precedence: a '0x...' string becomes an int (base 16), an
    all-digit string becomes an int, a string ending with a single-quoted
    part yields the quoted text, a mnemonic found (case-insensitively) in
    xtable yields its code, and anything else is returned unchanged.
    '''
    if xtable is None:
        xtable = {}
    if re.search(re_hex, value):
        return int(value, 16)
    if re.search(re_decimal, value):
        return int(value)
    m = re.match(re_string, value)
    if m:
        return m.group(1)
    lovalue = value.lower()
    if lovalue in xtable:
        return xtable[lovalue]
    return value


def code_totext(code, dict):
    '''Return 'MNEMONIC(code)' if code is in the table, else str(code).'''
    if code in dict:
        return dict[code] + '(' + str(code) + ')'
    return str(code)


def encode_name(name, absolute=True):
    '''Return the hex string of the wire-format encoding of a domain name.

    A label of the form 'ptr=N' is a special meta-syntax that emits a
    compression pointer to offset N and terminates the name.  Unless
    absolute is true, the terminating empty (root) label is not emitted.
    '''
    # make sure the name is dot-terminated.  duplicate dots will be ignored
    # below.
    name += '.'
    wire = ''
    for label in name.split('.'):
        if len(label) > 4 and label[0:4] == 'ptr=':
            # special meta-syntax for compression pointer
            wire += '%04x' % (0xc000 | int(label[4:]))
            break
        if absolute or len(label) > 0:
            wire += '%02x' % len(label)
            wire += ''.join(['%02x' % ord(ch) for ch in label])
        if len(label) == 0:
            # the first empty label terminates the name
            break
    return wire


def encode_string(name, len=None):
    '''Return the hex string for a character string or a sized integer.

    If name is an int and len (a byte count) is given, the integer is
    rendered zero-padded to 2 * len hex digits.  Otherwise each character
    of name is rendered as two hex digits.
    '''
    if isinstance(name, int) and len is not None:
        return '%0.*x' % (len * 2, name)
    return ''.join(['%02x' % ord(ch) for ch in name])


def count_namelabels(name):
    '''Return the number of labels in name (the root name '.' has 0).'''
    if name == '.':             # special case
        return 0
    m = re.match(r'^(.*)\.$', name)
    if m:
        name = m.group(1)
    return len(name.split('.'))


def get_config(config, section, configobj, xtables=None):
    '''Copy the options of a config section into configobj's attributes.

    Each value is converted with parse_value(), using xtables[option] as
    the mnemonic table when present.  Returns False if the section does
    not exist, True otherwise.
    '''
    if xtables is None:
        xtables = {}
    try:
        for field in config.options(section):
            value = config.get(section, field)
            xtable = xtables.get(field, {})
            # Stored in the instance __dict__ on purpose: TXT and NSECBASE
            # look up numbered keys ('string0', 'bitmap1', ...) there.
            configobj.__dict__[field] = parse_value(value, xtable)
    except configparser.NoSectionError:
        return False
    return True


def print_header(f, input_file):
    '''Write the "auto-generated" banner naming input_file to f.'''
    f.write('''###
### This data file was auto-generated from ''' + input_file + '''
###
''')


class Name:
    '''A bare domain name, optionally ending with a compression pointer.'''
    name = 'example.com'
    pointer = None              # no compression by default

    def dump(self, f):
        name = self.name
        if self.pointer is not None:
            if len(name) > 0 and name[-1] != '.':
                name += '.'
            name += 'ptr=%d' % self.pointer
        name_wire = encode_name(name)
        f.write('\n# DNS Name: %s' % self.name)
        if self.pointer is not None:
            f.write(' + compression pointer: %d' % self.pointer)
        f.write('\n')
        f.write('%s' % name_wire)
        f.write('\n')


class DNSHeader:
    '''The 12-byte DNS message header.'''
    id = 0x1035
    (qr, aa, tc, rd, ra, ad, cd) = 0, 0, 0, 0, 0, 0, 0
    mbz = 0
    rcode = 0                   # noerror
    opcode = 0                  # query
    (qdcount, ancount, nscount, arcount) = 1, 0, 0, 0

    def dump(self, f):
        f.write('\n# Header Section\n')
        f.write('# ID=' + str(self.id))
        f.write(' QR=' + ('Response' if self.qr else 'Query'))
        f.write(' Opcode=' + code_totext(self.opcode, rdict_opcode))
        f.write(' Rcode=' + code_totext(self.rcode, rdict_rcode))
        f.write('%s' % (' AA' if self.aa else ''))
        f.write('%s' % (' TC' if self.tc else ''))
        f.write('%s' % (' RD' if self.rd else ''))
        f.write('%s' % (' AD' if self.ad else ''))
        f.write('%s' % (' CD' if self.cd else ''))
        f.write('\n')
        f.write('%04x ' % self.id)
        # Flags word layout (RFC 1035 4.1.1, with AD/CD from DNSSEC):
        # QR(15) Opcode(14-11) AA(10) TC(9) RD(8) RA(7) Z(6) AD(5) CD(4)
        # RCODE(3-0).  The 4-bit opcode therefore starts at bit 11.
        flag_and_code = (self.qr << 15 | self.opcode << 11 |
                         self.aa << 10 | self.tc << 9 | self.rd << 8 |
                         self.ra << 7 | self.mbz << 6 | self.ad << 5 |
                         self.cd << 4 | self.rcode)
        f.write('%04x\n' % flag_and_code)
        f.write('# QDCNT=%d, ANCNT=%d, NSCNT=%d, ARCNT=%d\n' %
                (self.qdcount, self.ancount, self.nscount, self.arcount))
        f.write('%04x %04x %04x %04x\n' % (self.qdcount, self.ancount,
                                           self.nscount, self.arcount))


class DNSQuestion:
    '''A single entry of the question section.'''
    name = 'example.com.'
    rrtype = parse_value('A', dict_rrtype)
    rrclass = parse_value('IN', dict_rrclass)

    def dump(self, f):
        f.write('\n# Question Section\n')
        f.write('# QNAME=%s QTYPE=%s QCLASS=%s\n' %
                (self.name,
                 code_totext(self.rrtype, rdict_rrtype),
                 code_totext(self.rrclass, rdict_rrclass)))
        f.write(encode_name(self.name))
        f.write(' %04x %04x\n' % (self.rrtype, self.rrclass))


class EDNS:
    '''An EDNS OPT pseudo RR (header and RDLEN only; no options).'''
    name = '.'
    udpsize = 4096
    extrcode = 0
    version = 0
    do = 0
    mbz = 0
    rdlen = 0

    def dump(self, f):
        f.write('\n# EDNS OPT RR\n')
        f.write('# NAME=%s TYPE=%s UDPSize=%d ExtRcode=%s Version=%s DO=%d\n'
                % (self.name, code_totext(dict_rrtype['opt'], rdict_rrtype),
                   self.udpsize, self.extrcode, self.version,
                   1 if self.do else 0))

        code_vers = (self.extrcode << 8) | (self.version & 0x00ff)
        # DO is the top bit of the 16-bit flags field; the remaining 15
        # bits are the must-be-zero field, so mbz must not touch bit 15.
        extflags = (self.do << 15) | (self.mbz & 0x7fff)
        f.write('%s %04x %04x %04x %04x\n' %
                (encode_name(self.name), dict_rrtype['opt'], self.udpsize,
                 code_vers, extflags))
        f.write('# RDLEN=%d\n' % self.rdlen)
        f.write('%04x\n' % self.rdlen)


class RR:
    '''This is a base class for various types of RR test data.
    For each RR type (A, AAAA, NS, etc), we define a derived class of RR
    to dump type specific RDATA parameters.  This class defines parameters
    common to all types of RDATA, namely the owner name, RR class and TTL.
    The dump() method of derived classes are expected to call dump_header(),
    whose default implementation is provided in this class.  This method
    decides whether to dump the test data as an RR (with name, type, class)
    or only as RDATA (with its length), and dumps the corresponding data
    via the specified file object.

    By convention we assume derived classes are named after the common
    standard mnemonic of the corresponding RR types.  For example, the
    derived class for the RR type SOA should be named "SOA".

    Configurable parameters are as follows:
    - as_rr (bool): Whether or not the data is to be dumped as an RR.  False
      by default.
    - rr_name (string): The owner name of the RR.  Only meaningful when the
      data is dumped as an RR.  Default is 'example.com'.
    - rr_class (string): The RR class of the data.  Only meaningful when the
      data is dumped as an RR.  Default is 'IN'.
    - rr_ttl (integer): The TTL value of the RR.  Only meaningful when the
      data is dumped as an RR.  Default is 86400 (1 day).
    '''

    def __init__(self):
        self.as_rr = False
        # only when as_rr is True, same for class/TTL:
        self.rr_name = 'example.com'
        self.rr_class = 'IN'
        self.rr_ttl = 86400

    def dump_header(self, f, rdlen):
        '''Write the RR header (or only RDLEN) preceding the RDATA.'''
        type_txt = self.__class__.__name__
        type_code = parse_value(type_txt, dict_rrtype)
        if self.as_rr:
            rrclass = parse_value(self.rr_class, dict_rrclass)
            f.write('\n# %s RR (QNAME=%s Class=%s TTL=%d RDLEN=%d)\n' %
                    (type_txt, self.rr_name,
                     code_totext(rrclass, rdict_rrclass), self.rr_ttl,
                     rdlen))
            f.write('%s %04x %04x %08x %04x\n' %
                    (encode_name(self.rr_name), type_code, rrclass,
                     self.rr_ttl, rdlen))
        else:
            f.write('\n# %s RDATA (RDLEN=%d)\n' % (type_txt, rdlen))
            f.write('%04x\n' % rdlen)


class A(RR):
    '''An A RR/RDATA (IPv4 address).'''
    rdlen = 4                   # fixed by default
    address = '192.0.2.1'

    def dump(self, f):
        self.dump_header(f, self.rdlen)
        f.write('# Address=%s\n' % (self.address))
        bin_address = socket.inet_aton(self.address)
        f.write('%02x%02x%02x%02x\n' % (bin_address[0], bin_address[1],
                                        bin_address[2], bin_address[3]))


class NS(RR):
    '''An NS RR/RDATA (name server name).'''
    rdlen = None                # auto calculate
    nsname = 'ns.example.com'

    def dump(self, f):
        nsname_wire = encode_name(self.nsname)
        if self.rdlen is None:
            # integer division: the value is formatted with %x below,
            # which rejects floats
            self.rdlen = len(nsname_wire) // 2
        self.dump_header(f, self.rdlen)
        f.write('# NS name=%s\n' % (self.nsname))
        f.write('%s\n' % nsname_wire)


class SOA:
    '''SOA RDATA.'''
    # this currently doesn't support name compression within the RDATA.
    rdlen = -1                  # auto-calculate
    mname = 'ns.example.com'
    rname = 'root.example.com'
    serial = 2010012601
    refresh = 3600
    retry = 300
    expire = 3600000
    minimum = 1200

    def dump(self, f):
        mname_wire = encode_name(self.mname)
        rname_wire = encode_name(self.rname)
        rdlen = self.rdlen
        if rdlen < 0:
            # 20 = five 32-bit fields following the two names
            rdlen = 20 + len(mname_wire) // 2 + len(rname_wire) // 2
        f.write('\n# SOA RDATA (RDLEN=%d)\n' % rdlen)
        f.write('%04x\n' % rdlen)
        f.write('# MNAME=%s RNAME=%s\n' % (self.mname, self.rname))
        f.write('%s %s\n' % (mname_wire, rname_wire))
        f.write('# SERIAL(%d) REFRESH(%d) RETRY(%d) EXPIRE(%d) MINIMUM(%d)\n'
                % (self.serial, self.refresh, self.retry, self.expire,
                   self.minimum))
        f.write('%08x %08x %08x %08x %08x\n' %
                (self.serial, self.refresh, self.retry, self.expire,
                 self.minimum))


class TXT:
    '''TXT RDATA consisting of nstring character-strings.

    The i-th string and its length byte can be overridden with the
    'string<i>' and 'stringlen<i>' options; otherwise 'string' and
    'stringlen' are used.
    '''
    rdlen = -1                  # auto-calculate
    nstring = 1                 # number of character-strings
    stringlen = -1              # default string length, auto-calculate
    string = 'Test String'      # default string

    def dump(self, f):
        stringlen_list = []
        string_list = []
        wirestring_list = []
        for i in range(0, self.nstring):
            key_string = 'string' + str(i)
            if key_string in self.__dict__:
                string_list.append(self.__dict__[key_string])
            else:
                string_list.append(self.string)
            wirestring_list.append(encode_string(string_list[-1]))
            key_stringlen = 'stringlen' + str(i)
            if key_stringlen in self.__dict__:
                stringlen_list.append(self.__dict__[key_stringlen])
            else:
                stringlen_list.append(self.stringlen)
            if stringlen_list[-1] < 0:
                stringlen_list[-1] = len(wirestring_list[-1]) // 2
        rdlen = self.rdlen
        if rdlen < 0:
            # one length byte per string plus the string data
            rdlen = len(''.join(wirestring_list)) // 2 + self.nstring
        f.write('\n# TXT RDATA (RDLEN=%d)\n' % rdlen)
        f.write('%04x\n' % rdlen)
        for i in range(0, self.nstring):
            f.write('# String Len=%d, String=\"%s\"\n' %
                    (stringlen_list[i], string_list[i]))
            f.write('%02x%s%s\n' % (stringlen_list[i],
                                    ' ' if len(wirestring_list[i]) > 0
                                    else '',
                                    wirestring_list[i]))


class RP:
    '''Implements rendering RP RDATA in the wire format.
    Configurable parameters are as follows:
    - rdlen: 16-bit RDATA length.  If omitted, the accurate value is auto
      calculated and used; if negative, the RDLEN field will be omitted from
      the output data.
    - mailbox: The mailbox field.
    - text: The text field.
    All of these parameters have the default values and can be omitted.
    '''
    rdlen = None                # auto-calculate
    mailbox = 'root.example.com'
    text = 'rp-text.example.com'

    def dump(self, f):
        mailbox_wire = encode_name(self.mailbox)
        text_wire = encode_name(self.text)
        if self.rdlen is None:
            # integer division: the value is formatted with %x below,
            # which rejects floats
            self.rdlen = (len(mailbox_wire) + len(text_wire)) // 2
        else:
            self.rdlen = int(self.rdlen)
        if self.rdlen >= 0:
            f.write('\n# RP RDATA (RDLEN=%d)\n' % self.rdlen)
            f.write('%04x\n' % self.rdlen)
        else:
            f.write('\n# RP RDATA (RDLEN omitted)\n')
        f.write('# MAILBOX=%s TEXT=%s\n' % (self.mailbox, self.text))
        f.write('%s %s\n' % (mailbox_wire, text_wire))


class NSECBASE:
    '''Implements rendering NSEC/NSEC3 type bitmaps commonly used for
    these RRs.  The NSEC and NSEC3 classes will be inherited from this
    class.'''
    nbitmap = 1                 # number of bitmaps
    block = 0
    maplen = None              # default bitmap length, auto-calculate
    bitmap = '040000000003'    # an arbitrarily chosen bitmap sample

    def dump(self, f):
        # first, construct the bitmap data
        block_list = []
        maplen_list = []
        bitmap_list = []
        for i in range(0, self.nbitmap):
            key_bitmap = 'bitmap' + str(i)
            if key_bitmap in self.__dict__:
                bitmap_list.append(self.__dict__[key_bitmap])
            else:
                bitmap_list.append(self.bitmap)
            key_maplen = 'maplen' + str(i)
            if key_maplen in self.__dict__:
                maplen_list.append(self.__dict__[key_maplen])
            else:
                maplen_list.append(self.maplen)
            if maplen_list[-1] is None:  # calculate it if not specified
                maplen_list[-1] = len(bitmap_list[-1]) // 2
            key_block = 'block' + str(i)
            if key_block in self.__dict__:
                block_list.append(self.__dict__[key_block])
            else:
                block_list.append(self.block)

        # dump RR-type specific part (NSEC or NSEC3); each bitmap carries
        # two extra bytes (block number and length)
        self.dump_fixedpart(f, 2 * self.nbitmap +
                            len(''.join(bitmap_list)) // 2)

        # dump the bitmap
        for i in range(0, self.nbitmap):
            f.write('# Bitmap: Block=%d, Length=%d\n' %
                    (block_list[i], maplen_list[i]))
            f.write('%02x %02x %s\n' %
                    (block_list[i], maplen_list[i], bitmap_list[i]))


class NSEC(NSECBASE):
    '''NSEC RDATA: next name followed by the type bitmaps.'''
    rdlen = None                # auto-calculate
    nextname = 'next.example.com'

    def dump_fixedpart(self, f, bitmap_totallen):
        name_wire = encode_name(self.nextname)
        if self.rdlen is None:
            # if rdlen needs to be calculated, it must be based on the bitmap
            # length, because the configured maplen can be fake.
            self.rdlen = len(name_wire) // 2 + bitmap_totallen
        f.write('\n# NSEC RDATA (RDLEN=%d)\n' % self.rdlen)
        f.write('%04x\n' % self.rdlen)
        f.write('# Next Name=%s (%d bytes)\n' % (self.nextname,
                                                 len(name_wire) // 2))
        f.write('%s\n' % name_wire)


class NSEC3(NSECBASE):
    '''NSEC3 RDATA: hash parameters, salt, next hash and type bitmaps.'''
    rdlen = None                # auto-calculate
    hashalg = 1                 # SHA-1
    optout = False              # opt-out flag
    mbz = 0                     # other flag fields (none defined yet)
    iterations = 1
    saltlen = 5
    salt = 's' * saltlen
    hashlen = 20
    hash = 'h' * hashlen

    def dump_fixedpart(self, f, bitmap_totallen):
        if self.rdlen is None:
            # if rdlen needs to be calculated, it must be based on the bitmap
            # length, because the configured maplen can be fake.
            self.rdlen = 4 + 1 + len(self.salt) + 1 + len(self.hash) \
                + bitmap_totallen
        f.write('\n# NSEC3 RDATA (RDLEN=%d)\n' % self.rdlen)
        f.write('%04x\n' % self.rdlen)
        optout_val = 1 if self.optout else 0
        f.write('# Hash Alg=%s, Opt-Out=%d, Other Flags=%0x, Iterations=%d\n'
                % (code_totext(self.hashalg, rdict_nsec3_algorithm),
                   optout_val, self.mbz, self.iterations))
        f.write('%02x %02x %04x\n' %
                (self.hashalg, (self.mbz << 1) | optout_val,
                 self.iterations))
        f.write("# Salt Len=%d, Salt='%s'\n" % (self.saltlen, self.salt))
        f.write('%02x%s%s\n' % (self.saltlen,
                                ' ' if len(self.salt) > 0 else '',
                                encode_string(self.salt)))
        f.write("# Hash Len=%d, Hash='%s'\n" % (self.hashlen, self.hash))
        f.write('%02x%s%s\n' % (self.hashlen,
                                ' ' if len(self.hash) > 0 else '',
                                encode_string(self.hash)))


class RRSIG:
    '''RRSIG RDATA.'''
    rdlen = -1                  # auto-calculate
    covered = 1                 # A
    algorithm = 5               # RSA-SHA1
    labels = -1                 # auto-calculate (#labels of signer)
    originalttl = 3600
    # NOTE(review): time.mktime() interprets the time tuple as local time,
    # so these two defaults depend on the time zone of the machine running
    # the script -- confirm whether that is intended.
    expiration = int(time.mktime(datetime.strptime('20100131120000',
                                                   dnssec_timefmt)
                                 .timetuple()))
    inception = int(time.mktime(datetime.strptime('20100101120000',
                                                  dnssec_timefmt)
                                .timetuple()))
    tag = 0x1035
    signer = 'example.com'
    signature = 0x123456789abcdef123456789abcdef

    def dump(self, f):
        name_wire = encode_name(self.signer)
        sig_wire = '%x' % self.signature
        rdlen = self.rdlen
        if rdlen < 0:
            # 18 = size of the fixed fields preceding the signer name
            rdlen = 18 + len(name_wire) // 2 + len(sig_wire) // 2
        labels = self.labels
        if labels < 0:
            labels = count_namelabels(self.signer)
        f.write('\n# RRSIG RDATA (RDLEN=%d)\n' % rdlen)
        f.write('%04x\n' % rdlen)
        f.write('# Covered=%s Algorithm=%s Labels=%d OrigTTL=%d\n' %
                (code_totext(self.covered, rdict_rrtype),
                 code_totext(self.algorithm, rdict_algorithm), labels,
                 self.originalttl))
        f.write('%04x %02x %02x %08x\n' % (self.covered, self.algorithm,
                                           labels, self.originalttl))
        f.write('# Expiration=%s, Inception=%s\n' %
                (str(self.expiration), str(self.inception)))
        f.write('%08x %08x\n' % (self.expiration, self.inception))
        f.write('# Tag=%d Signer=%s and Signature\n' %
                (self.tag, self.signer))
        f.write('%04x %s %s\n' % (self.tag, name_wire, sig_wire))


class TSIG(RR):
    '''A TSIG RR/RDATA.'''
    rdlen = None                # auto-calculate
    algorithm = 'hmac-sha256'
    time_signed = 1286978795    # arbitrarily chosen default
    fudge = 300
    mac_size = None             # use a common value for the algorithm
    mac = None                  # use 'x' * mac_size
    original_id = 2845          # arbitrarily chosen default
    error = 0
    other_len = None         # 6 if error is BADTIME; otherwise 0
    other_data = None        # use time_signed + fudge + 1 for BADTIME
    dict_macsize = {'hmac-md5': 16, 'hmac-sha1': 20, 'hmac-sha256': 32}

    # TSIG has some special defaults
    def __init__(self):
        # Initialize the common RR parameters (as_rr, rr_name) first;
        # dump_header() reads them.
        super().__init__()
        self.rr_class = 'ANY'
        self.rr_ttl = 0

    def dump(self, f):
        if str(self.algorithm) == 'hmac-md5':
            name_wire = encode_name('hmac-md5.sig-alg.reg.int')
        else:
            name_wire = encode_name(self.algorithm)
        mac_size = self.mac_size
        if mac_size is None:
            if self.algorithm in self.dict_macsize:
                mac_size = self.dict_macsize[self.algorithm]
            else:
                raise RuntimeError('TSIG Mac Size cannot be determined')
        mac = encode_string('x' * mac_size) if self.mac is None else \
            encode_string(self.mac, mac_size)
        other_len = self.other_len
        if other_len is None:
            # 18 = BADTIME
            other_len = 6 if self.error == 18 else 0
        other_data = self.other_data
        if other_data is None:
            other_data = '%012x' % (self.time_signed + self.fudge + 1) \
                if self.error == 18 else ''
        else:
            other_data = encode_string(self.other_data, other_len)
        if self.rdlen is None:
            # 16 = time signed (6) + fudge (2) + MAC size (2) +
            #      original ID (2) + error (2) + other len (2)
            self.rdlen = len(name_wire) // 2 + 16 + len(mac) // 2 + \
                len(other_data) // 2
        self.dump_header(f, self.rdlen)
        f.write('# Algorithm=%s Time-Signed=%d Fudge=%d\n' %
                (self.algorithm, self.time_signed, self.fudge))
        f.write('%s %012x %04x\n' % (name_wire, self.time_signed,
                                     self.fudge))
        f.write('# MAC Size=%d MAC=(see hex)\n' % mac_size)
        f.write('%04x%s\n' % (mac_size,
                              ' ' + mac if len(mac) > 0 else ''))
        f.write('# Original-ID=%d Error=%d\n' % (self.original_id,
                                                 self.error))
        f.write('%04x %04x\n' % (self.original_id, self.error))
        f.write('# Other-Len=%d Other-Data=(see hex)\n' % other_len)
        f.write('%04x%s\n' % (other_len,
                              ' ' + other_data if len(other_data) > 0
                              else ''))


def get_config_param(section):
    '''Return the (class, xtables) pair for a config section name.

    A section name of the form 'type/N' (N being digits) is treated as
    'type', so that several sections of the same type can coexist.
    Raises KeyError for an unknown section type.
    '''
    config_param = {'name': (Name, {}),
                    'header': (DNSHeader, header_xtables),
                    'question': (DNSQuestion, question_xtables),
                    'edns': (EDNS, {}), 'a': (A, {}), 'ns': (NS, {}),
                    'soa': (SOA, {}), 'txt': (TXT, {}),
                    'rp': (RP, {}), 'rrsig': (RRSIG, {}),
                    'nsec': (NSEC, {}), 'nsec3': (NSEC3, {}),
                    'tsig': (TSIG, {})}
    s = section
    m = re.match(r'^([^:]+)/\d+$', section)
    if m:
        s = m.group(1)
    return config_param[s]


usage = '''usage: %prog [options] input_file'''


def main():
    '''Parse the command line and generate the output data file.'''
    parser = OptionParser(usage=usage)
    parser.add_option('-o', '--output', action='store', dest='output',
                      default=None, metavar='FILE',
                      help='output file name [default: prefix of input_file]')
    (options, args) = parser.parse_args()

    if len(args) == 0:
        parser.error('input file is missing')
    configfile = args[0]

    outputfile = options.output
    if not outputfile:
        m = re.match(r'(.*)\.[^.]+$', configfile)
        if m:
            outputfile = m.group(1)
        else:
            raise ValueError('output file is not specified and input file is not in the form of "output_file.suffix"')

    # SafeConfigParser was a deprecated alias and was removed in
    # Python 3.12; ConfigParser is the same parser.
    config = configparser.ConfigParser()
    config.read(configfile)

    with open(outputfile, 'w') as output:
        print_header(output, configfile)

        # First try the 'custom' mode; if it fails assume the standard mode.
        try:
            sections = config.get('custom', 'sections').split(':')
        except configparser.NoSectionError:
            sections = ['header', 'question', 'edns']

        for s in sections:
            section_param = get_config_param(s)
            (obj, xtables) = (section_param[0](), section_param[1])
            if get_config(config, s, obj, xtables):
                obj.dump(output)


if __name__ == "__main__":
    main()