#!/usr/bin/python
"""Watch an OpenVPN accounting directory and forward its data to Carbon.

Every file that is closed after writing, or moved into the watched directory,
is parsed and its per-user byte counters are sent to a Carbon daemon through
the pickle protocol.  When sending fails, the timestamp of the failing file is
remembered in ``last_timestamp``; once the connection is back, files from that
timestamp onwards are replayed.
"""

import argparse
import os
import pickle
import socket
import struct
import sys
import syslog
from datetime import datetime
from glob import iglob
from time import sleep

import pyinotify

# Connection to the Carbon daemon; replaced by a connected socket at startup
# and after each successful reconnection.
sock = socket.socket()

# 0 while everything is fine; otherwise the timestamp of the earliest file
# whose metrics could not be delivered (replay must restart from there).
last_timestamp = 0


def logging(message, level = syslog.LOG_INFO):
    """Send ``message`` to syslog with the given priority ``level``."""
    syslog.syslog(level, message)


def read_vpn_acct_file(filename):
    """Parse an accounting file into Carbon metrics.

    The file is made of an optional ``# time: <epoch>`` line, other ``#``
    comment lines (ignored) and data lines with exactly five
    whitespace-separated fields: ``username ip qos uptxbytes downrxbytes``.
    Data lines with a different number of fields are ignored.

    Returns a ``(tstamp, metrics)`` tuple where ``tstamp`` is the value of the
    last ``# time:`` line (0 when there is none) and ``metrics`` is a list of
    ``(path, (tstamp, value))`` tuples, two per data line.  Metric paths are
    built from the module-level ``options.prefix``.

    Raises IOError if the file cannot be read and ValueError if the timestamp
    or a byte counter is not an integer.
    """
    logging("Reading file %s..." % (filename, ), syslog.LOG_DEBUG)
    tstamp = 0
    rows = []
    with open(filename) as f:
        for line in f:
            if line.startswith('# time:'):
                # Split only once so the value is everything after the label.
                tstamp = int(line.split(':', 1)[1].strip())
            elif not line.startswith('#'):
                rows.append(line.split())

    # Metrics are built after the whole file is read so that every one of
    # them carries the final timestamp, wherever the '# time:' line appears.
    metrics = []
    for fields in rows:
        if len(fields) != 5:
            continue
        user = fields[0]
        metrics.append(('%s.%s.uptxbytes' % (options.prefix, user),
                        (tstamp, int(fields[3]))))
        metrics.append(('%s.%s.downrxbytes' % (options.prefix, user),
                        (tstamp, int(fields[4]))))
    return tstamp, metrics


def send_to_carbon(metrics):
    """Send ``metrics`` to Carbon over the module-level socket.

    The message is a 4-byte big-endian length header followed by the pickled
    list, see http://graphite.readthedocs.org/en/latest/feeding-carbon.html

    Raises socket.error when the data cannot be sent.
    """
    logging("Sending %d metrics..." % len(metrics), syslog.LOG_DEBUG)
    # Protocol 2 is what the Graphite documentation uses; pinning it keeps the
    # payload identical whichever Python version runs this script.
    payload = pickle.dumps(metrics, protocol=2)
    header = struct.pack("!L", len(payload))
    sock.sendall(header + payload)


def process_vpn_file(filename, old_tstamp = 0):
    """Parse ``filename`` and send its metrics unless older than ``old_tstamp``.

    A file that cannot be read or parsed is logged and skipped so that one bad
    file does not stop the daemon.  When sending fails, the file timestamp is
    recorded in the global ``last_timestamp`` so that the replay can restart
    from there once the connection is re-established.
    """
    logging("Processing VPN file %s..." % (filename, ), syslog.LOG_DEBUG)
    global last_timestamp
    try:
        tstamp, metrics = read_vpn_acct_file(filename)
    except (IOError, OSError, ValueError) as err:
        logging("Couldn't read VPN file %s: %s" % (filename, err),
                syslog.LOG_ERR)
        return
    if tstamp < old_tstamp:
        return
    try:
        send_to_carbon(metrics)
    except socket.error as err:
        logging("Couldn't send metrics from %s: %s" % (filename, err),
                syslog.LOG_WARNING)
        # Keep the earliest failing timestamp so that no file is left out of
        # the next replay.
        if last_timestamp == 0 or tstamp < last_timestamp:
            last_timestamp = tstamp


def handle_old_vpn_files(directory, old_tstamp):
    """Replay the files of ``directory`` that are not older than ``old_tstamp``.

    Candidate files are selected from the ``YYYYmmddHHMMSS`` prefix of their
    name (the part before the first '-'); files whose name does not match are
    ignored.
    """
    # We go back one minute earlier to scan the files. However, we will do an
    # exact comparison when handling the matching files (see
    # process_vpn_file() function).
    old_date = datetime.fromtimestamp(old_tstamp-60)
    logging("Starting processing deferred files starting at %s..."
            % (old_date.strftime('%Y%m%d%H%M%S'), ), syslog.LOG_DEBUG)
    # Sorted so that files sharing the timestamp prefix are replayed in
    # chronological order: if sending fails again, the recorded timestamp is
    # then the earliest one still pending.
    # NOTE(review): assumes the filename prefix and the '# time:' value inside
    # the file describe the same instant -- confirm.
    for f in sorted(iglob(directory + '/*')):
        try:
            tmp_tstamp = datetime.strptime(os.path.basename(f).split('-')[0],
                                           '%Y%m%d%H%M%S')
        except ValueError:
            # Bad filename format: likely not an accounting file...
            continue
        if tmp_tstamp > old_date:
            logging("Processing deferred file %s..." % (f, ), syslog.LOG_DEBUG)
            process_vpn_file(f, old_tstamp)


class VPNAcctHandler(pyinotify.ProcessEvent):
    """Process accounting files as soon as they show up in the directory."""

    def process_IN_MOVED_TO(self, event):
        """Handle a file moved into the watched directory."""
        self.handle_file(event)

    def process_IN_CLOSE_WRITE(self, event):
        """Handle a file closed after having been opened for writing."""
        self.handle_file(event)

    def handle_file(self, event):
        """Process the file of ``event`` unless a send error is pending."""
        logging("Event detected for file %s..." % (event.pathname, ), syslog.LOG_DEBUG)
        if last_timestamp > 0:
            # Error ongoing: will wait till the connection is back (and last_timestamp == 0)
            logging("Error ongoing: %s will be processed later..." % (event.pathname, ), syslog.LOG_DEBUG)
            return
        process_vpn_file(event.pathname)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Watch for a specific directory containing OpenVPN accounting data "
                    "and as soon as a new file is created, parse it and send the "
                    "accounting data to a Carbon daemon.",
        epilog="Send comments/remarks/complaints to .",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('directory', help='directory to watch')
    parser.add_argument('prefix', help='prefix for Graphite metrics')
    parser.add_argument('-t', '--timestamp', dest='tstamp', type=int, default=0,
                        help='Timestamp to recover data from')
    parser.add_argument('-s', '--server', dest='server', default='localhost',
                        help='Carbon daemon address')
    parser.add_argument('-p', '--port', dest='port', type=int, default=2004,
                        help='Carbon daemon port')
    options = parser.parse_args()

    logging("Connecting to Carbon agent on %(server)s on port %(port)d..."
            % { 'server': options.server, 'port': options.port, }, syslog.LOG_DEBUG)
    try:
        sock = socket.create_connection( (options.server, options.port) )
    except socket.error:
        logging("Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?"
                % { 'server': options.server, 'port': options.port, }, syslog.LOG_ERR)
        sys.exit(1)

    if options.tstamp > 0:
        # A send failure during this initial replay is left in last_timestamp
        # on purpose, so that on_loop() reconnects and resumes the replay
        # instead of silently dropping the remaining files.
        handle_old_vpn_files(options.directory, options.tstamp)

    def on_loop(notifier):
        """
        Function called after each event loop to handle connection errors.

        When a send error is pending, reconnect to Carbon and replay the files
        starting from the recorded timestamp.
        """
        global last_timestamp, sock
        if last_timestamp > 0:
            sock.close()
            try:
                sock = socket.create_connection( (options.server, options.port) )
            except socket.error:
                logging("Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?"
                        % { 'server': options.server, 'port': options.port, }, syslog.LOG_ERR)
            else:
                # Clear the error flag *before* replaying: a new failure during
                # the replay is then recorded by process_vpn_file() and handled
                # on the next loop, instead of being erased afterwards.
                resume_from = last_timestamp
                last_timestamp = 0
                handle_old_vpn_files(options.directory, resume_from)

    # https://github.com/seb-m/pyinotify/blob/master/python2/examples/daemon.py
    logging("Starting to watch %s directory..." % (options.directory, ))
    wm = pyinotify.WatchManager()
    handler = VPNAcctHandler()
    notifier = pyinotify.Notifier(wm, handler)
    wm.add_watch(options.directory, pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO)

    try:
        notifier.loop(daemonize=True, callback=on_loop,
                      pid_file='/var/run/pyinotify.pid', stdout='/var/log/pyinotify.log')
    except pyinotify.NotifierError as err:
        sys.stderr.write("%s\n" % (err, ))