#!@PYTHON@

# Copyright (C) 2010  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

'''Generate a textual hex dump of DNS wire data from a config file.

Each supported config section (header, question, edns, soa, ...) maps to a
class below.  Options found in a section override the class defaults, and
the object's dump() method writes commented hex text to the output file.
'''

import configparser
import re
import sys
import time
from datetime import datetime
from optparse import OptionParser

# Patterns used by parse_value() to classify a raw config value.
re_hex = re.compile(r'^0x[0-9a-fA-F]+')
re_decimal = re.compile(r'^\d+$')
re_string = re.compile(r"\'(.*)\'$")

dnssec_timefmt = '%Y%m%d%H%M%S'

# Mnemonic -> numeric code tables.  The rdict_* tables are the reverse
# mappings (code -> upper-cased mnemonic) used for human-readable comments.
dict_qr = { 'query' : 0, 'response' : 1 }
dict_opcode = { 'query' : 0, 'iquery' : 1, 'status' : 2, 'notify' : 4,
                'update' : 5 }
rdict_opcode = {code: key.upper() for key, code in dict_opcode.items()}
dict_rcode = { 'noerror' : 0, 'formerr' : 1, 'servfail' : 2, 'nxdomain' : 3,
               'notimp' : 4, 'refused' : 5, 'yxdomain' : 6, 'yxrrset' : 7,
               'nxrrset' : 8, 'notauth' : 9, 'notzone' : 10 }
rdict_rcode = {code: key.upper() for key, code in dict_rcode.items()}
dict_rrtype = { 'none' : 0, 'a' : 1, 'ns' : 2, 'md' : 3, 'mf' : 4, 'cname' : 5,
                'soa' : 6, 'mb' : 7, 'mg' : 8, 'mr' : 9, 'null' : 10,
                'wks' : 11, 'ptr' : 12, 'hinfo' : 13, 'minfo' : 14, 'mx' : 15,
                'txt' : 16, 'rp' : 17, 'afsdb' : 18, 'x25' : 19, 'isdn' : 20,
                'rt' : 21, 'nsap' : 22, 'nsap_tr' : 23, 'sig' : 24, 'key' : 25,
                'px' : 26, 'gpos' : 27, 'aaaa' : 28, 'loc' : 29, 'nxt' : 30,
                'srv' : 33, 'naptr' : 35, 'kx' : 36, 'cert' : 37, 'a6' : 38,
                'dname' : 39, 'opt' : 41, 'apl' : 42, 'ds' : 43, 'sshfp' : 44,
                'ipseckey' : 45, 'rrsig' : 46, 'nsec' : 47, 'dnskey' : 48,
                'dhcid' : 49, 'nsec3' : 50, 'nsec3param' : 51, 'hip' : 55,
                'spf' : 99, 'unspec' : 103, 'tkey' : 249, 'tsig' : 250,
                'dlv' : 32769, 'ixfr' : 251, 'axfr' : 252, 'mailb' : 253,
                'maila' : 254, 'any' : 255 }
rdict_rrtype = {code: key.upper() for key, code in dict_rrtype.items()}
dict_rrclass = { 'in' : 1, 'ch' : 3, 'hs' : 4, 'any' : 255 }
rdict_rrclass = {code: key.upper() for key, code in dict_rrclass.items()}
dict_algorithm = { 'rsamd5' : 1, 'dh' : 2, 'dsa' : 3, 'ecc' : 4,
                   'rsasha1' : 5 }
dict_nsec3_algorithm = { 'reserved' : 0, 'sha1' : 1 }
rdict_algorithm = {code: key.upper() for key, code in dict_algorithm.items()}
rdict_nsec3_algorithm = {code: key.upper()
                         for key, code in dict_nsec3_algorithm.items()}

# Per-section translation tables: option name -> mnemonic table.
header_xtables = { 'qr' : dict_qr, 'opcode' : dict_opcode,
                   'rcode' : dict_rcode }
question_xtables = { 'rrtype' : dict_rrtype, 'rrclass' : dict_rrclass }
rrsig_xtables = { 'algorithm' : dict_algorithm }

def parse_value(value, xtable=None):
    '''Convert a raw config string into the value stored on a section object.

    Tried in order: a '0x'-prefixed hex number, a decimal number, a
    single-quoted string (quotes stripped), a case-insensitive mnemonic
    found in xtable.  Anything else is returned unchanged.
    '''
    if xtable is None:
        xtable = {}
    if re.search(re_hex, value):
        return int(value, 16)
    if re.search(re_decimal, value):
        return int(value)
    m = re.match(re_string, value)
    if m:
        return m.group(1)
    lovalue = value.lower()
    if lovalue in xtable:
        return xtable[lovalue]
    return value

def code_totext(code, dict):
    '''Return 'MNEMONIC(code)' if code is a key of dict, else str(code).'''
    if code in dict:
        return dict[code] + '(' + str(code) + ')'
    return str(code)

def encode_name(name, absolute=True):
    '''Return the hex text of a domain name in wire format.

    A label of the form 'ptr=N' is rendered as a 2-byte compression pointer
    to offset N and terminates the name.  When absolute is true the
    terminating empty (root) label is emitted as '00'.
    '''
    # make sure the name is dot-terminated.  duplicate dots will be ignored
    # below.
    name += '.'
    wire = ''
    for label in name.split('.'):
        if len(label) > 4 and label[0:4] == 'ptr=':
            # special meta-syntax for compression pointer
            wire += '%04x' % (0xc000 | int(label[4:]))
            break
        if absolute or len(label) > 0:
            wire += '%02x' % len(label)
            wire += ''.join(['%02x' % ord(ch) for ch in label])
        if len(label) == 0:
            # the first empty label ends the name
            break
    return wire

def encode_string(name, len=None):
    '''Return the hex text of a string, or of an integer padded to len bytes.

    If name is an integer and len is given, name is rendered as hex digits
    zero-padded to at least len * 2 digits.  Otherwise each character of
    name is rendered as two hex digits.
    '''
    # NOTE: the parameter named 'len' shadows the builtin inside this
    # function; it is kept as is for compatibility with keyword callers.
    if isinstance(name, int) and len is not None:
        return '%0.*x' % (len * 2, name)
    return ''.join(['%02x' % ord(ch) for ch in name])

def count_namelabels(name):
    '''Return the number of labels in name; the root name '.' has none.'''
    if name == '.':             # special case
        return 0
    m = re.match(r'^(.*)\.$', name)
    if m:
        name = m.group(1)
    return len(name.split('.'))

def get_config(config, section, configobj, xtables=None):
    '''Copy the options of a config section onto configobj.

    Every option value is converted with parse_value(), using the
    translation table registered for that option name in xtables (if any),
    and stored in the instance dictionary of configobj.

    Returns False if the section does not exist, True otherwise.
    '''
    if xtables is None:
        xtables = {}
    try:
        for field in config.options(section):
            value = config.get(section, field)
            xtable = xtables.get(field, {})
            # Stored directly in the instance __dict__: some dump() methods
            # look up numbered options (e.g. 'string0') there.
            configobj.__dict__[field] = parse_value(value, xtable)
    except configparser.NoSectionError:
        return False
    return True

def print_header(f, input_file):
    '''Write the leading comment naming the input file to f.'''
    f.write('''###
### This data file was auto-generated from ''' + input_file + '''
###
''')

class Name:
    '''Renders a domain name, optionally ending in a compression pointer.'''
    name = 'example.com'
    pointer = None                # no compression by default

    def dump(self, f):
        '''Write the name (and the pointer, if set) to f.'''
        name = self.name
        if self.pointer is not None:
            if len(name) > 0 and name[-1] != '.':
                name += '.'
            name += 'ptr=%d' % self.pointer
        name_wire = encode_name(name)
        f.write('\n# DNS Name: %s' % self.name)
        if self.pointer is not None:
            f.write(' + compression pointer: %d' % self.pointer)
        f.write('\n')
        f.write('%s' % name_wire)
        f.write('\n')

class DNSHeader:
    '''Renders the 12-byte DNS message header.'''
    id = 0x1035
    (qr, aa, tc, rd, ra, ad, cd) = 0, 0, 0, 0, 0, 0, 0
    mbz = 0
    rcode = 0                   # noerror
    opcode = 0                  # query
    (qdcount, ancount, nscount, arcount) = 1, 0, 0, 0

    def dump(self, f):
        '''Write the header fields to f.'''
        f.write('\n# Header Section\n')
        f.write('# ID=' + str(self.id))
        f.write(' QR=' + ('Response' if self.qr else 'Query'))
        f.write(' Opcode=' + code_totext(self.opcode, rdict_opcode))
        f.write(' Rcode=' + code_totext(self.rcode, rdict_rcode))
        f.write('%s' % (' AA' if self.aa else ''))
        f.write('%s' % (' TC' if self.tc else ''))
        f.write('%s' % (' RD' if self.rd else ''))
        f.write('%s' % (' AD' if self.ad else ''))
        f.write('%s' % (' CD' if self.cd else ''))
        f.write('\n')
        f.write('%04x ' % self.id)
        # Flags word layout (RFC 1035 4.1.1, with AD/CD): QR is bit 15, the
        # 4-bit opcode occupies bits 11-14, then AA, TC, RD, RA, Z, AD, CD
        # and the 4-bit rcode in bits 0-3.  The opcode therefore must be
        # shifted by 11, not 14.
        flag_and_code = (self.qr << 15 | self.opcode << 11 | self.aa << 10 |
                         self.tc << 9 | self.rd << 8 | self.ra << 7 |
                         self.mbz << 6 | self.ad << 5 | self.cd << 4 |
                         self.rcode)
        f.write('%04x\n' % flag_and_code)
        f.write('# QDCNT=%d, ANCNT=%d, NSCNT=%d, ARCNT=%d\n' %
                (self.qdcount, self.ancount, self.nscount, self.arcount))
        f.write('%04x %04x %04x %04x\n' % (self.qdcount, self.ancount,
                                           self.nscount, self.arcount))

class DNSQuestion:
    '''Renders a single entry of the question section.'''
    name = 'example.com.'
    rrtype = parse_value('A', dict_rrtype)
    rrclass = parse_value('IN', dict_rrclass)

    def dump(self, f):
        '''Write QNAME, QTYPE and QCLASS to f.'''
        f.write('\n# Question Section\n')
        f.write('# QNAME=%s QTYPE=%s QCLASS=%s\n' %
                (self.name,
                 code_totext(self.rrtype, rdict_rrtype),
                 code_totext(self.rrclass, rdict_rrclass)))
        f.write(encode_name(self.name))
        f.write(' %04x %04x\n' % (self.rrtype, self.rrclass))

class EDNS:
    '''Renders an EDNS OPT pseudo-RR.'''
    name = '.'
    udpsize = 4096
    extrcode = 0
    version = 0
    do = 0
    mbz = 0
    rdlen = 0

    def dump(self, f):
        '''Write the OPT RR fixed fields and the RDLEN to f.'''
        f.write('\n# EDNS OPT RR\n')
        f.write('# NAME=%s TYPE=%s UDPSize=%d ExtRcode=%s Version=%s DO=%d\n' %
                (self.name, code_totext(dict_rrtype['opt'], rdict_rrtype),
                 self.udpsize, self.extrcode, self.version,
                 1 if self.do else 0))

        code_vers = (self.extrcode << 8) | (self.version & 0x00ff)
        # DO is the top bit of the 16-bit flags field; mbz supplies the
        # remaining 15 bits, so it must be masked with 0x7fff (masking with
        # 0x8000 would drop every mbz bit except one colliding with DO).
        extflags = (self.do << 15) | (self.mbz & 0x7fff)
        f.write('%s %04x %04x %04x %04x\n' %
                (encode_name(self.name), dict_rrtype['opt'], self.udpsize,
                 code_vers, extflags))
        f.write('# RDLEN=%d\n' % self.rdlen)
        f.write('%04x\n' % self.rdlen)

class SOA:
    '''Renders SOA RDATA.'''
    # this currently doesn't support name compression within the RDATA.
    rdlen = -1                  # auto-calculate
    mname = 'ns.example.com'
    rname = 'root.example.com'
    serial = 2010012601
    refresh = 3600
    retry = 300
    expire = 3600000
    minimum = 1200

    def dump(self, f):
        '''Write the RDLEN and the SOA RDATA fields to f.'''
        mname_wire = encode_name(self.mname)
        rname_wire = encode_name(self.rname)
        rdlen = self.rdlen
        if rdlen < 0:
            # five 32-bit integers (20 bytes) plus the two encoded names;
            # the wire strings hold two hex digits per byte.
            rdlen = 20 + (len(mname_wire) + len(rname_wire)) // 2
        f.write('\n# SOA RDATA (RDLEN=%d)\n' % rdlen)
        f.write('%04x\n' % rdlen)
        f.write('# MNAME=%s RNAME=%s\n' % (self.mname, self.rname))
        f.write('%s %s\n' % (mname_wire, rname_wire))
        f.write('# SERIAL(%d) REFRESH(%d) RETRY(%d) EXPIRE(%d) MINIMUM(%d)\n' %
                (self.serial, self.refresh, self.retry, self.expire,
                 self.minimum))
        f.write('%08x %08x %08x %08x %08x\n' % (self.serial, self.refresh,
                                                self.retry, self.expire,
                                                self.minimum))

class TXT:
    '''Renders TXT RDATA consisting of nstring character-strings.

    The i-th string and its length octet can be overridden by instance
    attributes named 'string<i>' and 'stringlen<i>'.
    '''
    rdlen = -1                  # auto-calculate
    nstring = 1                 # number of character-strings
    stringlen = -1              # default string length, auto-calculate
    string = 'Test String'      # default string

    def dump(self, f):
        '''Write the RDLEN and every character-string to f.'''
        stringlen_list = []
        string_list = []
        wirestring_list = []
        for i in range(0, self.nstring):
            key_string = 'string' + str(i)
            if key_string in self.__dict__:
                string_list.append(self.__dict__[key_string])
            else:
                string_list.append(self.string)
            wirestring_list.append(encode_string(string_list[-1]))
            key_stringlen = 'stringlen' + str(i)
            if key_stringlen in self.__dict__:
                stringlen_list.append(self.__dict__[key_stringlen])
            else:
                stringlen_list.append(self.stringlen)
            if stringlen_list[-1] < 0:
                stringlen_list[-1] = len(wirestring_list[-1]) // 2
        rdlen = self.rdlen
        if rdlen < 0:
            # string data plus one length octet per string
            rdlen = len(''.join(wirestring_list)) // 2 + self.nstring
        f.write('\n# TXT RDATA (RDLEN=%d)\n' % rdlen)
        f.write('%04x\n' % rdlen)
        for i in range(0, self.nstring):
            f.write('# String Len=%d, String="%s"\n' %
                    (stringlen_list[i], string_list[i]))
            f.write('%02x%s%s\n' % (stringlen_list[i],
                                    ' ' if len(wirestring_list[i]) > 0 else '',
                                    wirestring_list[i]))

class RP:
    '''Implements rendering RP RDATA in the wire format.
    Configurable parameters are as follows:
    - rdlen: 16-bit RDATA length.  If omitted, the accurate value is auto
      calculated and used; if negative, the RDLEN field will be omitted from
      the output data.
    - mailbox: The mailbox field.
    - text: The text field.
    All of these parameters have the default values and can be omitted.
    '''
    rdlen = None                # auto-calculate
    mailbox = 'root.example.com'
    text = 'rp-text.example.com'

    def dump(self, f):
        '''Write the (optional) RDLEN and the two RP names to f.'''
        mailbox_wire = encode_name(self.mailbox)
        text_wire = encode_name(self.text)
        if self.rdlen is None:
            # Integer division: a float here would make the '%04x'
            # formatting below raise TypeError.
            self.rdlen = (len(mailbox_wire) + len(text_wire)) // 2
        else:
            self.rdlen = int(self.rdlen)
        if self.rdlen >= 0:
            f.write('\n# RP RDATA (RDLEN=%d)\n' % self.rdlen)
            f.write('%04x\n' % self.rdlen)
        else:
            f.write('\n# RP RDATA (RDLEN omitted)\n')
        f.write('# MAILBOX=%s TEXT=%s\n' % (self.mailbox, self.text))
        f.write('%s %s\n' % (mailbox_wire, text_wire))

class NSECBASE:
    '''Implements rendering NSEC/NSEC3 type bitmaps commonly used for
    these RRs.  The NSEC and NSEC3 classes will be inherited from this
    class.

    The i-th bitmap, its length octet and its block number can be
    overridden by instance attributes named 'bitmap<i>', 'maplen<i>' and
    'block<i>'.  Subclasses provide dump_fixedpart(f, bitmap_totallen).
    '''
    nbitmap = 1                 # number of bitmaps
    block = 0
    maplen = None              # default bitmap length, auto-calculate
    bitmap = '040000000003'    # an arbitrarily chosen bitmap sample

    def dump(self, f):
        '''Write the RR-specific fixed part followed by the bitmaps to f.'''
        # first, construct the bitmap data
        block_list = []
        maplen_list = []
        bitmap_list = []
        for i in range(0, self.nbitmap):
            key_bitmap = 'bitmap' + str(i)
            if key_bitmap in self.__dict__:
                bitmap_list.append(self.__dict__[key_bitmap])
            else:
                bitmap_list.append(self.bitmap)
            key_maplen = 'maplen' + str(i)
            if key_maplen in self.__dict__:
                maplen_list.append(self.__dict__[key_maplen])
            else:
                maplen_list.append(self.maplen)
            if maplen_list[-1] is None: # calculate it if not specified
                maplen_list[-1] = len(bitmap_list[-1]) // 2
            key_block = 'block' + str(i)
            if key_block in self.__dict__:
                block_list.append(self.__dict__[key_block])
            else:
                block_list.append(self.block)

        # dump RR-type specific part (NSEC or NSEC3); each bitmap costs two
        # extra bytes (block number and length) on top of its data.
        self.dump_fixedpart(f, 2 * self.nbitmap +
                            len(''.join(bitmap_list)) // 2)

        # dump the bitmap
        for i in range(0, self.nbitmap):
            f.write('# Bitmap: Block=%d, Length=%d\n' %
                    (block_list[i], maplen_list[i]))
            f.write('%02x %02x %s\n' %
                    (block_list[i], maplen_list[i], bitmap_list[i]))

class NSEC(NSECBASE):
    '''Renders NSEC RDATA: the next name followed by the type bitmaps.'''
    rdlen = None                # auto-calculate
    nextname = 'next.example.com'

    def dump_fixedpart(self, f, bitmap_totallen):
        '''Write the RDLEN and the next name to f.'''
        name_wire = encode_name(self.nextname)
        name_len = len(name_wire) // 2
        if self.rdlen is None:
            # if rdlen needs to be calculated, it must be based on the bitmap
            # length, because the configured maplen can be fake.
            self.rdlen = name_len + bitmap_totallen
        f.write('\n# NSEC RDATA (RDLEN=%d)\n' % self.rdlen)
        f.write('%04x\n' % self.rdlen)
        f.write('# Next Name=%s (%d bytes)\n' % (self.nextname, name_len))
        f.write('%s\n' % name_wire)

class NSEC3(NSECBASE):
    '''Renders NSEC3 RDATA: hash parameters, salt, hash and type bitmaps.'''
    rdlen = None                # auto-calculate
    hashalg = 1                 # SHA-1
    optout = False              # opt-out flag
    mbz = 0                     # other flag fields (none defined yet)
    iterations = 1
    saltlen = 5
    salt = 's' * saltlen
    hashlen = 20
    hash = 'h' * hashlen

    def dump_fixedpart(self, f, bitmap_totallen):
        '''Write the RDLEN and the NSEC3 fields preceding the bitmaps to f.'''
        if self.rdlen is None:
            # if rdlen needs to be calculated, it must be based on the bitmap
            # length, because the configured maplen can be fake.
            self.rdlen = 4 + 1 + len(self.salt) + 1 + len(self.hash) \
                + bitmap_totallen
        f.write('\n# NSEC3 RDATA (RDLEN=%d)\n' % self.rdlen)
        f.write('%04x\n' % self.rdlen)
        optout_val = 1 if self.optout else 0
        f.write('# Hash Alg=%s, Opt-Out=%d, Other Flags=%0x, Iterations=%d\n' %
                (code_totext(self.hashalg, rdict_nsec3_algorithm),
                 optout_val, self.mbz, self.iterations))
        f.write('%02x %02x %04x\n' %
                (self.hashalg, (self.mbz << 1) | optout_val, self.iterations))
        f.write("# Salt Len=%d, Salt='%s'\n" % (self.saltlen, self.salt))
        f.write('%02x%s%s\n' % (self.saltlen,
                                ' ' if len(self.salt) > 0 else '',
                                encode_string(self.salt)))
        f.write("# Hash Len=%d, Hash='%s'\n" % (self.hashlen, self.hash))
        f.write('%02x%s%s\n' % (self.hashlen,
                                ' ' if len(self.hash) > 0 else '',
                                encode_string(self.hash)))

class RRSIG:
    '''Renders RRSIG RDATA.'''
    rdlen = -1                  # auto-calculate
    covered = 1                 # A
    algorithm = 5               # RSA-SHA1
    labels = -1                 # auto-calculate (#labels of signer)
    originalttl = 3600
    expiration = int(time.mktime(datetime.strptime('20100131120000',
                                                   dnssec_timefmt).timetuple()))
    inception = int(time.mktime(datetime.strptime('20100101120000',
                                                  dnssec_timefmt).timetuple()))
    tag = 0x1035
    signer = 'example.com'
    signature = 0x123456789abcdef123456789abcdef

    def dump(self, f):
        '''Write the RDLEN and the RRSIG RDATA fields to f.'''
        name_wire = encode_name(self.signer)
        sig_wire = '%x' % self.signature
        rdlen = self.rdlen
        if rdlen < 0:
            # 18 bytes of fixed fields plus the signer name and signature
            rdlen = 18 + (len(name_wire) + len(sig_wire)) // 2
        labels = self.labels
        if labels < 0:
            labels = count_namelabels(self.signer)
        f.write('\n# RRSIG RDATA (RDLEN=%d)\n' % rdlen)
        f.write('%04x\n' % rdlen)
        f.write('# Covered=%s Algorithm=%s Labels=%d OrigTTL=%d\n' %
                (code_totext(self.covered, rdict_rrtype),
                 code_totext(self.algorithm, rdict_algorithm), labels,
                 self.originalttl))
        f.write('%04x %02x %02x %08x\n' % (self.covered, self.algorithm,
                                           labels, self.originalttl))
        f.write('# Expiration=%s, Inception=%s\n' %
                (str(self.expiration), str(self.inception)))
        f.write('%08x %08x\n' % (self.expiration, self.inception))
        f.write('# Tag=%d Signer=%s and Signature\n' % (self.tag, self.signer))
        f.write('%04x %s %s\n' % (self.tag, name_wire, sig_wire))

class TSIG:
    '''Renders TSIG RDATA.'''
    rdlen = None                # auto-calculate
    algorithm = 'hmac-sha256'
    time_signed = 1286978795    # arbitrarily chosen default
    fudge = 300
    mac_size = None             # use a common value for the algorithm
    mac = None                  # use 'x' * mac_size
    original_id = 2845          # arbitrarily chosen default
    error = 0
    other_len = None         # 6 if error is BADTIME; otherwise 0
    other_data = None        # use time_signed + fudge + 1 for BADTIME
    dict_macsize = { 'hmac-md5' : 16, 'hmac-sha1' : 20, 'hmac-sha256' : 32 }

    def dump(self, f):
        '''Write the RDLEN and the TSIG RDATA fields to f.

        Raises RuntimeError if mac_size is not set and the algorithm has no
        entry in dict_macsize.
        '''
        if str(self.algorithm) == 'hmac-md5':
            name_wire = encode_name('hmac-md5.sig-alg.reg.int')
        else:
            name_wire = encode_name(self.algorithm)
        rdlen = self.rdlen
        mac_size = self.mac_size
        if mac_size is None:
            if self.algorithm in self.dict_macsize:
                mac_size = self.dict_macsize[self.algorithm]
            else:
                raise RuntimeError('TSIG Mac Size cannot be determined')
        mac = encode_string('x' * mac_size) if self.mac is None else \
            encode_string(self.mac, mac_size)
        other_len = self.other_len
        if other_len is None:
            # 18 = BADTIME
            other_len = 6 if self.error == 18 else 0
        other_data = self.other_data
        if other_data is None:
            other_data = '%012x' % (self.time_signed + self.fudge + 1) \
                if self.error == 18 else ''
        else:
            other_data = encode_string(self.other_data, other_len)
        if rdlen is None:
            # 16 bytes of fixed fields plus the variable-length parts
            rdlen = 16 + (len(name_wire) + len(mac) + len(other_data)) // 2
        f.write('\n# TSIG RDATA (RDLEN=%d)\n' % rdlen)
        f.write('%04x\n' % rdlen)
        f.write('# Algorithm=%s Time-Signed=%d Fudge=%d\n' %
                (self.algorithm, self.time_signed, self.fudge))
        f.write('%s %012x %04x\n' % (name_wire, self.time_signed, self.fudge))
        f.write('# MAC Size=%d MAC=(see hex)\n' % mac_size)
        f.write('%04x%s\n' % (mac_size, ' ' + mac if len(mac) > 0 else ''))
        f.write('# Original-ID=%d Error=%d\n' % (self.original_id, self.error))
        f.write('%04x %04x\n' % (self.original_id, self.error))
        f.write('# Other-Len=%d Other-Data=(see hex)\n' % other_len)
        f.write('%04x%s\n' % (other_len,
                              ' ' + other_data if len(other_data) > 0 else ''))

def get_config_param(section):
    '''Return the (class, translation tables) pair for a section name.

    A section name may carry a numeric suffix of the form 'name/N'; the
    suffix is ignored for the lookup.  Raises KeyError for unknown names.
    '''
    config_param = {'name' : (Name, {}),
                    'header' : (DNSHeader, header_xtables),
                    'question' : (DNSQuestion, question_xtables),
                    'edns' : (EDNS, {}), 'soa' : (SOA, {}), 'txt' : (TXT, {}),
                    'rp' : (RP, {}), 'rrsig' : (RRSIG, {}),
                    'nsec' : (NSEC, {}), 'nsec3' : (NSEC3, {}),
                    'tsig' : (TSIG, {}) }
    s = section
    m = re.match(r'^([^:]+)/\d+$', section)
    if m:
        s = m.group(1)
    return config_param[s]

usage = '''usage: %prog [options] input_file'''

if __name__ == "__main__":
    parser = OptionParser(usage=usage)
    parser.add_option('-o', '--output', action='store', dest='output',
                      default=None, metavar='FILE',
                      help='output file name [default: prefix of input_file]')
    (options, args) = parser.parse_args()

    if len(args) == 0:
        parser.error('input file is missing')
    configfile = args[0]

    outputfile = options.output
    if not outputfile:
        m = re.match(r'(.*)\.[^.]+$', configfile)
        if m:
            outputfile = m.group(1)
        else:
            raise ValueError('output file is not specified and input file is not in the form of "output_file.suffix"')

    # SafeConfigParser was removed in Python 3.12; ConfigParser is the
    # equivalent class.
    config = configparser.ConfigParser()
    config.read(configfile)

    # The context manager closes the output file even if a section fails.
    with open(outputfile, 'w') as output:
        print_header(output, configfile)

        # First try the 'custom' mode; if it fails assume the standard mode.
        try:
            sections = config.get('custom', 'sections').split(':')
        except (configparser.NoSectionError, configparser.NoOptionError):
            sections = ['header', 'question', 'edns']

        for s in sections:
            section_param = get_config_param(s)
            (obj, xtables) = (section_param[0](), section_param[1])
            if get_config(config, s, obj, xtables):
                obj.dump(output)