#!/usr/bin/env python3
"""Render the database of prefixes seen over BGP in dn42 as an HTML page.

The prefixes database is read from DBFILE, annotated with names looked up
in the registry checkout at REGISTRY, and written as a table to HTMLOUT.
"""

import html
import io
import sys
import json
import time

from netaddr import IPNetwork, IPSet

from utils import read_json, write_json
from registry import Inetnum, AutNum

DBFILE = "/srv/http/dn42/tower-bird.json"
REGISTRY = "/home/zorun/net.dn42.registry"
HTMLOUT = "/srv/http/dn42/lastseen/index.html"
# Where the data comes from
ASN = 76142
#TIMEFMT = '%F %H:%M'
TIMEFMT = '%c UTC'
DN42 = IPSet(["172.22.0.0/15"])


def prefix_components(prefix):
    """Can be used as a key for sorting collections of prefixes.

    `prefix` is a dotted "a.b.c.d/len" string.  Returns a tuple of ints:
    the address components followed by the prefix length.  The length is
    converted to an int so that, for example, /9 sorts before /16.

    Raises IndexError if there is no "/" in `prefix`, and ValueError if a
    component is not a number.
    """
    parts = prefix.split("/")
    ip = parts[0]
    cidr = int(parts[1])
    return tuple(int(part) for part in ip.split('.')) + (cidr,)


class LastSeen:
    """Prefixes database combined with registry data, renderable as HTML."""

    # NOTE: these class-level containers are only placeholders; __init__
    # rebinds every one of them on the instance.

    # Database of all previously seen prefixes, with dates
    prefixes_db = dict()
    # Prefixes currently announced
    current_prefixes = list()
    # From the registry
    autnums = dict()
    inetnums = dict()
    # Optimised version for inclusion testing
    networks = list()

    def __init__(self, db_filename):
        """Load the prefixes database from `db_filename` and the registry
        data from REGISTRY.
        """
        self.prefixes_db = read_json(db_filename)
        self.current_prefixes = [prefix
                                 for prefix, data in self.prefixes_db.items()
                                 if data["current"]]
        # Registry
        self.inetnums = Inetnum(REGISTRY).data
        self.autnums = AutNum(REGISTRY).data
        # Precompute this
        self.networks = [IPNetwork(net) for net in self.inetnums]

    def stats(self):
        """Returns a dict with the fraction of the DN42 address space
        covered by all known prefixes ("known") and by the currently
        announced ones ("active").
        """
        known = IPSet(self.prefixes_db)
        current = IPSet(self.current_prefixes)
        total = float(DN42.size)
        return {"known": float(known.size) / total,
                "active": float(current.size) / total}

    def whois(self, prefix):
        """Returns the name associated to a prefix, according to the
        registry.  We look for the most specific prefix containing the
        argument.

        Returns None when no registry network contains the prefix, or when
        the matching entry has no "netname" field.
        """
        prefix = IPNetwork(prefix)
        relevant_nets = [net for net in self.networks if prefix in net]
        if not relevant_nets:
            return None
        # Most specific match == longest prefix length
        final_net = str(max(relevant_nets, key=(lambda p: p.prefixlen)))
        entry = self.inetnums[final_net]
        if "netname" not in entry:
            return None
        return entry["netname"][0]

    def as_name(self, asn):
        """Returns a tuple (as-name, descr), any of which can be the empty
        string, or None if the AS is not found in the registry.

        `asn` may be an int (76142) or a string, with or without the "AS"
        prefix, in any case ("76142", "as76142", "AS76142").  Any other
        type yields None.
        """
        if isinstance(asn, int):
            query = 'AS' + str(asn)
        elif isinstance(asn, str):
            asn = asn.upper()
            # Previously `query` was left unset when the prefix was already
            # present, which raised UnboundLocalError below.
            query = asn if asn.startswith('AS') else 'AS' + asn
        else:
            return None
        if query not in self.autnums:
            return None
        record = self.autnums[query]
        return (record.get('as-name', [""])[0],
                record.get('descr', [""])[0])

    def gen_html(self, out):
        """Writes the HTML report to `out`, any object with a write(str)
        method.

        Rows are ordered by "last_updated", then by prefix.  Text taken
        from the database and the registry is HTML-escaped.
        """
        # NOTE(review): the markup below was reconstructed; the tags inside
        # the original string literals were not available.  In particular
        # the page header (any stylesheet for the 'good'/'bad' classes) and
        # the link targets for "Raw JSON data" / "Python script" need to be
        # confirmed and restored.
        stats = self.stats()
        out.write("<!DOCTYPE html>\n<html>\n<head>\n"
                  "<meta charset=\"utf-8\">\n"
                  "<title>Last seen in dn42 BGP</title>\n"
                  "</head>\n<body>\n")
        out.write("<h1>Last seen in dn42 BGP (from AS {})</h1>\n".format(ASN))
        out.write("<p>Raw JSON data, Python script</p>\n")
        out.write("<p>Last update: {} (data collection started on 26th January 2014)</p>\n"
                  .format(time.strftime(TIMEFMT, time.gmtime())))
        out.write("<p>Number of prefixes currently announced: {} (totalizing {:.2%} of dn42 address space)</p>\n"
                  .format(len(self.current_prefixes), stats['active']))
        out.write("<p>Number of known prefixes since January 2014: {} (totalizing {:.2%} of dn42 address space)</p>\n"
                  .format(len(self.prefixes_db), stats['known']))
        out.write("<p>Data comes from BGP (AS {}) and is sampled every 10 minutes. \"UP\" means that the prefix is currently announced in dn42. \"DOWN\" means that the prefix has been announced at some point, but not anymore</p>\n"
                  .format(ASN))
        out.write("<table>\n"
                  "<tr>"
                  "<th>Status</th>"
                  "<th>Prefix</th>"
                  "<th>Netname</th>"
                  "<th>Origin</th>"
                  "<th>Last seen</th>"
                  # "<th>First seen</th>"
                  "</tr>\n")
        rows = sorted(self.prefixes_db.items(),
                      key=(lambda d: (d[1]["last_updated"],)
                           + prefix_components(d[0])))
        for (prefix, data) in rows:
            out.write("<tr>")
            good = data["current"]
            netname = self.whois(prefix)
            asn = data["origin_as"]
            as_string = 'AS' + str(asn)
            as_name = self.as_name(asn)
            if as_name:
                as_string += ' | ' + as_name[0]
            last_seen = data["last_updated"]
            # first_seen = time.strftime(TIMEFMT,
            #                            time.gmtime(int(data["first_seen"]))) \
            #              if "first_seen" in data else "?"
            out.write("<td {}>{}</td>".format(
                "class='good'" if good else "class='bad'",
                "UP" if good else "DOWN"))
            out.write("<td>{}</td>".format(html.escape(prefix)))
            out.write("<td>{}</td>".format(
                html.escape(netname) if netname else "?"))
            out.write("<td>{}</td>".format(html.escape(as_string)))
            out.write("<td>{}</td>".format(
                time.strftime(TIMEFMT, time.gmtime(int(last_seen)))))
            # out.write("<td>{}</td>".format(first_seen))
            out.write("</tr>\n")
        out.write("</table>\n</body>\n</html>\n")


if __name__ == '__main__':
    lastseen = LastSeen(DBFILE)
    # The page is fully generated in memory before the output file is
    # opened, so a failure during generation leaves the old file untouched.
    buf = io.StringIO()
    lastseen.gen_html(buf)
    with open(HTMLOUT, "w", encoding="utf-8") as htmlfile:
        htmlfile.write(buf.getvalue())