#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import hashlib
import json
import locale
import os
import sys
from collections import OrderedDict
from datetime import datetime

# French locale so that dates are printed with French month names; fall back
# silently to the default locale if fr_FR is not installed on the system.
try:
    locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
except locale.Error:
    pass
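
# This script merges CSV statement exports (semicolon-separated files) found
# in a folder: operations are de-duplicated, gaps between exports are
# reported, then a full statement and per-month statements are written to an
# "output" subfolder.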

class CsvStatementParser:

    def __init__(self):
        # Operations indexed by "YYYY-MM-DD<md5>" keys (see _parse_file).
        self.lines = OrderedDict()
        # Column names, taken from the first CSV file parsed.
        self.fieldnames = None
        # Name of the column used as the date/index key.
        self.date_fieldname = "Date"
        # Maps each operation key to the set of files containing it.
        self.overlap_detector = {}
        # Chronologically first and last operation of each parsed file.
        self.first_ops = {}
        self.last_ops = {}
        # Overall date range covered by the operations read so far.
        self.daterange = [datetime.now(), datetime.fromordinal(1)]
        # Months flagged as possibly incomplete by check_overlaps().
        self.badmonths = set()

    def parse(self, filename):
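        """Read one CSV file and index its rows by date.

        The first file parsed fixes the expected column names and the date
        column used as the index; later files whose columns differ are
        skipped with a warning.
        """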
        with open(filename, encoding='iso-8859-1') as csvfile:
            reader = csv.DictReader(csvfile, delimiter=';')
            if self.fieldnames is None:
                # The first file parsed determines the column names expected
                # in the following files.
                self.fieldnames = [k for k in reader.fieldnames if k != '']

                # Also identify the first column that looks like a date: it
                # will then be used as the indexing key.
                for fname in self.fieldnames:
                    if "date" in fname.lower():
                        self.date_fieldname = fname
                        break

            if self.fieldnames != [k for k in reader.fieldnames if k != '']:
                print("""Fichier ignoré : %s. Cause : les noms de colonnes ne correspondent pas à ceux attendus.
Trouvé : %s
Attendu : %s
""" % (filename, ",".join(reader.fieldnames), ",".join(self.fieldnames)))
            else:
                self._parse_file(filename, reader)

    def _parse_file(self, filename, reader):
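        """Index every row of an already-opened csv.DictReader.

        Each row is keyed by its ISO date plus an MD5 of the raw row, so an
        operation appearing in several overlapping exports is only stored
        once (and so are strictly identical rows occurring on the same day).
        """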
        print("Lecture du fichier %s" % os.path.basename(filename))
        for row in reader:
            opdate = datetime.strptime(row[self.date_fieldname], '%d/%m/%Y')
            ophash = datetime.strftime(opdate, '%Y-%m-%d') + hashlib.md5(json.dumps(row).encode()).hexdigest()
            self.lines[ophash] = {k: v for k, v in row.items() if k != ''}
            # Adjust the overall date range.
            if opdate < self.daterange[0]:
                self.daterange[0] = opdate
            if opdate > self.daterange[1]:
                self.daterange[1] = opdate
            # Prepare overlap detection.
            if ophash not in self.overlap_detector:
                self.overlap_detector[ophash] = set()
            self.overlap_detector[ophash].add(filename)
            # Remember the first row of the file...
            if filename not in self.first_ops:
                self.first_ops[filename] = ophash
            # ...and keep overwriting the last row read.
            self.last_ops[filename] = ophash
        # CSV files are sometimes sorted by date ASC and sometimes by date
        # DESC, so first_op and last_op may need to be swapped once the whole
        # file has been read.
        if filename in self.first_ops:
            if self.first_ops[filename][0:10] > self.last_ops[filename][0:10]:
                self.first_ops[filename], self.last_ops[filename] = \
                    self.last_ops[filename], self.first_ops[filename]

    def dump_full(self, output_filename):
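        """Write all known operations, most recent first, to a single CSV file."""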
        # newline='' lets the csv module control line endings itself.
        with open(output_filename, 'w', newline='') as outfile:
            writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
            writer.writeheader()
            for line in sorted(self.lines.items(), reverse=True):
                writer.writerow(line[1])
        print("Relevé intégral généré dans le fichier %s" % os.path.abspath(output_filename))

    def dump_monthly_reports(self, outputdir):
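        """Write one CSV file per month into outputdir.

        The first and the last month of the overall date range are skipped
        because they are most likely only partially covered by the exports.
        """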
        # Exclusive thresholds around the first and last months. Adding or
        # subtracting 1 from a YYYYMM integer can give an invalid month such
        # as 202013, but the values are only used for comparisons, so the
        # logic also holds across year boundaries.
        firstmonth = int('{:%Y%m}'.format(self.daterange[0])) + 1
        lastmonth = int('{:%Y%m}'.format(self.daterange[1])) - 1
        if firstmonth >= lastmonth:
            print("Impossible de générer des relevés mensuels car la plage de dates traitée est trop petite.")
            return

        def __openfile__(label):
            fname = "releve_{0}.csv".format(label)
            outfile = open(os.path.join(outputdir, fname), 'w', newline='')
            writer = csv.DictWriter(outfile, self.fieldnames, delimiter=';')
            writer.writeheader()
            return outfile, writer

        outfile, writer = None, None
        curmonth = None
        for line in sorted(self.lines.items()):
            # Keys start with "YYYY-MM-DD", so the month is easy to extract.
            month = int(line[0][0:4] + line[0][5:7])
            if month < firstmonth:
                continue
            if month > lastmonth:
                break
            if curmonth is None or month > curmonth:
                # Month change: close the current file and open the next one.
                if outfile is not None:
                    outfile.close()
                curmonth = month
                if month in self.badmonths:
                    outfile, writer = __openfile__(str(curmonth) + "_potentiellement_incomplet")
                else:
                    outfile, writer = __openfile__(curmonth)
            writer.writerow(line[1])
        if outfile is not None:
            outfile.close()
        print("Relevés mensuels générés dans le dossier %s" % os.path.abspath(outputdir))

    def check_overlaps(self):
        """Look for possible missing operations between exported CSV files.

        A file's first (or last) operation should also appear in another file
        that starts earlier (or ends later). When it does not, the month in
        question is flagged as possibly incomplete.
        """
        self.badmonths = set()
        print("\nRecherche de chevauchements (les chevauchements de fichiers CSV c'est bien, ça rassure)...")
        for filename, first_op in self.first_ops.items():
            # Other files containing the same operation, excluding those that
            # start with it too: only a file starting earlier proves that
            # nothing is missing before this operation.
            otherfiles = [f for f in self.overlap_detector[first_op]
                          if f != filename and self.first_ops[f] != first_op]
            if len(otherfiles) == 0:
                self.badmonths.add(int(first_op[0:7].replace('-', '')))
                print("Attention. Il y a peut-être des écritures manquantes avant le %s (fichier %s)." % (first_op[0:10], os.path.basename(filename)))

        for filename, last_op in self.last_ops.items():
            # Same reasoning, this time for the end of each file.
            otherfiles = [f for f in self.overlap_detector[last_op]
                          if f != filename and self.last_ops[f] != last_op]
            if len(otherfiles) == 0:
                self.badmonths.add(int(last_op[0:7].replace('-', '')))
                print("Attention. Il y a peut-être des écritures manquantes après le %s (fichier %s)." % (last_op[0:10], os.path.basename(filename)))
        print("")

def start_cli(dirpath):
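    """Parse every CSV file found in dirpath and write the merged statements."""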
    # Read the CSV files present in the folder.
    p = CsvStatementParser()
    for f in sorted(os.listdir(dirpath)):
        if f.endswith('.csv') or f.endswith('.CSV'):
            p.parse(os.path.join(dirpath, f))
    if not p.lines:
        print("Aucune écriture n'a été lue, rien à générer.")
        return
    print("Les écritures lues s'étalent entre le {0:%d %B %Y} et le {1:%d %B %Y}.".format(p.daterange[0], p.daterange[1]))

    # Look for overlaps between the exported files.
    p.check_overlaps()

    # Create a folder to store the generated files.
    outputdir = os.path.join(dirpath, "output")
    if not os.path.isdir(outputdir):
        os.makedirs(outputdir)

    # Generate a full statement and the monthly statements.
    suffix = "_{0:%Y-%m-%d}__{1:%Y-%m-%d}".format(p.daterange[0], p.daterange[1])
    if len(p.badmonths):
        suffix += "_avec_des_trous"
    p.dump_full(os.path.join(outputdir, "integral%s.csv" % suffix))
    p.dump_monthly_reports(outputdir)

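# Entry point: the script expects a single argument, the path of the folder
# containing the CSV files to merge.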
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Erreur. Merci de préciser le chemin du dossier où se trouvent les fichiers CSV à analyser.")
        sys.exit(1)
    inputdir = sys.argv[1]
    start_cli(inputdir)