|
@@ -0,0 +1,153 @@
|
|
|
+#!/usr/bin/python
|
|
|
+import os, sys, socket, syslog
|
|
|
+from time import sleep
|
|
|
+from glob import iglob
|
|
|
+from datetime import datetime
|
|
|
+import argparse
|
|
|
+import pickle, struct
|
|
|
+import pyinotify
|
|
|
+
|
|
|
+sock = socket.socket()
|
|
|
+last_timestamp = 0
|
|
|
+
|
|
|
+def logging(message, level = syslog.LOG_INFO):
|
|
|
+ #GLOP#syslog.syslog(level, message)
|
|
|
+ print message
|
|
|
+
|
|
|
+
|
|
|
+def read_vpn_acct_file(filename):
|
|
|
+ logging("Reading file %s..." % (filename, ), syslog.LOG_DEBUG)
|
|
|
+
|
|
|
+ tstamp = 0
|
|
|
+ with open(filename) as f:
|
|
|
+ data = []
|
|
|
+ for line in f:
|
|
|
+ if line.startswith('# time:'):
|
|
|
+ tstamp = int(line.split(':')[1].strip())
|
|
|
+ elif line.startswith('#'):
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ data.append(line.split())
|
|
|
+
|
|
|
+ metrics = []
|
|
|
+ for d in data:
|
|
|
+ # format: username ip qos uptxbytes downrxbytes
|
|
|
+ if len(d) == 5:
|
|
|
+ metrics.extend([
|
|
|
+ ( 'vpn1.%s.upxtbytes' % d[0], (tstamp, int(d[3])) ),
|
|
|
+ ( 'vpn1.%s.downrxbytes' % d[0], (tstamp, int(d[4])) ),
|
|
|
+ ])
|
|
|
+ return tstamp, metrics
|
|
|
+
|
|
|
+
|
|
|
+def send_to_carbon(metrics):
|
|
|
+ # http://graphite.readthedocs.org/en/latest/feeding-carbon.html
|
|
|
+ logging("Sending %d metrics..." % len(metrics), syslog.LOG_DEBUG)
|
|
|
+ payload = pickle.dumps(metrics)
|
|
|
+ header = struct.pack("!L", len(payload))
|
|
|
+ message = header + payload
|
|
|
+ sock.sendall(message)
|
|
|
+ #GLOP#print metrics
|
|
|
+
|
|
|
+
|
|
|
+def process_vpn_file(filename, old_tstamp = 0):
|
|
|
+ logging("Processing VPN file %s..." % (filename, ), syslog.LOG_DEBUG)
|
|
|
+
|
|
|
+ global last_timestamp
|
|
|
+ tstamp, metrics = read_vpn_acct_file(filename)
|
|
|
+
|
|
|
+ if tstamp >= old_tstamp:
|
|
|
+ try:
|
|
|
+ send_to_carbon(metrics)
|
|
|
+ except socket.error:
|
|
|
+ if last_timestamp == 0: # error not detected yet
|
|
|
+ last_timestamp = tstamp
|
|
|
+
|
|
|
+
|
|
|
+def handle_old_vpn_files(directory, old_tstamp):
|
|
|
+ # We go back one minute earlier to scan the files. However, we will do an exact
|
|
|
+ # comparison when handling the matching files (see process_vpn_file() function).
|
|
|
+
|
|
|
+ old_date = datetime.fromtimestamp(old_tstamp-60)
|
|
|
+ logging("Starting processing deferred files starting at %s..." % (old_date.strftime('%Y%m%d%H%M%S'), ), syslog.LOG_DEBUG)
|
|
|
+ for f in iglob(directory + '/*'):
|
|
|
+ try:
|
|
|
+ tmp_tstamp = datetime.strptime(os.path.basename(f).split('-')[0], '%Y%m%d%H%M%S')
|
|
|
+ except ValueError:
|
|
|
+ # Bad filename format: likely not an accounting file...
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ if tmp_tstamp > old_date:
|
|
|
+ logging("Processing deferred file %s..." % (f, ), syslog.LOG_DEBUG)
|
|
|
+ process_vpn_file(f, old_tstamp)
|
|
|
+
|
|
|
+
|
|
|
+class VPNAcctHandler(pyinotify.ProcessEvent):
|
|
|
+
|
|
|
+ def process_IN_CLOSE_WRITE(self, event):
|
|
|
+ logging("Event detected for file %s..." % (event.pathname, ), syslog.LOG_DEBUG)
|
|
|
+
|
|
|
+ if last_timestamp > 0:
|
|
|
+ # Error ongoing: will wait till the connection is back (and last_timestamp == 0)
|
|
|
+ logging("Error ongoing: %s will be processed later..." % (event.pathname, ), syslog.LOG_DEBUG)
|
|
|
+ return
|
|
|
+
|
|
|
+ process_vpn_file(event.pathname)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ parser = argparse.ArgumentParser(
|
|
|
+ description="Watch for a specific directory containing OpenVPN accounting data " \
|
|
|
+ "and as soon as a new file is created, parse it and send the " \
|
|
|
+ "accounting data to a Carbon daemon.",
|
|
|
+ epilog="Send comments/remarks/complaints to <tech@illyse.org>.",
|
|
|
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
|
|
+
|
|
|
+ parser.add_argument('directory', help='directory to watch')
|
|
|
+ parser.add_argument('-t', '--timestamp', dest='tstamp', type=int, default=0, help='Timestamp to recover data from')
|
|
|
+ parser.add_argument('-s', '--server', dest='server', default='localhost', help='Carbon daemon address')
|
|
|
+ parser.add_argument('-p', '--port', dest='port', type=int, default=2004, help='Carbon daemon port')
|
|
|
+ options = parser.parse_args()
|
|
|
+
|
|
|
+ logging("Connecting to Carbon agent on %(server)s on port %(port)d..." %
|
|
|
+ { 'server': options.server, 'port': options.port, }, syslog.LOG_DEBUG)
|
|
|
+ try:
|
|
|
+ sock = socket.create_connection( (options.server, options.port) )
|
|
|
+ except socket.error:
|
|
|
+ logging("Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" %
|
|
|
+ { 'server': options.server, 'port': options.port, }, syslog.LOG_ERR)
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ if options.tstamp > 0:
|
|
|
+ handle_old_vpn_files(options.directory, options.tstamp)
|
|
|
+
|
|
|
+ last_timestamp = 0
|
|
|
+ def on_loop(notifier):
|
|
|
+ """
|
|
|
+ Function called after each event loop to handle connexion errors.
|
|
|
+ """
|
|
|
+ global last_timestamp, sock
|
|
|
+
|
|
|
+ if last_timestamp > 0:
|
|
|
+ sock.close()
|
|
|
+ try:
|
|
|
+ sock = socket.create_connection( (options.server, options.port) )
|
|
|
+ except socket.error:
|
|
|
+ logging("Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" %
|
|
|
+ { 'server': options.server, 'port': options.port, }, syslog.LOG_ERR)
|
|
|
+ else:
|
|
|
+ handle_old_vpn_files(options.directory, last_timestamp)
|
|
|
+ last_timestamp = 0
|
|
|
+
|
|
|
+ # https://github.com/seb-m/pyinotify/blob/master/python2/examples/daemon.py
|
|
|
+ logging("Starting to watch %s directory..." % (options.directory))
|
|
|
+ wm = pyinotify.WatchManager()
|
|
|
+ handler = VPNAcctHandler()
|
|
|
+ notifier = pyinotify.Notifier(wm, handler)
|
|
|
+ wm.add_watch(options.directory, pyinotify.IN_CLOSE_WRITE)
|
|
|
+
|
|
|
+ try:
|
|
|
+ notifier.loop(daemonize=False, callback=on_loop, pid_file='/tmp/pyinotify.pid', stdout='/tmp/pyinotify.log') #GLOP#
|
|
|
+ except pyinotify.NotifierError, err:
|
|
|
+ print >> sys.stderr, err
|
|
|
+
|