Browse Source

Script ccoop_resplit.py pour générer des relevés de comptes mensuels CSV.

pitchum 8 years ago
parent
commit
e7c558e3a8
1 changed files with 169 additions and 0 deletions
  1. 169 0
      ccoop_resplit.py

+ 169 - 0
ccoop_resplit.py

@@ -0,0 +1,169 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import os, sys, json
+import csv
+from collections import OrderedDict
+from datetime import datetime
+import hashlib
+import locale
+locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
+
class CsvStatementParser(object):
    """Merge several bank-statement CSV exports and re-split them by month.

    Every parsed row is stored in ``lines`` under a sortable key made of the
    ISO date of the operation followed by an MD5 digest of the row.  The same
    operation exported in two overlapping files therefore maps to the same
    key and is stored only once.
    """

    def __init__(self):
        # key -> row dict (restricted to the expected columns)
        self.lines = OrderedDict()
        # Column names, fixed by the first file that is parsed
        self.fieldnames = None
        # Name of the column holding the operation date
        self.date_fieldname = "Date"
        # key -> set of file names in which that operation was seen
        self.overlap_detector = {}
        # file name -> key of the first / last row read from that file
        self.first_ops = {}
        self.last_ops = {}
        # [earliest, latest] operation date seen so far.  The sentinels are
        # chosen so that any real date replaces them.
        self.daterange = [datetime.max, datetime.min]

    def parse(self, filename):
        """Read one ``;``-separated, ISO-8859-1 encoded CSV file.

        The first file parsed defines the expected column names (empty
        column names are ignored) and the date column: the first column
        whose name contains "date", case-insensitively.  Later files whose
        columns differ are reported on stdout and skipped, as are files
        that have no header line at all.
        """
        # newline='' is what the csv module expects for files it reads.
        with open(filename, encoding='iso-8859-1', newline='') as csvfile:
            reader = csv.DictReader(csvfile, delimiter=';')
            if reader.fieldnames is None:
                # Completely empty file: there is no header to compare.
                print("Fichier ignoré : %s. Cause: empty file." % filename)
                return
            found = [k for k in reader.fieldnames if k != '']

            if self.fieldnames is None:
                self.fieldnames = found
                for fname in found:
                    if "date" in fname.lower():
                        self.date_fieldname = fname
                        break

            if found != self.fieldnames:
                print("""Fichier ignoré : %s. Cause: does not have the expected column names.
  Found:    %s
  Expected: %s
""" % (filename, ",".join(reader.fieldnames), ",".join(self.fieldnames)))
            else:
                self._parse_file(filename, reader)

    def _parse_file(self, filename, reader):
        """Store every row of ``reader`` and update the bookkeeping tables.

        Raises ``ValueError`` if a date cell is not formatted ``dd/mm/YYYY``.
        """
        print("Lecture du fichier %s" % filename)
        wanted = set(self.fieldnames)
        occurrences = {}
        first_key = last_key = None
        for row in reader:
            opdate = datetime.strptime(row[self.date_fieldname], '%d/%m/%Y')
            digest = hashlib.md5(json.dumps(row).encode()).hexdigest()
            base = opdate.strftime('%Y-%m-%d') + digest
            # Two strictly identical rows inside ONE file are two distinct
            # operations.  Rank them so the second one is not overwritten,
            # while the same row found in another file still gets the same
            # key and is recognised as an overlap.
            rank = occurrences.get(base, 0)
            occurrences[base] = rank + 1
            ophash = base if rank == 0 else "%s-%d" % (base, rank)

            # Keep only the expected columns (drops '' and overflow cells).
            self.lines[ophash] = {k: v for k, v in row.items() if k in wanted}

            # Adjust the overall date range
            if opdate < self.daterange[0]:
                self.daterange[0] = opdate
            if opdate > self.daterange[1]:
                self.daterange[1] = opdate

            # Prepare overlap detection
            self.overlap_detector.setdefault(ophash, set()).add(filename)

            if first_key is None:
                first_key = ophash
            last_key = ophash

        # Remember the first and last rows of each CSV file.  A file that
        # only contains a header has neither.
        if first_key is not None:
            self.first_ops.setdefault(filename, first_key)
            self.last_ops.setdefault(filename, last_key)

    def dump_full(self, output_filename):
        """Write every stored operation to one CSV file, most recent first.

        The file gets a header line and is written with the platform's
        default text encoding.
        """
        with open(output_filename, 'w', newline='') as outfile:
            writer = csv.DictWriter(outfile, self.fieldnames or [], delimiter=';')
            writer.writeheader()
            for key in sorted(self.lines, reverse=True):
                writer.writerow(self.lines[key])
        print("Relevé intégral généré dans le fichier %s" % os.path.abspath(output_filename))

    @staticmethod
    def _shift_month(date, delta):
        """Return the month ``delta`` months away from ``date`` as int YYYYMM."""
        year, month0 = divmod(date.year * 12 + (date.month - 1) + delta, 12)
        return year * 100 + month0 + 1

    def dump_monthly_reports(self, outputdir, write_header=False):
        """Write one ``releve_YYYYMM.csv`` file per complete month.

        The months of the earliest and of the latest operation are skipped
        (they are excluded from the range computed below).  Only months that
        actually contain operations produce a file.  Rows are written in
        ascending key order.

        :param outputdir: directory receiving the files (created if needed).
        :param write_header: when true, start each file with the column
            names.  Defaults to False, which writes no header line.
        """
        # Real calendar arithmetic: December + 1 is January of the next
        # year, not the non-existent month "13".
        firstmonth = self._shift_month(self.daterange[0], 1)
        lastmonth = self._shift_month(self.daterange[1], -1)
        if firstmonth > lastmonth:
            print("Impossible de générer des relevés mensuels car la plage de dates traitée est trop petite.")
            return

        by_month = {}
        for key in sorted(self.lines):
            month = int(key[0:4] + key[5:7])
            if firstmonth <= month <= lastmonth:
                by_month.setdefault(month, []).append(self.lines[key])

        os.makedirs(outputdir, exist_ok=True)
        for month in sorted(by_month):
            fname = "releve_{0}.csv".format(month)
            with open(os.path.join(outputdir, fname), 'w', newline='') as outfile:
                writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
                if write_header:
                    writer.writeheader()
                writer.writerows(by_month[month])
        print("Relevés mensuels générés dans le dossier %s" % os.path.abspath(outputdir))

    def _report_uncovered(self, edge_ops, template):
        """Warn about files whose edge row is confirmed by no other file.

        ``edge_ops`` maps file name -> key of its first (or last) row.
        Another file confirms that row only if it contains it somewhere
        other than at the same edge.
        """
        for filename, op in edge_ops.items():
            holders = self.overlap_detector.get(op)
            if holders is None:
                continue
            confirming = [
                other for other in holders
                if other != filename and edge_ops.get(other) != op
            ]
            if not confirming:
                print(template % (op[0:10], filename))

    def check_overlaps(self):
        """
        Helps finding possible missing operations if exported CSV files
        are not "contiguous".

        For each file, a warning is printed when its first row (resp. its
        last row) is not also found inside another file.
        """
        print("\nRecherche de chevauchements (les chevauchements de fichiers CSV c'est bien, ça rassure)...")
        self._report_uncovered(
            self.first_ops,
            "Attention. Il y a peut-être des écritures manquantes après le %s (fichier %s).")
        self._report_uncovered(
            self.last_ops,
            "Attention. Il y a peut-être des écritures manquantes avant le %s (fichier %s).")
        print("")
+
def start_cli(dirpath):
    """Parse every CSV file of ``dirpath`` and generate the reports.

    Files are read in sorted name order.  The results (one full statement
    plus monthly statements) are written to an ``output`` directory created
    in the current working directory.  Nothing is written when no operation
    could be read.
    """
    # Read the CSV files found in the directory
    p = CsvStatementParser()
    for f in sorted(os.listdir(dirpath)):
        # os.listdir() returns bare names: they must be joined with dirpath,
        # otherwise they would be resolved against the working directory.
        path = os.path.join(dirpath, f)
        if f.lower().endswith('.csv') and os.path.isfile(path):
            p.parse(path)

    if not p.lines:
        # Without any row there are no column names and no date range.
        print("Aucune écriture trouvée dans le dossier %s." % os.path.abspath(dirpath))
        return
    print("Les écritures lues s'étalent entre le {0:%d %B %Y} et le {1:%d %B %Y}.".format(p.daterange[0], p.daterange[1]))

    # Look for overlaps between files
    p.check_overlaps()

    # Create the directory that receives the generated files
    outputdir = "output"
    os.makedirs(outputdir, exist_ok=True)

    # Generate the full statement and the monthly statements
    p.dump_full(os.path.join(outputdir, "integral.csv"))
    p.dump_monthly_reports(outputdir)
+
if __name__ == '__main__':
    # Script entry point: the first command-line argument is the directory
    # holding the CSV files to analyse.
    try:
        inputdir = sys.argv[1]
    except IndexError:
        print("Erreur. Merci de préciser le chemin du dossier où se trouvent les fichiers CSV à analyser.")
        sys.exit(1)
    start_cli(inputdir)
+