123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- from flask import Flask
- from flask import request, render_template
- from flask.ext.sqlalchemy import SQLAlchemy
- #from flask import session, request, url_for, redirect, render_template
- from netaddr import IPAddress
- # Hack for python3
- from netaddr.strategy.ipv4 import packed_to_int as unpack_v4
- from netaddr.strategy.ipv6 import packed_to_int as unpack_v6
- from datetime import datetime
- app = Flask(__name__)
- app.config.from_pyfile('config.py')
- db = SQLAlchemy(app)
- def unpack(ip):
- if len(ip) == 4:
- return unpack_v4(ip)
- elif len(ip) == 16:
- return unpack_v6(ip)
- class Target(db.Model):
- """Target IP to ping"""
- id = db.Column(db.Integer, primary_key=True)
- # IP addresses are encoded as their binary representation
- ip = db.Column(db.BINARY(length=16))
- # Date at which a user asked for measurements to this target
- submitted = db.Column(db.DateTime)
- def __init__(self, ip):
- self.ip = IPAddress(ip).packed
- self.submitted = datetime.now()
- def get_ip(self):
- return IPAddress(unpack(self.ip))
- def is_v4(self):
- return self.get_ip().version == 4
- def is_v6(self):
- return self.get_ip().version == 6
- def __repr__(self):
- return '%r' % self.get_ip()
- def __str__(self):
- return str(self.get_ip())
- class Result(db.Model):
- """Result of a ping measurement"""
- id = db.Column(db.Integer, primary_key=True)
- target_id = db.Column(db.Integer, db.ForeignKey('target.id'))
- target = db.relationship('Target',
- backref=db.backref('results', lazy='dynamic'))
- # Free-form (e.g. machine name). No IP, for privacy.
- source = db.Column(db.String)
- # In milliseconds
- rtt = db.Column(db.Float)
- def __init__(self, target_id, source, rtt):
- target = Target.query.get_or_404(int(target_id))
- self.target = target
- self.source = source
- self.rtt = float(rtt)
- def init_db():
- db.create_all()
- @app.route('/')
- def homepage():
- return render_template('home.html')
- @app.route('/submit', methods=['POST'])
- def submit_job():
- if 'target' in request.form:
- target = Target(request.form['target'])
- db.session.add(target)
- db.session.commit()
- return "Launching jobs towards {}".format(target)
- else:
- return "Invalid arguments"
- @app.route('/targets/all')
- def get_jobs():
- """"List of targets to ping"""
- targets = Target.query.order_by('-id').all()
- return "\n".join("{} {}".format(t.id, t) for t in targets)
- @app.route('/targets/ipv4')
- def get_v4jobs():
- """"List of IPv4 targets to ping"""
- targets = Target.query.order_by('-id').all()
- return "\n".join("{} {}".format(t.id, t) for t in targets if t.is_v4())
- @app.route('/targets/ipv6')
- def get_v6jobs():
- """"List of IPv6 targets to ping"""
- targets = Target.query.order_by('-id').all()
- return "\n".join("{} {}".format(t.id, t) for t in targets if t.is_v6())
- @app.route('/result/report', methods=['POST'])
- def report_result():
- if {'rtt', 'target'}.issubset(request.form):
- target = request.form['target']
- rtt = request.form['rtt']
- source = request.form.get('source', "")
- result = Result(target, source, rtt)
- db.session.add(result)
- db.session.commit()
- return "OK\n"
- else:
- return "Invalid arguments\n"
- @app.route('/result/show/<int:target_id>')
- def show_results(target_id):
- target = Target.query.get_or_404(target_id)
- return "<br >\n".join("Result: {} to {} in {} ms".format(r.source, r.target, r.rtt) for r in target.results.order_by('-id').all())
- if __name__ == '__main__':
- init_db()
- app.run(host='0.0.0.0', port=8888)
|