vpn_acct.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #!/usr/bin/python
  2. import os, sys, socket, syslog
  3. from time import sleep
  4. from glob import iglob
  5. from datetime import datetime
  6. import argparse
  7. import pickle, struct
  8. import pyinotify
  # Timestamp of the first accounting file that could not be sent to Carbon;
  # 0 means that no send error is currently pending.
  last_timestamp = 0
  # Placeholder socket; it is replaced by a connected one at startup and
  # after each successful reconnection.
  sock = socket.socket()
  def logging(message, level = syslog.LOG_INFO):
      """
      Write message to the system log with the given syslog priority
      (LOG_INFO unless the caller asks for another one).
      """
      syslog.syslog(level, message)
  def read_vpn_acct_file(filename):
      """
      Parse one accounting file and build the metrics it describes.

      Lines starting with '# time:' carry the timestamp of the file (the last
      such line wins); any other line starting with '#' is ignored. Each
      remaining line is split on whitespace and is expected to hold
      "username ip qos uptxbytes downrxbytes"; lines that do not have exactly
      five fields are skipped.

      Returns a (tstamp, metrics) tuple where metrics is a list of
      (path, (tstamp, value)) tuples, two per user (uptxbytes and
      downrxbytes), with paths prefixed by options.prefix.
      A ValueError is raised if the timestamp or a byte counter is not an
      integer.
      """
      logging("Reading file %s..." % (filename, ), syslog.LOG_DEBUG)
      tstamp = 0
      rows = []
      with open(filename) as acct_file:
          for line in acct_file:
              if not line.startswith('#'):
                  rows.append(line.split())
              elif line.startswith('# time:'):
                  tstamp = int(line.split(':')[1].strip())
      # The metrics are only built once the whole file has been read, so that
      # every entry carries the final timestamp wherever the '# time:' line is.
      metrics = []
      for fields in rows:
          if len(fields) != 5:
              continue
          user, up_bytes, down_bytes = fields[0], fields[3], fields[4]
          up_metric = ( '%s.%s.uptxbytes' % (options.prefix, user), (tstamp, int(up_bytes)) )
          down_metric = ( '%s.%s.downrxbytes' % (options.prefix, user), (tstamp, int(down_bytes)) )
          metrics.append(up_metric)
          metrics.append(down_metric)
      return tstamp, metrics
  def send_to_carbon(metrics):
      """
      Send a batch of metrics to the Carbon daemon through the module-level
      socket.

      metrics is a list of (path, (timestamp, value)) tuples. The list is
      pickled and prefixed with its length packed as a 4-byte unsigned
      big-endian integer before being written to the socket.

      socket.error is not caught here: it propagates to the caller.
      """
      # http://graphite.readthedocs.org/en/latest/feeding-carbon.html
      logging("Sending %d metrics..." % len(metrics), syslog.LOG_DEBUG)
      # The protocol is pinned to 2 (the one used in the Graphite documentation)
      # instead of relying on the interpreter default, which differs between
      # Python versions and may not be readable by the receiving daemon.
      payload = pickle.dumps(metrics, protocol=2)
      header = struct.pack("!L", len(payload))
      sock.sendall(header + payload)
  def process_vpn_file(filename, old_tstamp = 0):
      """
      Parse one accounting file and forward its metrics to Carbon.

      The file is ignored when its timestamp is strictly older than
      old_tstamp. When the send fails with a socket error, the failure is
      logged and, unless an earlier failure is already recorded, the timestamp
      of the file is stored in the global last_timestamp so that the files can
      be sent again once the connection is back.
      """
      global last_timestamp
      logging("Processing VPN file %s..." % (filename, ), syslog.LOG_DEBUG)
      tstamp, metrics = read_vpn_acct_file(filename)
      if tstamp < old_tstamp:
          return
      try:
          send_to_carbon(metrics)
      except socket.error as err:
          # Leave a trace of the failure, like the connection errors do.
          logging("Couldn't send metrics from file %s: %s" % (filename, err), syslog.LOG_ERR)
          if last_timestamp == 0: # error not detected yet
              last_timestamp = tstamp
  def handle_old_vpn_files(directory, old_tstamp):
      """
      Process again the accounting files of directory dated from old_tstamp on.

      The date of a file is taken from the part of its name located before the
      first '-', which must match the %Y%m%d%H%M%S format; files whose name
      does not match are skipped. Files are handled in name order, which is
      the chronological order given that the name starts with the date.
      """
      # We go back one minute earlier to scan the files. However, we will do an exact
      # comparison when handling the matching files (see process_vpn_file() function).
      old_date = datetime.fromtimestamp(old_tstamp-60)
      logging("Starting processing deferred files starting at %s..." % (old_date.strftime('%Y%m%d%H%M%S'), ), syslog.LOG_DEBUG)
      # glob returns the names in arbitrary order: sort them so that the oldest
      # file is sent first and the first failure recorded is the earliest one.
      for f in sorted(iglob(os.path.join(directory, '*'))):
          try:
              tmp_tstamp = datetime.strptime(os.path.basename(f).split('-')[0], '%Y%m%d%H%M%S')
          except ValueError:
              # Bad filename format: likely not an accounting file...
              continue
          if tmp_tstamp > old_date:
              logging("Processing deferred file %s..." % (f, ), syslog.LOG_DEBUG)
              process_vpn_file(f, old_tstamp)
  class VPNAcctHandler(pyinotify.ProcessEvent):
      """
      pyinotify event handler: every file moved into the watched directory, or
      closed after having been written, is handed to handle_file().
      """

      def process_IN_CLOSE_WRITE(self, event):
          self.handle_file(event)

      def process_IN_MOVED_TO(self, event):
          self.handle_file(event)

      def handle_file(self, event):
          """
          Process the file the event is about, unless a send error is pending
          (last_timestamp > 0), in which case the file is left for later.
          """
          path = event.pathname
          logging("Event detected for file %s..." % (path, ), syslog.LOG_DEBUG)
          if last_timestamp <= 0:
              process_vpn_file(path)
          else:
              # Error ongoing: will wait till the connection is back (and last_timestamp == 0)
              logging("Error ongoing: %s will be processed later..." % (path, ), syslog.LOG_DEBUG)
  if __name__ == '__main__':
      # Script entry point: parse the command line, connect to Carbon, replay
      # the old files if asked to, then watch the directory as a daemon.
      parser = argparse.ArgumentParser(
          description="Watch for a specific directory containing OpenVPN accounting data "
                      "and as soon as a new file is created, parse it and send the "
                      "accounting data to a Carbon daemon.",
          epilog="Send comments/remarks/complaints to <tech@illyse.org>.",
          formatter_class=argparse.ArgumentDefaultsHelpFormatter)
      parser.add_argument('directory', help='directory to watch')
      parser.add_argument('prefix', help='prefix for Graphite metrics')
      parser.add_argument('-t', '--timestamp', dest='tstamp', type=int, default=0, help='Timestamp to recover data from')
      parser.add_argument('-s', '--server', dest='server', default='localhost', help='Carbon daemon address')
      parser.add_argument('-p', '--port', dest='port', type=int, default=2004, help='Carbon daemon port')
      options = parser.parse_args()

      logging("Connecting to Carbon agent on %(server)s on port %(port)d..." %
              { 'server': options.server, 'port': options.port, }, syslog.LOG_DEBUG)
      try:
          sock = socket.create_connection( (options.server, options.port) )
      except socket.error:
          logging("Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" %
                  { 'server': options.server, 'port': options.port, }, syslog.LOG_ERR)
          sys.exit(1)

      if options.tstamp > 0:
          # last_timestamp is deliberately not reset after this replay: if a
          # send failed meanwhile, on_loop() has to reconnect and send the
          # files again from the recorded timestamp.
          handle_old_vpn_files(options.directory, options.tstamp)

      def on_loop(notifier):
          """
          Function called after each event loop to handle connection errors.

          When a send error is pending (last_timestamp > 0), the socket is
          closed and a new connection is attempted; on success the files
          dated from the recorded timestamp on are processed again.
          """
          global last_timestamp, sock
          if last_timestamp > 0:
              sock.close()
              try:
                  sock = socket.create_connection( (options.server, options.port) )
              except socket.error:
                  logging("Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" %
                          { 'server': options.server, 'port': options.port, }, syslog.LOG_ERR)
              else:
                  # The marker is cleared BEFORE the replay: a failure is only
                  # recorded while last_timestamp == 0, so clearing it afterwards
                  # would silently drop any error happening during the replay.
                  resume_from = last_timestamp
                  last_timestamp = 0
                  handle_old_vpn_files(options.directory, resume_from)

      # https://github.com/seb-m/pyinotify/blob/master/python2/examples/daemon.py
      logging("Starting to watch %s directory..." % (options.directory, ))
      wm = pyinotify.WatchManager()
      handler = VPNAcctHandler()
      notifier = pyinotify.Notifier(wm, handler)
      wm.add_watch(options.directory, pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO)
      try:
          notifier.loop(daemonize=True, callback=on_loop, pid_file='/var/run/pyinotify.pid', stdout='/var/log/pyinotify.log')
      except pyinotify.NotifierError as err:
          # Same output as a print to stderr, in a form that is valid on both
          # Python 2 and Python 3.
          sys.stderr.write("%s\n" % (err, ))