123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- #!/usr/bin/python
- import os, sys, socket, syslog
- from time import sleep
- from glob import iglob
- from datetime import datetime
- import argparse
- import pickle, struct
- import pyinotify
# Timestamp of the first accounting file whose metrics could not be sent to
# Carbon; 0 means that no delivery error is currently pending.
last_timestamp = 0

# Module-wide socket used to talk to Carbon. It is created unconnected here;
# the script entry point rebinds it to a connected socket.
sock = socket.socket()
def logging(message, level = syslog.LOG_INFO):
    """Write `message` to the system log.

    `level` is the syslog priority to use; it defaults to syslog.LOG_INFO.
    Note that this function shares its name with the standard `logging`
    module, which this script does not import.
    """
    syslog.syslog(level, message)
def read_vpn_acct_file(filename):
    """Parse one VPN accounting file into a list of Carbon metrics.

    Lines starting with '# time:' carry the integer timestamp of the file
    (the last such line wins); any other line starting with '#' is ignored.
    Remaining lines are split on whitespace and only those with exactly five
    fields (username ip qos uptxbytes downrxbytes) produce metrics.

    Returns a `(tstamp, metrics)` tuple where `metrics` is a list of
    `(path, (tstamp, value))` tuples, two per accepted line, with paths built
    from the global `options.prefix` and the username. `tstamp` is 0 when the
    file has no '# time:' line.

    Raises ValueError if the timestamp or a byte counter is not an integer.
    """
    logging("Reading file %s..." % (filename, ), syslog.LOG_DEBUG)
    tstamp = 0
    rows = []
    with open(filename) as acct_file:
        for line in acct_file:
            if not line.startswith('#'):
                rows.append(line.split())
            elif line.startswith('# time:'):
                tstamp = int(line.split(':')[1].strip())

    # Metrics are built only after the whole file has been read so that every
    # data point carries the final timestamp, wherever the header appears.
    metrics = []
    for fields in rows:
        if len(fields) != 5:
            continue
        user = fields[0]
        metrics.append(
            ('%s.%s.uptxbytes' % (options.prefix, user), (tstamp, int(fields[3])))
        )
        metrics.append(
            ('%s.%s.downrxbytes' % (options.prefix, user), (tstamp, int(fields[4])))
        )
    return tstamp, metrics
def send_to_carbon(metrics):
    """Send a list of metrics to Carbon over the module-level socket.

    The list is pickled and prefixed with its length packed as a 4-byte
    network-order unsigned integer, following
    http://graphite.readthedocs.org/en/latest/feeding-carbon.html

    Any socket.error raised by the send is propagated to the caller.
    """
    logging("Sending %d metrics..." % len(metrics), syslog.LOG_DEBUG)
    body = pickle.dumps(metrics)
    length_prefix = struct.pack("!L", len(body))
    sock.sendall(length_prefix + body)
-
def process_vpn_file(filename, old_tstamp = 0):
    """Read an accounting file and forward its metrics to Carbon.

    Files whose timestamp is strictly older than `old_tstamp` are read but
    not sent. If sending fails with socket.error, the error is not raised:
    instead the file's timestamp is stored in the global `last_timestamp`,
    unless an earlier failure is already recorded there.
    """
    global last_timestamp
    logging("Processing VPN file %s..." % (filename, ), syslog.LOG_DEBUG)
    tstamp, metrics = read_vpn_acct_file(filename)
    if tstamp < old_tstamp:
        return
    try:
        send_to_carbon(metrics)
    except socket.error:
        # Only the first failure is remembered (error not detected yet).
        if last_timestamp == 0:
            last_timestamp = tstamp
def handle_old_vpn_files(directory, old_tstamp):
    """Process the accounting files of `directory` dated after `old_tstamp`.

    Candidate files are selected from the timestamp encoded at the start of
    their name (the part before the first '-', format YYYYmmddHHMMSS). The
    selection window starts one minute before `old_tstamp`; the exact
    comparison against `old_tstamp` is done by process_vpn_file() using the
    timestamp stored inside each file.
    """
    cutoff = datetime.fromtimestamp(old_tstamp-60)
    logging("Starting processing deferred files starting at %s..." % (cutoff.strftime('%Y%m%d%H%M%S'), ), syslog.LOG_DEBUG)
    for path in iglob(directory + '/*'):
        stamp_text = os.path.basename(path).split('-')[0]
        try:
            file_date = datetime.strptime(stamp_text, '%Y%m%d%H%M%S')
        except ValueError:
            # Bad filename format: likely not an accounting file...
            continue
        if file_date <= cutoff:
            continue
        logging("Processing deferred file %s..." % (path, ), syslog.LOG_DEBUG)
        process_vpn_file(path, old_tstamp)
class VPNAcctHandler(pyinotify.ProcessEvent):
    """pyinotify event handler that forwards new accounting files to Carbon."""

    def handle_file(self, event):
        """Process the file named by `event`, unless a send error is pending."""
        path = event.pathname
        logging("Event detected for file %s..." % (path, ), syslog.LOG_DEBUG)
        if last_timestamp <= 0:
            process_vpn_file(path)
            return
        # Error ongoing: will wait till the connection is back (and last_timestamp == 0)
        logging("Error ongoing: %s will be processed later..." % (path, ), syslog.LOG_DEBUG)

    def process_IN_CLOSE_WRITE(self, event):
        """Handle a file that was closed after being written."""
        self.handle_file(event)

    def process_IN_MOVED_TO(self, event):
        """Handle a file that was moved into the watched directory."""
        self.handle_file(event)
if __name__ == '__main__':
    # Script entry point: parse the command line, connect to Carbon, optionally
    # replay old accounting files, then watch the directory as a daemon.
    parser = argparse.ArgumentParser(
        description="Watch for a specific directory containing OpenVPN accounting data " \
                    "and as soon as a new file is created, parse it and send the " \
                    "accounting data to a Carbon daemon.",
        epilog="Send comments/remarks/complaints to <tech@illyse.org>.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('directory', help='directory to watch')
    parser.add_argument('prefix', help='prefix for Graphite metrics')
    parser.add_argument('-t', '--timestamp', dest='tstamp', type=int, default=0, help='Timestamp to recover data from')
    parser.add_argument('-s', '--server', dest='server', default='localhost', help='Carbon daemon address')
    parser.add_argument('-p', '--port', dest='port', type=int, default=2004, help='Carbon daemon port')
    # `options` is read as a module-level global by the functions above.
    options = parser.parse_args()

    logging("Connecting to Carbon agent on %(server)s on port %(port)d..." %
            { 'server': options.server, 'port': options.port, }, syslog.LOG_DEBUG)
    try:
        sock = socket.create_connection( (options.server, options.port) )
    except socket.error:
        # Without an initial connection there is nothing useful to do.
        logging("Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" %
                { 'server': options.server, 'port': options.port, }, syslog.LOG_ERR)
        sys.exit(1)

    # Optional start-up recovery: replay the files newer than the timestamp
    # given on the command line.
    if options.tstamp > 0:
        handle_old_vpn_files(options.directory, options.tstamp)

    # NOTE(review): this reset discards any send failure recorded during the
    # start-up replay above, so such files are not retried -- confirm that
    # this is intended.
    last_timestamp = 0

    def on_loop(notifier):
        """
        Function called after each event loop to handle connection errors.

        When a send failure is pending (last_timestamp > 0), the socket is
        closed and re-opened. If the reconnection succeeds, the files dated
        from the failure onwards are replayed; otherwise the failure stays
        pending and a new attempt is made on the next call.
        """
        global last_timestamp, sock
        if last_timestamp <= 0:
            return
        sock.close()
        try:
            sock = socket.create_connection( (options.server, options.port) )
        except socket.error:
            logging("Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" %
                    { 'server': options.server, 'port': options.port, }, syslog.LOG_ERR)
            return
        # process_vpn_file() only records a failure while last_timestamp is 0,
        # so the flag is cleared *before* the replay; clearing it afterwards
        # would silently drop any send error that happens during the replay.
        resume_from = last_timestamp
        last_timestamp = 0
        handle_old_vpn_files(options.directory, resume_from)
        if last_timestamp > 0:
            # The replay failed again. Files are globbed in no particular
            # order, so restart from the original point to skip nothing.
            last_timestamp = resume_from

    # https://github.com/seb-m/pyinotify/blob/master/python2/examples/daemon.py
    logging("Starting to watch %s directory..." % (options.directory, ))
    wm = pyinotify.WatchManager()
    handler = VPNAcctHandler()
    notifier = pyinotify.Notifier(wm, handler)
    wm.add_watch(options.directory, pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO)
    try:
        notifier.loop(daemonize=True, callback=on_loop, pid_file='/var/run/pyinotify.pid', stdout='/var/log/pyinotify.log')
    except pyinotify.NotifierError as err:
        # `except ... as` and an explicit write are valid on Python 2.6+ and 3,
        # unlike the former `except X, err` / `print >>` forms.
        sys.stderr.write('%s\n' % (err, ))
|