#!/usr/bin/python import os, sys, socket, syslog from time import sleep from glob import iglob from datetime import datetime import argparse import pickle, struct import pyinotify sock = socket.socket() last_timestamp = 0 # This file could not be formatted by Black. def logging(message, level = syslog.LOG_INFO): syslog.syslog(level, message) #print message def read_vpn_acct_file(filename): logging("Reading file %s..." % (filename, ), syslog.LOG_DEBUG) tstamp = 0 with open(filename) as f: data = [] for line in f: if line.startswith('# time:'): tstamp = int(line.split(':')[1].strip()) elif line.startswith('#'): pass else: data.append(line.split()) metrics = [] for d in data: # format: username ip qos uptxbytes downrxbytes if len(d) == 5: metrics.extend([ ('%s.%s.uptxbytes' % (options.prefix, d[0]), (tstamp, int(d[3]))), ('%s.%s.downrxbytes' % (options.prefix, d[0]), (tstamp, int(d[4]))), ]) return tstamp, metrics def send_to_carbon(metrics): # http://graphite.readthedocs.org/en/latest/feeding-carbon.html logging("Sending %d metrics..." % len(metrics), syslog.LOG_DEBUG) payload = pickle.dumps(metrics) header = struct.pack("!L", len(payload)) message = header + payload sock.sendall(message) def process_vpn_file(filename, old_tstamp = 0): logging("Processing VPN file %s..." % (filename,), syslog.LOG_DEBUG) global last_timestamp tstamp, metrics = read_vpn_acct_file(filename) if tstamp >= old_tstamp: try: send_to_carbon(metrics) except socket.error: if last_timestamp == 0: # error not detected yet last_timestamp = tstamp def handle_old_vpn_files(directory, old_tstamp): # We go back one minute earlier to scan the files. However, we will do an exact # comparison when handling the matching files (see process_vpn_file() function). old_date = datetime.fromtimestamp(old_tstamp-60) logging( "Starting processing deferred files starting at %s..." % (old_date.strftime('%Y%m%d%H%M%S'),), syslog.LOG_DEBUG ) for f in iglob(directory + '/*'): try: tmp_tstamp = datetime.strptime( os.path.basename(f).split('-')[0], '%Y%m%d%H%M%S' ) except ValueError: # Bad filename format: likely not an accounting file... pass else: if tmp_tstamp > old_date: logging( "Processing deferred file %s..." % (f,), syslog.LOG_DEBUG ) process_vpn_file(f, old_tstamp) class VPNAcctHandler(pyinotify.ProcessEvent): def process_IN_MOVED_TO(self, event): self.handle_file(event) def process_IN_CLOSE_WRITE(self, event): self.handle_file(event) def handle_file(self, event): logging( "Event detected for file %s..." % (event.pathname,), syslog.LOG_DEBUG ) if last_timestamp > 0: # Error ongoing: will wait till the connection is back (and last_timestamp == 0) logging( "Error ongoing: %s will be processed later..." % (event.pathname,), syslog.LOG_DEBUG ) return process_vpn_file(event.pathname) if __name__ == '__main__': parser = argparse.ArgumentParser( description="Watch for a specific directory containing OpenVPN accounting data " \ "and as soon as a new file is created, parse it and send the " \ "accounting data to a Carbon daemon.", epilog="Send comments/remarks/complaints to .", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('directory', help='directory to watch') parser.add_argument('prefix', help='prefix for Graphite metrics') parser.add_argument('-t', '--timestamp', dest='tstamp', type=int, default=0, help='Timestamp to recover data from') parser.add_argument('-s', '--server', dest='server', default='localhost', help='Carbon daemon address') parser.add_argument('-p', '--port', dest='port', type=int, default=2004, help='Carbon daemon port') options = parser.parse_args() logging( "Connecting to Carbon agent on %(server)s on port %(port)d..." % {"server": options.server, "port": options.port}, syslog.LOG_DEBUG ) try: sock = socket.create_connection( (options.server, options.port) ) except socket.error: logging( "Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" % {"server": options.server, "port": options.port}, syslog.LOG_ERR ) sys.exit(1) if options.tstamp > 0: handle_old_vpn_files(options.directory, options.tstamp) last_timestamp = 0 def on_loop(notifier): """ Function called after each event loop to handle connexion errors. """ global last_timestamp, sock if last_timestamp > 0: sock.close() try: sock = socket.create_connection((options.server, options.port)) except socket.error: logging( "Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" % {"server": options.server, "port": options.port}, syslog.LOG_ERR ) else: handle_old_vpn_files(options.directory, last_timestamp) last_timestamp = 0 # https://github.com/seb-m/pyinotify/blob/master/python2/examples/daemon.py logging("Starting to watch %s directory..." % (options.directory,)) wm = pyinotify.WatchManager() handler = VPNAcctHandler() notifier = pyinotify.Notifier(wm, handler) wm.add_watch( options.directory, pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO ) try: notifier.loop( daemonize=True, callback=on_loop, pid_file="/var/run/pyinotify.pid", stdout="/var/log/pyinotify.log" ) except pyinotify.NotifierError, err: print >> sys.stderr, err