#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""Merge bank-statement CSV exports and split them into monthly reports.

Every ``.csv``/``.CSV`` file of a directory is read (``;`` delimited,
ISO-8859-1 encoded), identical operations found in several exports are
de-duplicated, and the result is written back as one full statement plus
one statement per complete month.
"""

import os
import sys
import json
import csv
from collections import OrderedDict
from datetime import datetime
import hashlib
import locale

# The French locale is only needed to print month names (``%B``) in
# start_cli(); a machine without it should still be able to run the tool.
try:
    locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
except locale.Error:
    print("Avertissement : la locale fr_FR.UTF-8 est indisponible, "
          "les dates seront affichées avec la locale par défaut.",
          file=sys.stderr)


class CsvStatementParser(object):
    """Accumulate the operations of several CSV statement exports.

    Attributes:
        lines: operations keyed by ``YYYY-MM-DD`` + MD5 of the row, so that
            sorting the keys sorts the operations by date.
        fieldnames: column names of the first parsed file; later files must
            have the same columns.
        date_fieldname: name of the column holding the operation date.
        overlap_detector: for each operation key, the set of files in which
            the operation was found.
        first_ops / last_ops: per file, key of its earliest / latest
            boundary row.
        daterange: ``[earliest, latest]`` operation dates seen so far.
        badmonths: months (``YYYYMM`` ints) flagged by check_overlaps().
    """

    def __init__(self):
        self.lines = OrderedDict()
        self.fieldnames = None
        self.date_fieldname = "Date"
        self.overlap_detector = {}
        self.first_ops = {}
        self.last_ops = {}
        # Sentinels: any parsed date is lower than max and greater than min.
        self.daterange = [datetime.max, datetime.min]
        # Filled by check_overlaps(); initialised here so that
        # dump_monthly_reports() also works when no check was run.
        self.badmonths = set()

    def parse(self, filename):
        """Read one CSV export and merge its operations into the parser.

        A file with no header, or whose column names differ from those of
        the first parsed file, is reported on stdout and skipped.

        Raises:
            ValueError: if a row's date is not formatted as ``dd/mm/YYYY``.
        """
        # newline='' lets the csv module handle line endings itself.
        with open(filename, encoding='iso-8859-1', newline='') as csvfile:
            reader = csv.DictReader(csvfile, delimiter=';')
            # reader.fieldnames is None for an empty file.
            found = [k for k in (reader.fieldnames or []) if k != '']
            if not found:
                print("Fichier ignoré : %s. Cause: aucun nom de colonne trouvé."
                      % filename)
                return
            if self.fieldnames is None:
                # The first file read determines the column names expected
                # in the following files.
                self.fieldnames = found
                # The first column that looks like a date is used as the
                # indexing key.
                for fname in found:
                    if "date" in fname.lower():
                        self.date_fieldname = fname
                        break
            if self.fieldnames != found:
                print("Fichier ignoré : %s. Cause: does not have the expected column names. \n"
                      "Found: %s Expected: %s "
                      % (filename,
                         ",".join(reader.fieldnames),
                         ",".join(self.fieldnames)))
            else:
                self._parse_file(filename, reader)

    def _parse_file(self, filename, reader):
        """Store every row of ``reader`` and record the file's boundary rows.

        NOTE(review): two strictly identical rows (same date, same content)
        share the same key and are therefore stored once, even when they
        come from the same file.
        """
        print("Lecture du fichier %s" % os.path.basename(filename))
        first_hash = None
        last_hash = None
        for row in reader:
            opdate = datetime.strptime(row[self.date_fieldname], '%d/%m/%Y')
            # Date prefix (sortable) + content digest (de-duplication).
            digest = hashlib.md5(json.dumps(row).encode()).hexdigest()
            ophash = opdate.date().isoformat() + digest
            self.lines[ophash] = {k: v for k, v in row.items() if k != ''}

            # Adjust dateranges
            if opdate < self.daterange[0]:
                self.daterange[0] = opdate
            if opdate > self.daterange[1]:
                self.daterange[1] = opdate

            # Prepare overlap detection
            self.overlap_detector.setdefault(ophash, set()).add(filename)

            # Remember the first and the last row of the file
            if first_hash is None:
                first_hash = ophash
            last_hash = ophash

        if first_hash is None:
            # Header-only file: there is no boundary row to remember.
            return

        # CSV files are sometimes sorted by date ASC and sometimes sorted by
        # date DESC, so the boundaries may need to be swapped. ISO dates
        # compare correctly as strings.
        if first_hash[0:10] > last_hash[0:10]:
            first_hash, last_hash = last_hash, first_hash
        self.first_ops[filename] = first_hash
        self.last_ops[filename] = last_hash

    def _open_report(self, path):
        """Open ``path`` for writing and return ``(file, writer)``.

        The header line is already written. The file uses the platform
        default encoding. The caller must close the returned file.
        """
        outfile = open(path, 'w', newline='')
        try:
            # extrasaction='ignore': a row with more cells than the header
            # carries an extra ``None`` key that must not abort the export.
            writer = csv.DictWriter(outfile, self.fieldnames or [],
                                    delimiter=';', extrasaction='ignore')
            writer.writeheader()
        except Exception:
            outfile.close()
            raise
        return outfile, writer

    def dump_full(self, output_filename):
        """Write every stored operation to one CSV file, most recent first."""
        outfile, writer = self._open_report(output_filename)
        with outfile:
            for ophash in sorted(self.lines, reverse=True):
                writer.writerow(self.lines[ophash])
        print("Relevé intégral généré dans le fichier %s" % os.path.abspath(output_filename))

    @staticmethod
    def _month_key(date, offset=0):
        """Return the ``YYYYMM`` int of ``date`` shifted by ``offset`` months."""
        index = date.year * 12 + (date.month - 1) + offset
        return (index // 12) * 100 + index % 12 + 1

    def dump_monthly_reports(self, outputdir):
        """Write one ``releve_YYYYMM.csv`` per complete month into ``outputdir``.

        The months of the earliest and of the latest operation are skipped
        because they are presumably only partially covered. A month listed
        in ``badmonths`` gets the ``_potentiellement_incomplet`` suffix. No
        file is created for a month without operations.
        """
        firstmonth = self._month_key(self.daterange[0], +1)
        lastmonth = self._month_key(self.daterange[1], -1)
        # Both bounds are inclusive: equality means one complete month.
        if firstmonth > lastmonth:
            print("Impossible de générer des relevés mensuels car la plage de dates traitée est trop petite.")
            return

        outfile = None
        writer = None
        curmonth = None
        try:
            for ophash in sorted(self.lines):
                month = int(ophash[0:4] + ophash[5:7])
                if month < firstmonth:
                    continue
                if month > lastmonth:
                    break
                if month != curmonth:
                    # First operation of a new month: switch output file.
                    if outfile is not None:
                        outfile.close()
                    curmonth = month
                    label = str(month)
                    if month in self.badmonths:
                        label += "_potentiellement_incomplet"
                    fname = "releve_{0}.csv".format(label)
                    outfile, writer = self._open_report(
                        os.path.join(outputdir, fname))
                writer.writerow(self.lines[ophash])
        finally:
            if outfile is not None:
                outfile.close()
        print("Relevés mensuels générés dans le dossier %s" % os.path.abspath(outputdir))

    def _has_overlap(self, filename, ophash, boundary_ops):
        """Tell whether another file really overlaps ``filename`` at ``ophash``.

        Another file counts only if it contains the operation and does not
        have it as its own boundary in ``boundary_ops``: two exports that
        start (or end) on the same row do not cover each other's edge.
        """
        return any(
            other != filename and boundary_ops.get(other) != ophash
            for other in self.overlap_detector.get(ophash, ())
        )

    def check_overlaps(self):
        """Help find possibly missing operations between non-contiguous exports.

        For each file, the first and the last operation are expected to be
        present in another export as well. When they are not, a warning is
        printed and the month is added to ``badmonths`` (reset on each call).

        NOTE(review): the export holding the overall earliest (resp. latest)
        operation is normally reported too, since no other export can
        extend beyond it - confirm this is intended.
        """
        self.badmonths = set()
        print("\nRecherche de chevauchements (les chevauchements de fichiers CSV c'est bien, ça rassure)...")
        checks = (
            (self.first_ops,
             "Attention. Il y a peut-être des écritures manquantes avant le %s (fichier %s)."),
            (self.last_ops,
             "Attention. Il y a peut-être des écritures manquantes après le %s (fichier %s)."),
        )
        for boundary_ops, warning in checks:
            for filename, ophash in boundary_ops.items():
                if self._has_overlap(filename, ophash, boundary_ops):
                    continue
                self.badmonths.add(int(ophash[0:7].replace('-', '')))
                print(warning % (ophash[0:10], os.path.basename(filename)))
        print("")


def start_cli(dirpath):
    """Process every CSV file of ``dirpath`` and write the reports.

    The reports are written to the ``output`` sub-directory of ``dirpath``.
    Nothing is written when no operation could be read.
    """
    # Read the CSV files found in the directory
    p = CsvStatementParser()
    for f in sorted(os.listdir(dirpath)):
        if f.endswith(('.csv', '.CSV')):
            p.parse(os.path.join(dirpath, f))

    if not p.lines:
        # Without any operation there is neither a date range nor a header.
        print("Aucune écriture trouvée dans le dossier %s." % os.path.abspath(dirpath))
        return

    print("Les écritures lues s'étalent entre le {0:%d %B %Y} et le {1:%d %B %Y}.".format(p.daterange[0], p.daterange[1]))

    # Look for overlaps
    p.check_overlaps()

    # Create a directory to store the generated files
    outputdir = os.path.join(dirpath, "output")
    os.makedirs(outputdir, exist_ok=True)

    # Generate a full statement and the monthly statements
    suffix = "_{0:%Y-%m-%d}__{1:%Y-%m-%d}".format(p.daterange[0], p.daterange[1])
    if p.badmonths:
        suffix += "_avec_des_trous"
    p.dump_full(os.path.join(outputdir, "integral%s.csv" % suffix))
    p.dump_monthly_reports(outputdir)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Erreur. Merci de préciser le chemin du dossier où se trouvent les fichiers CSV à analyser.")
        sys.exit(1)
    inputdir = sys.argv[1]
    start_cli(inputdir)