vpn_acct.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. #!/usr/bin/python
  2. import os, sys, socket, syslog
  3. from time import sleep
  4. from glob import iglob
  5. from datetime import datetime
  6. import argparse
  7. import pickle, struct
  8. import pyinotify
  9. sock = socket.socket()
  10. last_timestamp = 0
  11. # This file could not be formatted by Black.
  12. def logging(message, level = syslog.LOG_INFO):
  13. syslog.syslog(level, message)
  14. #print message
  15. def read_vpn_acct_file(filename):
  16. logging("Reading file %s..." % (filename, ), syslog.LOG_DEBUG)
  17. tstamp = 0
  18. with open(filename) as f:
  19. data = []
  20. for line in f:
  21. if line.startswith('# time:'):
  22. tstamp = int(line.split(':')[1].strip())
  23. elif line.startswith('#'):
  24. pass
  25. else:
  26. data.append(line.split())
  27. metrics = []
  28. for d in data:
  29. # format: username ip qos uptxbytes downrxbytes
  30. if len(d) == 5:
  31. metrics.extend([
  32. ('%s.%s.uptxbytes' %
  33. (options.prefix, d[0]), (tstamp, int(d[3]))),
  34. ('%s.%s.downrxbytes' %
  35. (options.prefix, d[0]), (tstamp, int(d[4]))),
  36. ])
  37. return tstamp, metrics
  38. def send_to_carbon(metrics):
  39. # http://graphite.readthedocs.org/en/latest/feeding-carbon.html
  40. logging("Sending %d metrics..." % len(metrics), syslog.LOG_DEBUG)
  41. payload = pickle.dumps(metrics)
  42. header = struct.pack("!L", len(payload))
  43. message = header + payload
  44. sock.sendall(message)
  45. def process_vpn_file(filename, old_tstamp = 0):
  46. logging("Processing VPN file %s..." % (filename,), syslog.LOG_DEBUG)
  47. global last_timestamp
  48. tstamp, metrics = read_vpn_acct_file(filename)
  49. if tstamp >= old_tstamp:
  50. try:
  51. send_to_carbon(metrics)
  52. except socket.error:
  53. if last_timestamp == 0: # error not detected yet
  54. last_timestamp = tstamp
  55. def handle_old_vpn_files(directory, old_tstamp):
  56. # We go back one minute earlier to scan the files. However, we will do an exact
  57. # comparison when handling the matching files (see process_vpn_file() function).
  58. old_date = datetime.fromtimestamp(old_tstamp-60)
  59. logging(
  60. "Starting processing deferred files starting at %s..." %
  61. (old_date.strftime('%Y%m%d%H%M%S'),),
  62. syslog.LOG_DEBUG
  63. )
  64. for f in iglob(directory + '/*'):
  65. try:
  66. tmp_tstamp = datetime.strptime(
  67. os.path.basename(f).split('-')[0],
  68. '%Y%m%d%H%M%S'
  69. )
  70. except ValueError:
  71. # Bad filename format: likely not an accounting file...
  72. pass
  73. else:
  74. if tmp_tstamp > old_date:
  75. logging(
  76. "Processing deferred file %s..." % (f,),
  77. syslog.LOG_DEBUG
  78. )
  79. process_vpn_file(f, old_tstamp)
  80. class VPNAcctHandler(pyinotify.ProcessEvent):
  81. def process_IN_MOVED_TO(self, event):
  82. self.handle_file(event)
  83. def process_IN_CLOSE_WRITE(self, event):
  84. self.handle_file(event)
  85. def handle_file(self, event):
  86. logging(
  87. "Event detected for file %s..." % (event.pathname,),
  88. syslog.LOG_DEBUG
  89. )
  90. if last_timestamp > 0:
  91. # Error ongoing: will wait till the connection is back (and last_timestamp == 0)
  92. logging(
  93. "Error ongoing: %s will be processed later..." % (event.pathname,),
  94. syslog.LOG_DEBUG
  95. )
  96. return
  97. process_vpn_file(event.pathname)
  98. if __name__ == '__main__':
  99. parser = argparse.ArgumentParser(
  100. description="Watch for a specific directory containing OpenVPN accounting data " \
  101. "and as soon as a new file is created, parse it and send the " \
  102. "accounting data to a Carbon daemon.",
  103. epilog="Send comments/remarks/complaints to <tech@illyse.org>.",
  104. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  105. parser.add_argument('directory', help='directory to watch')
  106. parser.add_argument('prefix', help='prefix for Graphite metrics')
  107. parser.add_argument('-t', '--timestamp', dest='tstamp', type=int, default=0, help='Timestamp to recover data from')
  108. parser.add_argument('-s', '--server', dest='server', default='localhost', help='Carbon daemon address')
  109. parser.add_argument('-p', '--port', dest='port', type=int, default=2004, help='Carbon daemon port')
  110. options = parser.parse_args()
  111. logging(
  112. "Connecting to Carbon agent on %(server)s on port %(port)d..." %
  113. {"server": options.server, "port": options.port},
  114. syslog.LOG_DEBUG
  115. )
  116. try:
  117. sock = socket.create_connection( (options.server, options.port) )
  118. except socket.error:
  119. logging(
  120. "Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" %
  121. {"server": options.server, "port": options.port},
  122. syslog.LOG_ERR
  123. )
  124. sys.exit(1)
  125. if options.tstamp > 0:
  126. handle_old_vpn_files(options.directory, options.tstamp)
  127. last_timestamp = 0
  128. def on_loop(notifier):
  129. """
  130. Function called after each event loop to handle connexion errors.
  131. """
  132. global last_timestamp, sock
  133. if last_timestamp > 0:
  134. sock.close()
  135. try:
  136. sock = socket.create_connection((options.server, options.port))
  137. except socket.error:
  138. logging(
  139. "Couldn't connect to %(server)s on port %(port)d, is Carbon agent running?" %
  140. {"server": options.server, "port": options.port},
  141. syslog.LOG_ERR
  142. )
  143. else:
  144. handle_old_vpn_files(options.directory, last_timestamp)
  145. last_timestamp = 0
  146. # https://github.com/seb-m/pyinotify/blob/master/python2/examples/daemon.py
  147. logging("Starting to watch %s directory..." % (options.directory,))
  148. wm = pyinotify.WatchManager()
  149. handler = VPNAcctHandler()
  150. notifier = pyinotify.Notifier(wm, handler)
  151. wm.add_watch(
  152. options.directory,
  153. pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
  154. )
  155. try:
  156. notifier.loop(
  157. daemonize=True,
  158. callback=on_loop,
  159. pid_file="/var/run/pyinotify.pid",
  160. stdout="/var/log/pyinotify.log"
  161. )
  162. except pyinotify.NotifierError, err:
  163. print >> sys.stderr, err