# -*- coding: utf-8 -*- from __future__ import unicode_literals # Standard python libs import csv import datetime import re import os import json import logging # Django specific imports from argparse import RawTextHelpFormatter from django.core.management.base import BaseCommand, CommandError # Coin specific imports from coin.members.models import Member from coin.billing.models import Payment # Parser / import / matcher configuration # The CSV delimiter DELIMITER=str(';') # The date format in the CSV DATE_FORMAT="%d/%m/%Y" # The default regex used to match the label of a payment with a member ID ID_REGEX=r"(?i)(\b|_)ID[\s\-\_\/]*(\d+)(\b|_)" # If the label of the payment contains one of these, the payment won't be # matched to a member when importing it. KEYWORDS_TO_NOTMATCH=[ "DON", "MECENAT", "REM CHQ" ] class Command(BaseCommand): help = """ Import payments from a CSV file from a bank. The payments will automatically be parsed, and there'll be an attempt to automatically match payments with members. The matching is performed using the label of the payment. - First, try to find a string such as 'ID-42' where 42 is the member's ID - Second (if no ID found), try to find a member username (with no ambiguity with respect to other usernames) - Third (if no username found), try to find a member family name (with no ambiguity with respect to other family name) This script will check if a payment has already been registered with same properies (date, label, price) to avoid creating duplicate payments inside coin. By default, only a dry-run is perfomed to let you see what will happen ! You should run this command with --commit if you agree with the dry-run.""" def create_parser(self, *args, **kwargs): parser = super(Command, self).create_parser(*args, **kwargs) parser.formatter_class = RawTextHelpFormatter return parser def add_arguments(self, parser): parser.add_argument( 'filename', type=str, help="The CSV filename to be parsed" ) parser.add_argument( '--commit', action='store_true', dest='commit', default=False, help='Agree with the proposed change and commit them' ) def handle(self, *args, **options): assert options["filename"] != "" if not os.path.isfile(options["filename"]): raise CommandError("This file does not exists.") payments = self.convertCSVToDicts(self.cleanCSV(self.loadCSV(options["filename"]))) payments = self.tryToMatchPaymentWithMembers(payments) newPayments = self.filterAlreadyKnownPayments(payments) newPayments = self.unmatchPaymentWithKeywords(newPayments) numberOfAlreadyKnownPayments = len(payments)-len(newPayments) numberOfNewPayments = len(newPayments) if (numberOfNewPayments > 0) : print "======================================================" print " > New payments found" print json.dumps(newPayments, indent=4, separators=(',', ': ')) print "======================================================" print "Number of already known payments found : " + str(numberOfAlreadyKnownPayments) print "Number of new payments found : " + str(numberOfNewPayments) print "Number of new payments matched : " + str(len([p for p in newPayments if p["memberMatched"]])) print "Number of payments not matched : " + str(len([p for p in newPayments if not p["memberMatched"]])) print "======================================================" if numberOfNewPayments == 0: print "Nothing to do, everything looks up to date !" return if not options["commit"]: print "Please carefully review the matches, then if everything \n" \ "looks alright, use --commit to register these new payments." else: self.addNewPayments(newPayments) def isDate(self, text): try: datetime.datetime.strptime(text, DATE_FORMAT) return True except ValueError: return False def isMoneyAmount(self, text): try: float(text.replace(",",".")) return True except ValueError: return False def loadCSV(self, filename): with open(filename, "r") as f: return list(csv.reader(f, delimiter=DELIMITER)) def cleanCSV(self, data): output = [] for i, row in enumerate(data): for j in range(len(row)): row[j] = row[j].decode('utf-8') if len(row) < 4: continue if not self.isDate(row[0]): logging.warning("Ignoring the following row (bad format for date in the first column) :") logging.warning(str(row)) continue if self.isMoneyAmount(row[2]): logging.warning("Ignoring row %s (not a payment)" % str(i)) logging.warning(str(row)) continue if not self.isMoneyAmount(row[3]): logging.warning("Ignoring the following row (bad format for money amount in colun three) :") logging.warning(str(row)) continue # Clean the date row[0] = datetime.datetime.strptime(row[0], DATE_FORMAT).strftime("%Y-%m-%d") # Clean the label ... row[4] = row[4].replace('\r', ' ') row[4] = row[4].replace('\n', ' ') output.append(row) return output def convertCSVToDicts(self, data): output = [] for row in data: payment = {} payment["date"] = row[0] payment["label"] = row[4] payment["amount"] = float(row[3].replace(",",".")) output.append(payment) return output def tryToMatchPaymentWithMembers(self, payments): members = Member.objects.filter(status="member") idregex = re.compile(ID_REGEX) for payment in payments: paymentLabel = payment["label"] # First, attempt to match the member ID idmatches = idregex.findall(paymentLabel) if len(idmatches) == 1: i = int(idmatches[0][1]) memberMatches = [ member.username for member in members if member.pk==i ] if len(memberMatches) == 1: payment["memberMatched"] = memberMatches[0] #print "Matched by ID to "+memberMatches[0] continue # Second, attempt to find the username usernamematch = None for member in members: matches = re.compile(r"(?i)(\b|_)"+re.escape(member.username)+r"(\b|_)") \ .findall(paymentLabel) # If not found, try next if len(matches) == 0: continue # If we already had a match, abort the whole search because we # have multiple usernames matched ! if usernamematch != None: usernamematch = None break usernamematch = member.username if usernamematch != None: payment["memberMatched"] = usernamematch #print "Matched by username to "+usernamematch continue # Third, attempt to match by family name familynamematch = None for member in members: matches = re.compile(r"(?i)(\b|_)"+re.escape(str(member.last_name))+r"(\b|_)") \ .findall(paymentLabel) # If not found, try next if len(matches) == 0: continue # If this familyname was matched several time, abort the whole search if len(matches) > 1: familynamematch = None break # If we already had a match, abort the whole search because we # have multiple familynames matched ! if familynamematch != None: familynamematch = None break familynamematch = str(member.last_name) if familynamematch != None: payment["memberMatched"] = familynamematch #print "Matched by familyname to "+familynamematch continue #print "Could not match" payment["memberMatched"] = None return payments def unmatchPaymentWithKeywords(self, payments): matchers = {} for keyword in KEYWORDS_TO_NOTMATCH: matchers[keyword] = re.compile(r"(?i)(\b|_|-)"+re.escape(keyword)+r"(\b|_|-)") for i, payment in enumerate(payments): # If no match found, don't filter anyway if payment["memberMatched"] == None: continue for keyword, matcher in matchers.items(): matches = matcher.findall(payment["label"]) # If not found, try next if len(matches) == 0: continue print "Ignoring possible match for payment '%s' because " \ "it contains the keyword %s" \ % (payment["label"], keyword) payments[i]["memberMatched"] = None break return payments def filterAlreadyKnownPayments(self, payments): newPayments = [] knownPayments = Payment.objects.all() for payment in payments: foundMatch = False for knownPayment in knownPayments: if (str(knownPayment.date) == payment["date"].encode('utf-8')) \ and (knownPayment.label == payment["label"]) \ and (float(knownPayment.amount) == float(payment["amount"])): foundMatch = True break if not foundMatch: newPayments.append(payment) return newPayments def addNewPayments(self, newPayments): for newPayment in newPayments: # Get the member if there's a member matched member = None if newPayment["memberMatched"]: member = Member.objects.filter(username=newPayment["memberMatched"]) assert len(member) == 1 member = member[0] print "Adding new payment : " print newPayment # Create the payment payment = Payment.objects.create(amount=float(newPayment["amount"]), label=newPayment["label"].encode('utf-8'), date=newPayment["date"].encode('utf-8'), member=member)