123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- # -*- coding: utf-8 -*-
- """
- Import payments from a CSV file from a bank. The payments will automatically be
- parsed, and there'll be an attempt to automatically match payments with members.
- The matching is performed using the label of the payment.
- - First, try to find a string such as 'ID-42' where 42 is the member's ID
- - Second (if no ID found), try to find a member username (with no ambiguity with
- respect to other usernames)
- - Third (if no username found), try to find a member family name (with no
- ambiguity with respect to other family names)
- This script checks whether a payment has already been registered with the same
- properties (date, label, amount) to avoid creating duplicate payments inside coin.
- By default, only a dry run is performed to let you see what will happen! You
- should run this command with --commit if you agree with the dry run.
- """
- from __future__ import unicode_literals
- # Standard python libs
- import csv
- import datetime
- import json
- import logging
- import os
- import re
- # Django specific imports
- from argparse import RawTextHelpFormatter
- from django.core.management.base import BaseCommand, CommandError
- # Coin specific imports
- from coin.members.models import Member
- from coin.billing.models import Payment
# ---------------------------------------------------------------------------
# Parser / import / matcher configuration
# ---------------------------------------------------------------------------

# Field separator of the bank's CSV export. Wrapped in str(), presumably so
# that it stays a native string despite `unicode_literals` (the csv module on
# Python 2 wants a native-str delimiter) -- TODO confirm.
DELIMITER = str(";")

# strptime() format of the dates found in the CSV.
DATE_FORMAT = "%d/%m/%Y"

# Default pattern used to spot a member ID in a payment label. The digits of
# the ID are captured by the second group.
ID_REGEX = r"(?i)(\b|_)ID[\s\-\_\/]*(\d+)(\b|_)"

# A payment whose label contains one of these keywords is imported without
# being attached to a member, even if a member was matched.
KEYWORDS_TO_NOTMATCH = [
    "DON",
    "MECENAT",
    "REM CHQ",
]
class Command(BaseCommand):
    # Management command that imports bank payments from a CSV file and tries
    # to attach each of them to a member.
    #
    # NOTE: this description is intentionally a comment and not a class
    # docstring: a class docstring would bind ``__doc__`` inside the class
    # namespace, and ``help`` below would then stop being the module docstring.
    help = __doc__

    def create_parser(self, *args, **kwargs):
        """Return the parent's parser with a raw-text help formatter.

        ``RawTextHelpFormatter`` keeps the line breaks of the help text instead
        of letting argparse re-wrap it.
        """
        parser = super(Command, self).create_parser(*args, **kwargs)
        parser.formatter_class = RawTextHelpFormatter
        return parser

    def add_arguments(self, parser):
        """Declare the positional ``filename`` and the ``--commit`` flag."""
        parser.add_argument(
            'filename',
            type=str,
            help="The CSV filename to be parsed"
        )
        parser.add_argument(
            '--commit',
            action='store_true',
            dest='commit',
            default=False,
            help='Agree with the proposed change and commit them'
        )

    def handle(self, *args, **options):
        """Parse the CSV, match payments, print a report and optionally save.

        Payments are only written when ``options["commit"]`` is true;
        otherwise the command just prints what it would do.

        Raises:
            CommandError: if ``options["filename"]`` is not an existing file
                (this also covers an empty filename).
        """
        filename = options["filename"]
        # os.path.isfile("") is False, so no separate assert on emptiness is
        # needed (an assert would be stripped under ``python -O`` anyway).
        if not os.path.isfile(filename):
            raise CommandError("This file does not exists.")

        rows = self.clean_csv(self.load_csv(filename))
        payments = self.convert_csv_to_dicts(rows)
        payments = self.try_to_match_payment_with_members(payments)
        new_payments = self.filter_already_known_payments(payments)
        new_payments = self.unmatch_payment_with_keywords(new_payments)

        number_of_already_known_payments = len(payments) - len(new_payments)
        number_of_new_payments = len(new_payments)
        matched_count = sum(1 for p in new_payments if p["member_matched"])
        unmatched_count = number_of_new_payments - matched_count

        separator = "======================================================"
        if number_of_new_payments > 0:
            print(separator)
            print(" > New payments found")
            print(json.dumps(new_payments, indent=4, separators=(',', ': ')))
        print(separator)
        print("Number of already known payments found : " + str(number_of_already_known_payments))
        print("Number of new payments found : " + str(number_of_new_payments))
        print("Number of new payments matched : " + str(matched_count))
        print("Number of payments not matched : " + str(unmatched_count))
        print(separator)

        if number_of_new_payments == 0:
            print("Nothing to do, everything looks up to date !")
            return

        if not options["commit"]:
            print("Please carefully review the matches, then if everything \n"
                  "looks alright, use --commit to register these new payments.")
        else:
            self.add_new_payments(new_payments)

    def is_date(self, text):
        """Return True if ``text`` can be parsed with ``DATE_FORMAT``."""
        try:
            datetime.datetime.strptime(text, DATE_FORMAT)
        except ValueError:
            return False
        return True

    def is_money_amount(self, text):
        """Return True if ``text`` is a number, "," being accepted as the
        decimal separator."""
        try:
            float(text.replace(",", "."))
        except ValueError:
            return False
        return True

    def load_csv(self, filename):
        """Read ``filename`` and return all of its rows as a list of lists."""
        with open(filename, "r") as f:
            return list(csv.reader(f, delimiter=DELIMITER))

    def clean_csv(self, data):
        """Keep only the rows that look like payments and normalize them.

        Columns used: 0 is the date, 3 the amount and 4 the label. A row is
        dropped when it has fewer than five columns, when column 0 is not a
        date, when column 2 holds a money amount (logged as "not a payment";
        presumably the debit column -- TODO confirm) or when column 3 is not
        a money amount.

        Args:
            data: iterable of rows (lists of cells) as returned by load_csv().
                The rows are not modified.

        Returns:
            A list of new rows with text cells, the date rewritten as
            ``YYYY-MM-DD`` and line breaks removed from the label.
        """
        output = []
        for i, raw_row in enumerate(data):
            # Only byte strings need decoding (Python 2 csv yields bytes,
            # Python 3 csv already yields text).
            row = [
                cell.decode('utf-8') if isinstance(cell, bytes) else cell
                for cell in raw_row
            ]
            # The label lives in row[4], so at least five columns are needed.
            if len(row) < 5:
                continue
            if not self.is_date(row[0]):
                logging.warning("Ignoring the following row (bad format for date in the first column) :")
                logging.warning(str(row))
                continue
            if self.is_money_amount(row[2]):
                logging.warning("Ignoring row %s (not a payment)", i)
                logging.warning(str(row))
                continue
            if not self.is_money_amount(row[3]):
                logging.warning("Ignoring the following row (bad format for money amount in colun three) :")
                logging.warning(str(row))
                continue
            # Normalize the date to ISO format.
            row[0] = datetime.datetime.strptime(row[0], DATE_FORMAT).strftime("%Y-%m-%d")
            # Flatten multi-line labels.
            row[4] = row[4].replace('\r', ' ').replace('\n', ' ')
            output.append(row)
        return output

    def convert_csv_to_dicts(self, data):
        """Turn cleaned rows into dicts with ``date``, ``label`` and ``amount``
        (the amount being converted to a float)."""
        return [
            {
                "date": row[0],
                "label": row[4],
                "amount": float(row[3].replace(",", ".")),
            }
            for row in data
        ]

    @staticmethod
    def _word_pattern(text):
        """Compile a case-insensitive pattern finding ``text`` as a whole word
        ("_" also counts as a word boundary)."""
        return re.compile(r"(?i)(\b|_)" + re.escape(text) + r"(\b|_)")

    @staticmethod
    def _find_unique_match(patterns, label, reject_repeated=False):
        """Return the username of the only pattern found in ``label``.

        Args:
            patterns: list of ``(username, compiled_pattern)`` pairs.
            label: the payment label to search.
            reject_repeated: when true, a pattern found more than once in the
                label makes the whole search fail.

        Returns:
            The username, or None when nothing or more than one pattern matched.
        """
        found = None
        for username, pattern in patterns:
            occurrences = len(pattern.findall(label))
            if occurrences == 0:
                continue
            if reject_repeated and occurrences > 1:
                return None
            if found is not None:
                # A second candidate matched: ambiguous, give up.
                return None
            found = username
        return found

    def try_to_match_payment_with_members(self, payments):
        """Set ``payment["member_matched"]`` for every payment.

        For each label, the following strategies are tried in order and the
        first one that succeeds wins:

        1. a single member ID found with ``ID_REGEX`` that belongs to a member;
        2. exactly one member username found in the label;
        3. exactly one member last name found, exactly once, in the label.

        The value is the matched member's username, or None when no strategy
        succeeded. Only members with ``status="member"`` are considered.

        Returns:
            The same ``payments`` list, updated in place.
        """
        members = list(Member.objects.filter(status="member"))
        id_regex = re.compile(ID_REGEX)

        # Everything below is invariant across payments: build it once.
        usernames_by_pk = {}
        for member in members:
            usernames_by_pk[member.pk] = member.username
        # Names are used as-is (no str() conversion) so that non-ASCII names
        # do not raise UnicodeEncodeError on Python 2. Empty names are skipped
        # because their pattern would match any label.
        username_patterns = [
            (member.username, self._word_pattern(member.username))
            for member in members if member.username
        ]
        lastname_patterns = [
            (member.username, self._word_pattern(member.last_name))
            for member in members if member.last_name
        ]

        for payment in payments:
            label = payment["label"]
            matched = None

            # First, attempt to match the member ID.
            id_matches = id_regex.findall(label)
            if len(id_matches) == 1:
                # Group 2 of ID_REGEX holds the digits.
                matched = usernames_by_pk.get(int(id_matches[0][1]))

            # Second, attempt to find a single username.
            if matched is None:
                matched = self._find_unique_match(username_patterns, label)

            # Third, attempt to find a single family name.
            if matched is None:
                matched = self._find_unique_match(
                    lastname_patterns, label, reject_repeated=True)

            payment["member_matched"] = matched
        return payments

    def unmatch_payment_with_keywords(self, payments):
        """Drop the member match of payments whose label contains one of
        ``KEYWORDS_TO_NOTMATCH``.

        Returns:
            The same ``payments`` list, updated in place.
        """
        matchers = [
            (keyword,
             re.compile(r"(?i)(\b|_|-)" + re.escape(keyword) + r"(\b|_|-)"))
            for keyword in KEYWORDS_TO_NOTMATCH
        ]
        for payment in payments:
            # Nothing to undo if no member was matched.
            if payment["member_matched"] is None:
                continue
            for keyword, matcher in matchers:
                if matcher.search(payment["label"]) is None:
                    continue
                print("Ignoring possible match for payment '%s' because "
                      "it contains the keyword %s"
                      % (payment["label"], keyword))
                payment["member_matched"] = None
                break
        return payments

    def filter_already_known_payments(self, payments):
        """Return the payments that are not yet stored in the database.

        A payment is considered known when a stored Payment has the same date
        (compared as ``str(date)``), the same label and the same amount
        (compared as floats).
        """
        # Index stored payments by (date, label) so each new payment is a
        # dictionary lookup instead of a scan of every stored payment.
        known_amounts = {}
        for known_payment in Payment.objects.all():
            key = (str(known_payment.date), known_payment.label)
            known_amounts.setdefault(key, []).append(known_payment.amount)

        new_payments = []
        for payment in payments:
            amount = float(payment["amount"])
            # Dates are compared as text on both sides; encoding one side to
            # bytes would never compare equal on Python 3.
            candidates = known_amounts.get(
                (payment["date"], payment["label"]), ())
            if not any(float(known) == amount for known in candidates):
                new_payments.append(payment)
        return new_payments

    def add_new_payments(self, new_payments):
        """Create a Payment for each entry of ``new_payments``.

        The payment is attached to the member whose username is stored in
        ``member_matched``, or to no member when that value is empty.

        Raises:
            CommandError: if a matched username does not identify exactly one
                member.
        """
        for new_payment in new_payments:
            # Get the member if there's a member matched.
            member = None
            username = new_payment["member_matched"]
            if username:
                candidates = Member.objects.filter(username=username)
                if len(candidates) != 1:
                    raise CommandError(
                        "Expected exactly one member with username '%s', "
                        "found %d." % (username, len(candidates)))
                member = candidates[0]

            print("Adding new payment : ")
            print(new_payment)

            # Create the payment.
            Payment.objects.create(amount=float(new_payment["amount"]),
                                   label=new_payment["label"],
                                   date=new_payment["date"],
                                   member=member)
|