ccoop-resplit 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. #! /usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # Moulinette permettant de produire des relevés de comptes mensuels
  4. # au format CSV à partir d'exports CSV venant de l'interface Web du
  5. # crédit coopératif et ayant des chevauchements.
  6. import os, sys, json
  7. import csv
  8. from collections import OrderedDict
  9. from datetime import datetime
  10. import hashlib
  11. import locale
  12. locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
  13. class CsvStatementParser(object):
  14. def __init__(self):
  15. self.lines = OrderedDict()
  16. self.fieldnames = None
  17. self.date_fieldname = "Date"
  18. self.overlap_detector = {}
  19. self.first_ops = {}
  20. self.last_ops = {}
  21. self.daterange = [datetime.now(), datetime.fromordinal(1)]
  22. self.dups = dict() # holds counters for duplicate lines
  23. def parse(self, filename):
  24. with open(filename, encoding='iso-8859-1') as csvfile:
  25. reader = csv.DictReader(csvfile, delimiter=';')
  26. if self.fieldnames is None:
  27. # Le premier fichier parcourru détermine les noms de
  28. # colonnes attendus dans les prochains fichiers.
  29. self.fieldnames = [k for k in reader.fieldnames if k != '']
  30. # On identifie également la permière colonne qui
  31. # ressemble à une date, elle servira ensuite de clef
  32. # d'indexation.
  33. for fname in self.fieldnames:
  34. if "date" in fname.lower():
  35. self.date_fieldname = fname
  36. break
  37. if self.fieldnames != [k for k in reader.fieldnames if k != '']:
  38. print("""Fichier ignoré : %s. Cause: does not have the expected column names.
  39. Found: %s
  40. Expected: %s
  41. """ % (filename, ",".join(reader.fieldnames), ",".join(self.fieldnames)))
  42. else:
  43. self._parse_file(filename, reader)
  44. def _parse_file(self, filename, reader):
  45. self.dups = dict() # Duplicate counters must be reset for each file
  46. print("Lecture du fichier %s" % os.path.basename(filename))
  47. for row in reader:
  48. opdate = datetime.strptime(row[self.date_fieldname], '%d/%m/%Y')
  49. ophash = datetime.strftime(opdate, '%Y-%m-%d') + hashlib.md5(json.dumps(row).encode()).hexdigest()
  50. # Special use case: one file contains multiple identical lines.
  51. # Then we append a counter to the duplicate ophash.
  52. if ophash in self.dups:
  53. # print(" *** doublon trouvé dans '{}': {}".format(filename, ';'.join(row.values())))
  54. self.dups[ophash] = self.dups[ophash] + 1
  55. ophash = ophash + "-" + str(self.dups[ophash])
  56. # print(" We have now :\n {}\n {}".format("\n ".join([h + " // " + "".join(v.values()) for h,v in self.lines.items() if h.startswith(ophash[0:10])]), ophash + " // " + "".join(row.values()))) # XXX DEBUG
  57. else:
  58. self.dups[ophash] = 0
  59. self.lines[ophash] = {k:v for k,v in row.items() if k != ''}
  60. # Adjust dateranges
  61. if opdate < self.daterange[0]:
  62. self.daterange[0] = opdate
  63. if opdate > self.daterange[1]:
  64. self.daterange[1] = opdate
  65. # Prepare overlap detection
  66. if ophash not in self.overlap_detector:
  67. self.overlap_detector[ophash] = set()
  68. self.overlap_detector[ophash].add(filename)
  69. # Remember first line of each CSV file
  70. if filename not in self.first_ops:
  71. self.first_ops[filename] = ophash
  72. # Remember last line of each CSV file
  73. if filename not in self.last_ops:
  74. self.last_ops[filename] = ophash
  75. # CSV files are sometimes sorted by date ASC and sometimes
  76. # sorted by date DESC. So we may need to swap first_op and last_op.
  77. if (int(self.first_ops[filename][0:10].replace('-', '')) > int(self.last_ops[filename][0:10].replace('-', ''))):
  78. tmp = self.first_ops[filename]
  79. self.first_ops[filename] = self.last_ops[filename]
  80. self.last_ops[filename] = tmp
  81. def dump_full(self, output_filename):
  82. with open(output_filename, 'w') as outfile:
  83. writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
  84. writer.writeheader()
  85. for line in reversed(sorted(self.lines.items())):
  86. writer.writerow(line[1])
  87. print("Relevé intégral généré dans le fichier %s" % os.path.abspath(output_filename))
  88. def dump_monthly_reports(self, outputdir):
  89. firstmonth = int('{:%Y%m}'.format(self.daterange[0])) + 1
  90. lastmonth = int('{:%Y%m}'.format(self.daterange[1])) - 1
  91. if firstmonth >= lastmonth:
  92. print("Impossible de générer des relevés mensuels car la plage de dates traitée est trop petite.")
  93. return
  94. curmonth = firstmonth
  95. def __openfile__(curmonth):
  96. fname = "releve_{0}.csv".format(curmonth)
  97. outfile = open(os.path.join(outputdir, fname), 'w')
  98. writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
  99. writer.writeheader()
  100. return outfile, writer
  101. outfile, writer = __openfile__(curmonth)
  102. writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
  103. for line in sorted(self.lines.items()):
  104. month = int(line[0][0:4] + line[0][5:7])
  105. if month < curmonth:
  106. continue
  107. if month > lastmonth:
  108. break
  109. if month > curmonth:
  110. outfile.close()
  111. curmonth = month
  112. if month in self.badmonths:
  113. outfile, writer = __openfile__(str(curmonth) + "_potentiellement_incomplet")
  114. else:
  115. outfile, writer = __openfile__(curmonth)
  116. writer.writerow(line[1])
  117. outfile.close()
  118. print("Relevés mensuels générés dans le dossier %s" % os.path.abspath(outputdir))
  119. def check_overlaps(self):
  120. """
  121. Helps finding possible missing operations if exported CSV files
  122. are not "contiguous".
  123. """
  124. self.badmonths = set()
  125. print("\nRecherche de chevauchements, car les chevauchements de fichiers CSV c'est bien, ça confirme qu'il n'y a pas d'écritures manquantes...")
  126. for filename, first_op in self.first_ops.items():
  127. if first_op in self.overlap_detector:
  128. otherfiles = [v for v in self.overlap_detector.get(first_op)]
  129. otherfiles.remove(filename)
  130. if len(otherfiles) > 0:
  131. # Eliminate files having the same first_op
  132. otherfiles[:] = [candidate for candidate in otherfiles if self.first_ops[candidate] != first_op]
  133. if len(otherfiles) == 0 and first_op[0:10] != "{0:%Y-%m-%d}".format(self.daterange[0]):
  134. self.badmonths.add(int(first_op[0:7].replace('-', '')))
  135. print("Attention. Il y a peut-être des écritures manquantes avant le %s (fichier %s)." % (first_op[0:10], os.path.basename(filename)))
  136. for filename, last_op in self.last_ops.items():
  137. if last_op in self.overlap_detector:
  138. otherfiles = [v for v in self.overlap_detector.get(last_op)]
  139. otherfiles.remove(filename)
  140. if len(otherfiles) > 0:
  141. # Eliminate files having the same last_op
  142. otherfiles[:] = [candidate for candidate in otherfiles if self.last_ops[candidate] != last_op]
  143. if len(otherfiles) == 0 and last_op[0:10] != "{0:%Y-%m-%d}".format(self.daterange[1]):
  144. self.badmonths.add(int(last_op[0:7].replace('-', '')))
  145. print("Attention. Il y a peut-être des écritures manquantes après le %s (fichier %s)." % (last_op[0:10], os.path.basename(filename)))
  146. print("")
  147. def start_cli(dirpath, outputdir):
  148. # Lecture des fichiers CSV présents dans le dossier
  149. p = CsvStatementParser()
  150. for f in sorted(os.listdir(dirpath)):
  151. if f.endswith('.csv') or f.endswith('.CSV'):
  152. p.parse(os.path.join(dirpath, f))
  153. print("Les écritures lues s'étalent entre le {0:%d %B %Y} et le {1:%d %B %Y}.".format(p.daterange[0], p.daterange[1]))
  154. # Recherche de chevauchements
  155. p.check_overlaps()
  156. # Générer un relevé intégral et des relevés mensuels
  157. suffix = "_{0:%Y-%m-%d}__{1:%Y-%m-%d}".format(p.daterange[0], p.daterange[1])
  158. if len(p.badmonths): suffix += "_avec_des_trous"
  159. p.dump_full(os.path.join(outputdir, "integral%s.csv" % suffix))
  160. p.dump_monthly_reports(outputdir)
  161. if __name__ == '__main__':
  162. if len(sys.argv) < 2:
  163. print("Erreur. Merci de préciser le chemin du dossier où se trouvent les fichiers CSV à analyser.")
  164. print("Usage:")
  165. print(" %s exports_csv/ csv_mensuels/" % sys.argv[0])
  166. sys.exit(1)
  167. inputdir = sys.argv[1]
  168. if len(sys.argv) > 2:
  169. outputdir = sys.argv[2]
  170. else:
  171. outputdir = os.path.join(inputdir, "outputdir")
  172. # Création d'un dossier output si besoin
  173. if not os.path.isdir(outputdir):
  174. os.makedirs(outputdir)
  175. start_cli(inputdir, outputdir)