#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Merge several bank-statement CSV exports, detect coverage gaps between
them, and generate a full statement plus one statement per full month."""

import csv
import hashlib
import json
import locale
import os
import sys
from collections import OrderedDict
from datetime import datetime

# French month names are used when printing the covered date range.
# Best effort: keep the default locale when fr_FR is unavailable instead of
# crashing at import time (the original unconditional call raised
# locale.Error on systems without that locale).
try:
    locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
except locale.Error:
    pass


def _month_shift(yyyymm, delta):
    """Shift an integer month stamp (YYYYMM) by *delta* months.

    Handles year rollover correctly: 202012 + 1 -> 202101, not the invalid
    202013 the original `int + 1` arithmetic produced.
    """
    year, month = divmod(yyyymm, 100)
    total = year * 12 + (month - 1) + delta
    return (total // 12) * 100 + (total % 12) + 1


class CsvStatementParser(object):
    """Aggregates CSV statement exports into one set of operations.

    Operations are indexed by '<ISO date><md5 of the raw row>', so sorting
    the keys sorts operations chronologically.
    """

    def __init__(self):
        self.lines = OrderedDict()    # ophash -> row dict (without the '' column)
        self.fieldnames = None        # column names taken from the first file
        self.date_fieldname = "Date"  # first column whose name contains "date"
        self.overlap_detector = {}    # ophash -> set of filenames containing it
        self.first_ops = {}           # filename -> chronologically first ophash
        self.last_ops = {}            # filename -> chronologically last ophash
        # [min date seen, max date seen]; initialized to an empty range.
        self.daterange = [datetime.now(), datetime.fromordinal(1)]

    def parse(self, filename):
        """Parse one CSV export.

        The first file parsed defines the column names expected in every
        following file; files with different columns are reported and
        skipped.
        """
        with open(filename, encoding='iso-8859-1') as csvfile:
            reader = csv.DictReader(csvfile, delimiter=';')
            # A trailing ';' on the header line yields an empty fieldname;
            # ignore it when comparing column sets.
            actual_fieldnames = [k for k in reader.fieldnames if k != '']
            if self.fieldnames is None:
                self.fieldnames = actual_fieldnames
                # Identify the first column that looks like a date; it then
                # serves as the indexing key.
                for fname in self.fieldnames:
                    if "date" in fname.lower():
                        self.date_fieldname = fname
                        break
            if self.fieldnames != actual_fieldnames:
                print("""Fichier ignoré : %s. Cause: does not have the expected column names. 
Found: %s Expected: %s """ % (filename,
                              ",".join(reader.fieldnames),
                              ",".join(self.fieldnames)))
            else:
                self._parse_file(filename, reader)

    def _parse_file(self, filename, reader):
        """Index every row of *reader*, widen the global date range, and
        record the file's first/last operations for overlap detection."""
        print("Lecture du fichier %s" % os.path.basename(filename))
        for row in reader:
            opdate = datetime.strptime(row[self.date_fieldname], '%d/%m/%Y')
            # Key = ISO date + md5 of the raw row so keys sort by date.
            # NOTE(review): two byte-identical rows on the same date (e.g.
            # the same purchase twice in one day) collapse into one entry —
            # confirm this deduplication is intended.
            ophash = (datetime.strftime(opdate, '%Y-%m-%d')
                      + hashlib.md5(json.dumps(row).encode()).hexdigest())
            self.lines[ophash] = {k: v for k, v in row.items() if k != ''}

            # Widen the covered date range.
            if opdate < self.daterange[0]:
                self.daterange[0] = opdate
            if opdate > self.daterange[1]:
                self.daterange[1] = opdate

            # Remember which files contain each operation (overlap detection).
            self.overlap_detector.setdefault(ophash, set()).add(filename)

            # First line of the file is kept; last line is overwritten on
            # every row so it ends up being the file's final line.
            # (The original only set last_ops once, so it always equaled
            # first_ops and the ASC/DESC swap below was dead code.)
            if filename not in self.first_ops:
                self.first_ops[filename] = ophash
            self.last_ops[filename] = ophash

        # CSV files are sometimes sorted by date ASC and sometimes DESC:
        # make sure first_ops holds the chronologically earlier operation.
        # The ophash starts with 'YYYY-MM-DD', so the prefixes compare
        # chronologically as strings.
        if filename in self.first_ops:
            first, last = self.first_ops[filename], self.last_ops[filename]
            if first[0:10] > last[0:10]:
                self.first_ops[filename], self.last_ops[filename] = last, first

    def dump_full(self, output_filename):
        """Write every known operation to one CSV file, newest first."""
        # newline='' is the documented way to open files for the csv module
        # (prevents doubled line endings on Windows).
        with open(output_filename, 'w', newline='') as outfile:
            writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
            writer.writeheader()
            # Keys start with the ISO date: reverse-sorting yields
            # operations from newest to oldest.
            for _, row in sorted(self.lines.items(), reverse=True):
                writer.writerow(row)
        print("Relevé intégral généré dans le fichier %s" % os.path.abspath(output_filename))

    def dump_monthly_reports(self, outputdir):
        """Write one CSV file per full month covered by the data.

        The first and last months of the range are skipped because they are
        usually partial (exports rarely start/end on month boundaries).
        """
        firstmonth = _month_shift(int('{:%Y%m}'.format(self.daterange[0])), 1)
        lastmonth = _month_shift(int('{:%Y%m}'.format(self.daterange[1])), -1)
        if firstmonth >= lastmonth:
            print("Impossible de générer des relevés mensuels car la plage de dates traitée est trop petite.")
            return

        def _open_month_file(month):
            # One output file per month: releve_YYYYMM.csv.
            # NOTE(review): unlike dump_full, monthly files carry no header
            # row — confirm whether that is intended.
            fname = "releve_{0}.csv".format(month)
            outfile = open(os.path.join(outputdir, fname), 'w', newline='')
            writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
            return outfile, writer

        curmonth = firstmonth
        outfile, writer = _open_month_file(curmonth)
        for ophash, row in sorted(self.lines.items()):
            month = int(ophash[0:4] + ophash[5:7])
            if month < curmonth:
                continue  # before the first full month
            if month > lastmonth:
                break     # past the last full month
            if month > curmonth:
                # Month changed: close the current file, start the next one.
                outfile.close()
                curmonth = month
                outfile, writer = _open_month_file(curmonth)
            writer.writerow(row)
        outfile.close()
        print("Relevés mensuels générés dans le dossier %s" % os.path.abspath(outputdir))

    def check_overlaps(self):
        """
        Helps finding possible missing operations if exported CSV files
        are not "contiguous".
        """
        print("\nRecherche de chevauchements (les chevauchements de fichiers CSV c'est bien, ça rassure)...")
        for filename, first_op in self.first_ops.items():
            others = [f for f in self.overlap_detector.get(first_op, ())
                      if f != filename]
            if others:
                # Keep only files where this operation is NOT also their
                # first one: those prove continuity before this date.
                # (The original removed items while iterating the same
                # list, which skips elements; filtering is reliable.)
                proving = [c for c in others if self.first_ops[c] != first_op]
                if not proving:
                    print("Attention. Il y a peut-être des écritures manquantes avant le %s (fichier %s)." % (first_op[0:10], os.path.basename(filename)))
        for filename, last_op in self.last_ops.items():
            others = [f for f in self.overlap_detector.get(last_op, ())
                      if f != filename]
            if others:
                # Same reasoning as above, for the end of the file.
                proving = [c for c in others if self.last_ops[c] != last_op]
                if not proving:
                    print("Attention. Il y a peut-être des écritures manquantes après le %s (fichier %s)." % (last_op[0:10], os.path.basename(filename)))
        print("")


def start_cli(dirpath):
    """Parse every CSV file in *dirpath* and generate the reports in
    <dirpath>/output."""
    parser = CsvStatementParser()
    for entry in sorted(os.listdir(dirpath)):
        if entry.endswith(('.csv', '.CSV')):
            parser.parse(os.path.join(dirpath, entry))
    print("Les écritures lues s'étalent entre le {0:%d %B %Y} et le {1:%d %B %Y}.".format(parser.daterange[0], parser.daterange[1]))

    # Look for overlaps between consecutive exports.
    parser.check_overlaps()

    # Create a folder for the generated files.
    outputdir = os.path.join(dirpath, "output")
    if not os.path.isdir(outputdir):
        os.makedirs(outputdir)

    # Generate the full statement and the monthly statements.
    parser.dump_full(os.path.join(outputdir, "integral.csv"))
    parser.dump_monthly_reports(outputdir)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Erreur. \nMerci de préciser le chemin du dossier où se trouvent les fichiers CSV à analyser.")
        sys.exit(1)
    start_cli(sys.argv[1])