ccoop_resplit.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. #! /usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # Moulinette permettant de produire des relevés de comptes mensuels
  4. # au format CSV à partir d'exports CSV venant de l'interface Web du
  5. # crédit coopératif et ayant des chevauchements.
  6. import os, sys, json
  7. import csv
  8. from collections import OrderedDict
  9. from datetime import datetime
  10. import hashlib
  11. import locale
  12. locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
  13. class CsvStatementParser(object):
  14. def __init__(self):
  15. self.lines = OrderedDict()
  16. self.fieldnames = None
  17. self.date_fieldname = "Date"
  18. self.overlap_detector = {}
  19. self.first_ops = {}
  20. self.last_ops = {}
  21. self.daterange = [datetime.now(), datetime.fromordinal(1)]
  22. def parse(self, filename):
  23. with open(filename, encoding='iso-8859-1') as csvfile:
  24. reader = csv.DictReader(csvfile, delimiter=';')
  25. if self.fieldnames is None:
  26. # Le premier fichier parcourru détermine les noms de
  27. # colonnes attendus dans les prochains fichiers.
  28. self.fieldnames = [k for k in reader.fieldnames if k != '']
  29. # On identifie également la permière colonne qui
  30. # ressemble à une date, elle servira ensuite de clef
  31. # d'indexation.
  32. for fname in self.fieldnames:
  33. if "date" in fname.lower():
  34. self.date_fieldname = fname
  35. break
  36. if self.fieldnames != [k for k in reader.fieldnames if k != '']:
  37. print("""Fichier ignoré : %s. Cause: does not have the expected column names.
  38. Found: %s
  39. Expected: %s
  40. """ % (filename, ",".join(reader.fieldnames), ",".join(self.fieldnames)))
  41. else:
  42. self._parse_file(filename, reader)
  43. def _parse_file(self, filename, reader):
  44. print("Lecture du fichier %s" % os.path.basename(filename))
  45. for row in reader:
  46. opdate = datetime.strptime(row[self.date_fieldname], '%d/%m/%Y')
  47. ophash = datetime.strftime(opdate, '%Y-%m-%d') + hashlib.md5(json.dumps(row).encode()).hexdigest()
  48. self.lines[ophash] = {k:v for k,v in row.items() if k != ''}
  49. # Adjust dateranges
  50. if opdate < self.daterange[0]:
  51. self.daterange[0] = opdate
  52. if opdate > self.daterange[1]:
  53. self.daterange[1] = opdate
  54. # Prepare overlap detection
  55. if ophash not in self.overlap_detector:
  56. self.overlap_detector[ophash] = set()
  57. self.overlap_detector[ophash].add(filename)
  58. # Remember first line of each CSV file
  59. if filename not in self.first_ops:
  60. self.first_ops[filename] = ophash
  61. # Remember last line of each CSV file
  62. if filename not in self.last_ops:
  63. self.last_ops[filename] = ophash
  64. # CSV files are sometimes sorted by date ASC and sometimes
  65. # sorted by date DESC. So we may need to swap first_op and last_op.
  66. if (int(self.first_ops[filename][0:10].replace('-', '')) > int(self.last_ops[filename][0:10].replace('-', ''))):
  67. tmp = self.first_ops[filename]
  68. self.first_ops[filename] = self.last_ops[filename]
  69. self.last_ops[filename] = tmp
  70. def dump_full(self, output_filename):
  71. with open(output_filename, 'w') as outfile:
  72. writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
  73. writer.writeheader()
  74. for line in reversed(sorted(self.lines.items())):
  75. writer.writerow(line[1])
  76. print("Relevé intégral généré dans le fichier %s" % os.path.abspath(output_filename))
  77. def dump_monthly_reports(self, outputdir):
  78. firstmonth = int('{:%Y%m}'.format(self.daterange[0])) + 1
  79. lastmonth = int('{:%Y%m}'.format(self.daterange[1])) - 1
  80. if firstmonth >= lastmonth:
  81. print("Impossible de générer des relevés mensuels car la plage de dates traitée est trop petite.")
  82. return
  83. curmonth = firstmonth
  84. def __openfile__(curmonth):
  85. fname = "releve_{0}.csv".format(curmonth)
  86. outfile = open(os.path.join(outputdir, fname), 'w')
  87. writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
  88. writer.writeheader()
  89. return outfile, writer
  90. outfile, writer = __openfile__(curmonth)
  91. writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
  92. for line in sorted(self.lines.items()):
  93. month = int(line[0][0:4] + line[0][5:7])
  94. if month < curmonth:
  95. continue
  96. if month > lastmonth:
  97. break
  98. if month > curmonth:
  99. outfile.close()
  100. curmonth = month
  101. if month in self.badmonths:
  102. outfile, writer = __openfile__(str(curmonth) + "_potentiellement_incomplet")
  103. else:
  104. outfile, writer = __openfile__(curmonth)
  105. writer.writerow(line[1])
  106. outfile.close()
  107. print("Relevés mensuels générés dans le dossier %s" % os.path.abspath(outputdir))
  108. def check_overlaps(self):
  109. """
  110. Helps finding possible missing operations if exported CSV files
  111. are not "contiguous".
  112. """
  113. self.badmonths = set()
  114. print("\nRecherche de chevauchements, car les chevauchements de fichiers CSV c'est bien, ça confirme qu'il n'y a pas d'écritures manquantes...")
  115. for filename, first_op in self.first_ops.items():
  116. if first_op in self.overlap_detector:
  117. otherfiles = [v for v in self.overlap_detector.get(first_op)]
  118. otherfiles.remove(filename)
  119. if len(otherfiles) > 0:
  120. # Eliminate files having the same first_op
  121. otherfiles[:] = [candidate for candidate in otherfiles if self.first_ops[candidate] != first_op]
  122. if len(otherfiles) == 0 and first_op[0:10] != "{0:%Y-%m-%d}".format(self.daterange[0]):
  123. self.badmonths.add(int(first_op[0:7].replace('-', '')))
  124. print("Attention. Il y a peut-être des écritures manquantes avant le %s (fichier %s)." % (first_op[0:10], os.path.basename(filename)))
  125. for filename, last_op in self.last_ops.items():
  126. if last_op in self.overlap_detector:
  127. otherfiles = [v for v in self.overlap_detector.get(last_op)]
  128. otherfiles.remove(filename)
  129. if len(otherfiles) > 0:
  130. # Eliminate files having the same last_op
  131. otherfiles[:] = [candidate for candidate in otherfiles if self.last_ops[candidate] != last_op]
  132. if len(otherfiles) == 0 and last_op[0:10] != "{0:%Y-%m-%d}".format(self.daterange[1]):
  133. self.badmonths.add(int(last_op[0:7].replace('-', '')))
  134. print("Attention. Il y a peut-être des écritures manquantes après le %s (fichier %s)." % (last_op[0:10], os.path.basename(filename)))
  135. print("")
  136. def start_cli(dirpath, outputdir):
  137. # Lecture des fichiers CSV présents dans le dossier
  138. p = CsvStatementParser()
  139. for f in sorted(os.listdir(dirpath)):
  140. if f.endswith('.csv') or f.endswith('.CSV'):
  141. p.parse(os.path.join(dirpath, f))
  142. print("Les écritures lues s'étalent entre le {0:%d %B %Y} et le {1:%d %B %Y}.".format(p.daterange[0], p.daterange[1]))
  143. # Recherche de chevauchements
  144. p.check_overlaps()
  145. # Générer un relevé intégral et des relevés mensuels
  146. suffix = "_{0:%Y-%m-%d}__{1:%Y-%m-%d}".format(p.daterange[0], p.daterange[1])
  147. if len(p.badmonths): suffix += "_avec_des_trous"
  148. p.dump_full(os.path.join(outputdir, "integral%s.csv" % suffix))
  149. p.dump_monthly_reports(outputdir)
  150. if __name__ == '__main__':
  151. if len(sys.argv) < 2:
  152. print("Erreur. Merci de préciser le chemin du dossier où se trouvent les fichiers CSV à analyser.")
  153. print("Usage:")
  154. print(" %s exports_csv/ csv_mensuels/" % sys.argv[0])
  155. sys.exit(1)
  156. inputdir = sys.argv[1]
  157. if len(sys.argv) > 2:
  158. outputdir = sys.argv[2]
  159. else:
  160. outputdir = os.path.join(inputdir, "outputdir")
  161. # Création d'un dossier output si besoin
  162. if not os.path.isdir(outputdir):
  163. os.makedirs(outputdir)
  164. start_cli(inputdir, outputdir)