Browse Source

Factorize the code that manage the CSV import in two parts : the 'general' part and the CLI interface part

Alexandre Aubin 6 years ago
parent
commit
f251d17fe3

+ 297 - 0
coin/billing/import_payments_from_csv.py

@@ -0,0 +1,297 @@
+# -*- coding: utf-8 -*-
+"""
+Import payments from a CSV file from a bank.  The payments will automatically be
+parsed, and there'll be an attempt to automatically match payments with members.
+
+The matching is performed using the label of the payment.
+- First, try to find a string such as 'ID-42' where 42 is the member's ID
+- Second (if no ID found), try to find a member username (with no ambiguity with
+  respect to other usernames)
+- Third (if no username found), try to find a member family name (with no
+  ambiguity with respect to other family name)
+
+This script will check if a payment has already been registered with same
+properies (date, label, price) to avoid creating duplicate payments inside coin.
+
+By default, only a dry-run is perfomed to let you see what will happen ! You
+should run this command with --commit if you agree with the dry-run.
+"""
+
+from __future__ import unicode_literals
+
+# Standard python libs
+import csv
+import datetime
+import logging
+import re
+
+import unidecode
+
+# Coin specific imports
+from coin.members.models import Member
+from coin.billing.models import Payment
+
+# Parser / import / matcher configuration
+
+# The CSV delimiter
+DELIMITER = str(';')
+# The date format in the CSV
+DATE_FORMAT = "%d/%m/%Y"
+# The default regex used to match the label of a payment with a member ID
+ID_REGEX = r"(?i)(\b|_)ID[\s\-\_\/]*(\d+)(\b|_)"
+# If the label of the payment contains one of these, the payment won't be
+# matched to a member when importing it.
+KEYWORDS_TO_NOTMATCH = ["REM CHQ"]
+
+################################################################################
+
+
+def process(f):
+
+    raw_csv = list(csv.reader(f, delimiter=DELIMITER))
+    cleaned_csv = clean_csv(raw_csv)
+
+    payments = convert_csv_to_dicts(cleaned_csv)
+
+    payments = try_to_match_payment_with_members(payments)
+    new_payments = filter_already_known_payments(payments)
+    new_payments = unmatch_payment_with_keywords(new_payments)
+
+    return new_payments
+
+
+def is_date(text):
+    try:
+        datetime.datetime.strptime(text, DATE_FORMAT)
+        return True
+    except ValueError:
+        return False
+
+
+def is_money_amount(text):
+    try:
+        float(text.replace(",","."))
+        return True
+    except ValueError:
+        return False
+
+
+def load_csv(filename):
+    with open(filename, "r") as f:
+        return list(csv.reader(f, delimiter=DELIMITER))
+
+
+def clean_csv(data):
+
+    output = []
+
+    for i, row in enumerate(data):
+
+        if len(row) < 4:
+            continue
+
+        if not is_date(row[0]):
+            logging.warning("Ignoring the following row (bad format for date in the first column) :")
+            logging.warning(str(row))
+            continue
+
+        if is_money_amount(row[2]):
+            logging.warning("Ignoring row %s (not a payment)" % str(i))
+            logging.warning(str(row))
+            continue
+
+        if not is_money_amount(row[3]):
+            logging.warning("Ignoring the following row (bad format for money amount in colun three) :")
+            logging.warning(str(row))
+            continue
+
+        # Clean the date
+        row[0] = datetime.datetime.strptime(row[0], DATE_FORMAT).strftime("%Y-%m-%d")
+
+        # Clean the label ...
+        row[4] = row[4].replace('\r', ' ')
+        row[4] = row[4].replace('\n', ' ')
+
+        output.append(row)
+
+    return output
+
+
+def convert_csv_to_dicts(data):
+
+    output = []
+
+    for row in data:
+        payment = {}
+
+        payment["date"] = row[0]
+        payment["label"] = row[4]
+        payment["amount"] = float(row[3].replace(",","."))
+
+        output.append(payment)
+
+    return output
+
+
+def try_to_match_payment_with_members(payments):
+
+    members = Member.objects.all()
+
+    idregex = re.compile(ID_REGEX)
+
+    for payment in payments:
+
+        payment_label = payment["label"].upper()
+
+        # First, attempt to match the member ID
+        idmatches = idregex.findall(payment_label)
+        if len(idmatches) == 1:
+            i = int(idmatches[0][1])
+            member_matches = [ member.username for member in members if member.pk==i ]
+            if len(member_matches) == 1:
+                payment["member_matched"] = member_matches[0]
+                #print("Matched by ID to "+member_matches[0])
+                continue
+
+
+        # Second, attempt to find the username
+        usernamematch = None
+        for member in members:
+            username = flatten(member.username)
+            matches = re.compile(r"(?i)(\b|_)"+re.escape(username)+r"(\b|_)") \
+                        .findall(payment_label)
+
+            # If not found, try next
+            if len(matches) == 0:
+                continue
+
+            # If we already had a match, abort the whole search because we
+            # have multiple usernames matched !
+            if usernamematch != None:
+                usernamematch = None
+                break
+
+            usernamematch = member.username
+
+        if usernamematch != None:
+            payment["member_matched"] = usernamematch
+            #print("Matched by username to "+usernamematch)
+            continue
+
+
+        # Third, attempt to match by family name
+        familynamematch = None
+        for member in members:
+            if member.last_name == "":
+                continue
+
+            # "Flatten" accents in the last name... (probably the CSV
+            # don't contain 'special' chars like accents
+            member_last_name = flatten(member.last_name)
+
+            matches = re.compile(r"(?i)(\b|_)"+re.escape(member_last_name)+r"(\b|_)") \
+                        .findall(payment_label)
+
+            # If not found, try next
+            if len(matches) == 0:
+                continue
+
+            # If this familyname was matched several time, abort the whole search
+            #if len(matches) > 1:
+            #    print("Several matches ! Aborting !")
+            #    familynamematch = None
+            #    break
+
+            # If we already had a match, abort the whole search because we
+            # have multiple familynames matched !
+            if familynamematch != None:
+                familynamematch = None
+                break
+
+            familynamematch = member_last_name
+            usernamematch = member.username
+
+        if familynamematch != None:
+            payment["member_matched"] = usernamematch
+            #print("Matched by familyname to "+familynamematch)
+            continue
+
+        #print("Could not match")
+        payment["member_matched"] = None
+
+    return payments
+
+
+def unmatch_payment_with_keywords(payments):
+
+    matchers = {}
+    for keyword in KEYWORDS_TO_NOTMATCH:
+        matchers[keyword] = re.compile(r"(?i)(\b|_|-)"+re.escape(keyword)+r"(\b|_|-)")
+
+    for i, payment in enumerate(payments):
+
+        # If no match found, don't filter anyway
+        if payment["member_matched"] == None:
+            continue
+
+        for keyword, matcher in matchers.items():
+            matches = matcher.findall(payment["label"])
+
+            # If not found, try next
+            if len(matches) == 0:
+                continue
+
+            #print("Ignoring possible match for payment '%s' because " \
+            #      "it contains the keyword %s"                        \
+            #      % (payment["label"], keyword))
+            payments[i]["member_matched"] = None
+
+            break
+
+    return payments
+
+def filter_already_known_payments(payments):
+
+    new_payments = []
+
+    known_payments = Payment.objects.all()
+
+    for payment in payments:
+
+        found_match = False
+        for known_payment in known_payments:
+
+            if  (str(known_payment.date) == payment["date"].encode('utf-8')) \
+            and (known_payment.label == payment["label"]) \
+            and (float(known_payment.amount) == float(payment["amount"])):
+                found_match = True
+                break
+
+        if not found_match:
+            new_payments.append(payment)
+
+    return new_payments
+
+
+def add_new_payments(new_payments):
+
+    for new_payment in new_payments:
+
+        # Get the member if there's a member matched
+        member = None
+        if new_payment["member_matched"]:
+            member = Member.objects.filter(username=new_payment["member_matched"])
+            assert len(member) == 1
+            member = member[0]
+
+        print("Adding new payment : ")
+        print(new_payment)
+
+        # Create the payment
+        payment = Payment.objects.create(amount=float(new_payment["amount"]),
+                                         label=new_payment["label"],
+                                         date=new_payment["date"],
+                                         member=member)
+
+def flatten(some_string):
+    return unidecode.unidecode(some_string).upper()

+ 7 - 272
coin/billing/management/commands/import_payments_from_csv.py

@@ -20,34 +20,18 @@ should run this command with --commit if you agree with the dry-run.
 from __future__ import unicode_literals
 
 # Standard python libs
-import csv
-import datetime
 import json
-import logging
 import os
-import re
-
-import unidecode
 
 # Django specific imports
 from argparse import RawTextHelpFormatter
 from django.core.management.base import BaseCommand, CommandError
 
 # Coin specific imports
-from coin.members.models import Member
-from coin.billing.models import Payment
+from coin.billing.import_payments_from_csv import process, add_new_payments
 
-# Parser / import / matcher configuration
+################################################################################
 
-# The CSV delimiter
-DELIMITER=str(';')
-# The date format in the CSV
-DATE_FORMAT="%d/%m/%Y"
-# The default regex used to match the label of a payment with a member ID
-ID_REGEX=r"(?i)(\b|_)ID[\s\-\_\/]*(\d+)(\b|_)"
-# If the label of the payment contains one of these, the payment won't be
-# matched to a member when importing it.
-KEYWORDS_TO_NOTMATCH=[ "REM CHQ" ]
 
 class Command(BaseCommand):
 
@@ -74,30 +58,22 @@ class Command(BaseCommand):
             help='Agree with the proposed change and commit them'
         )
 
-
     def handle(self, *args, **options):
 
         assert options["filename"] != ""
         if not os.path.isfile(options["filename"]):
             raise CommandError("This file does not exists.")
-        os.system("iconv -f ISO-8859-1 -t UTF-8 %s > %s.utf8.csv" % (options["filename"], options["filename"]))
-        options["filename"] = options["filename"] + '.utf8.csv'
-
-        payments = self.convert_csv_to_dicts(self.clean_csv(self.load_csv(options["filename"])))
 
-        payments = self.try_to_match_payment_with_members(payments)
-        new_payments = self.filter_already_known_payments(payments)
-        new_payments = self.unmatch_payment_with_keywords(new_payments)
+        f = open(options["filename"], "r")
+        new_payments = process(f)
 
-        number_of_already_known_payments = len(payments)-len(new_payments)
         number_of_new_payments = len(new_payments)
 
-        if (number_of_new_payments > 0) :
+        if (number_of_new_payments > 0):
             print("======================================================")
             print("   > New payments found")
             print(json.dumps(new_payments, indent=4, separators=(',', ': ')))
         print("======================================================")
-        print("Number of already known payments found : " + str(number_of_already_known_payments))
         print("Number of new payments found           : " + str(number_of_new_payments))
         print("Number of new payments matched         : " + str(len([p for p in new_payments if     p["member_matched"]])))
         print("Number of payments not matched         : " + str(len([p for p in new_payments if not p["member_matched"]])))
@@ -108,248 +84,7 @@ class Command(BaseCommand):
             return
 
         if not options["commit"]:
-            print("Please carefully review the matches, then if everything \n" \
+            print("Please carefully review the matches, then if everything \n"
                   "looks alright, use --commit to register these new payments.")
         else:
-            self.add_new_payments(new_payments)
-
-
-    def is_date(self, text):
-        try:
-            datetime.datetime.strptime(text, DATE_FORMAT)
-            return True
-        except ValueError:
-            return False
-
-
-    def is_money_amount(self, text):
-        try:
-            float(text.replace(",","."))
-            return True
-        except ValueError:
-            return False
-
-
-    def load_csv(self, filename):
-        with open(filename, "r") as f:
-            return list(csv.reader(f, delimiter=DELIMITER))
-
-
-    def clean_csv(self, data):
-
-        output = []
-
-        for i, row in enumerate(data):
-
-            for j in range(len(row)):
-                row[j] = row[j].decode('utf-8')
-
-            if len(row) < 4:
-                continue
-
-            if not self.is_date(row[0]):
-                logging.warning("Ignoring the following row (bad format for date in the first column) :")
-                logging.warning(str(row))
-                continue
-
-            if self.is_money_amount(row[2]):
-                logging.warning("Ignoring row %s (not a payment)" % str(i))
-                logging.warning(str(row))
-                continue
-
-            if not self.is_money_amount(row[3]):
-                logging.warning("Ignoring the following row (bad format for money amount in colun three) :")
-                logging.warning(str(row))
-                continue
-
-            # Clean the date
-            row[0] = datetime.datetime.strptime(row[0], DATE_FORMAT).strftime("%Y-%m-%d")
-
-            # Clean the label ...
-            row[4] = row[4].replace('\r', ' ')
-            row[4] = row[4].replace('\n', ' ')
-
-            output.append(row)
-
-        return output
-
-
-    def convert_csv_to_dicts(self, data):
-
-        output = []
-
-        for row in data:
-            payment = {}
-
-            payment["date"] = row[0]
-            payment["label"] = row[4]
-            payment["amount"] = float(row[3].replace(",","."))
-
-            output.append(payment)
-
-        return output
-
-
-    def try_to_match_payment_with_members(self, payments):
-
-        #members = Member.objects.filter(status="member")
-        members = Member.objects.all()
-
-        idregex = re.compile(ID_REGEX)
-
-        for payment in payments:
-
-            payment_label = payment["label"].upper()
-
-            # First, attempt to match the member ID
-            idmatches = idregex.findall(payment_label)
-            if len(idmatches) == 1:
-                i = int(idmatches[0][1])
-                member_matches = [ member.username for member in members if member.pk==i ]
-                if len(member_matches) == 1:
-                    payment["member_matched"] = member_matches[0]
-                    #print("Matched by ID to "+member_matches[0])
-                    continue
-
-
-            # Second, attempt to find the username
-            usernamematch = None
-            for member in members:
-                username = self.flatten(member.username)
-                matches = re.compile(r"(?i)(\b|_)"+re.escape(username)+r"(\b|_)") \
-                            .findall(payment_label)
-
-                # If not found, try next
-                if len(matches) == 0:
-                    continue
-
-                # If we already had a match, abort the whole search because we
-                # have multiple usernames matched !
-                if usernamematch != None:
-                    usernamematch = None
-                    break
-
-                usernamematch = member.username
-
-            if usernamematch != None:
-                payment["member_matched"] = usernamematch
-                #print("Matched by username to "+usernamematch)
-                continue
-
-
-            # Third, attempt to match by family name
-            familynamematch = None
-            for member in members:
-                if member.last_name == "":
-                    continue
-
-                # "Flatten" accents in the last name... (probably the CSV
-                # don't contain 'special' chars like accents
-                member_last_name = self.flatten(member.last_name)
-
-                matches = re.compile(r"(?i)(\b|_)"+re.escape(member_last_name)+r"(\b|_)") \
-                            .findall(payment_label)
-
-                # If not found, try next
-                if len(matches) == 0:
-                    continue
-
-                # If this familyname was matched several time, abort the whole search
-                #if len(matches) > 1:
-                #    print("Several matches ! Aborting !")
-                #    familynamematch = None
-                #    break
-
-                # If we already had a match, abort the whole search because we
-                # have multiple familynames matched !
-                if familynamematch != None:
-                    familynamematch = None
-                    break
-
-                familynamematch = member_last_name
-                usernamematch = member.username
-
-            if familynamematch != None:
-                payment["member_matched"] = usernamematch
-                #print("Matched by familyname to "+familynamematch)
-                continue
-
-            #print("Could not match")
-            payment["member_matched"] = None
-
-        return payments
-
-
-    def unmatch_payment_with_keywords(self, payments):
-
-        matchers = {}
-        for keyword in KEYWORDS_TO_NOTMATCH:
-            matchers[keyword] = re.compile(r"(?i)(\b|_|-)"+re.escape(keyword)+r"(\b|_|-)")
-
-        for i, payment in enumerate(payments):
-
-            # If no match found, don't filter anyway
-            if payment["member_matched"] == None:
-                continue
-
-            for keyword, matcher in matchers.items():
-                matches = matcher.findall(payment["label"])
-
-                # If not found, try next
-                if len(matches) == 0:
-                    continue
-
-                print("Ignoring possible match for payment '%s' because " \
-                      "it contains the keyword %s"                        \
-                      % (payment["label"], keyword))
-                payments[i]["member_matched"] = None
-
-                break
-
-        return payments
-
-    def filter_already_known_payments(self, payments):
-
-        new_payments = []
-
-        known_payments = Payment.objects.all()
-
-        for payment in payments:
-
-            found_match = False
-            for known_payment in known_payments:
-
-                if  (str(known_payment.date) == payment["date"].encode('utf-8')) \
-                and (known_payment.label == payment["label"]) \
-                and (float(known_payment.amount) == float(payment["amount"])):
-                    found_match = True
-                    break
-
-            if not found_match:
-                new_payments.append(payment)
-
-        return new_payments
-
-
-    def add_new_payments(self, new_payments):
-
-        for new_payment in new_payments:
-
-            # Get the member if there's a member matched
-            member = None
-            if new_payment["member_matched"]:
-                member = Member.objects.filter(username=new_payment["member_matched"])
-                assert len(member) == 1
-                member = member[0]
-
-            print("Adding new payment : ")
-            print(new_payment)
-
-            # Create the payment
-            payment = Payment.objects.create(amount=float(new_payment["amount"]),
-                                             label=new_payment["label"],
-                                             date=new_payment["date"],
-                                             member=member)
-
-    def flatten(self, some_string):
-        return unidecode.unidecode(some_string).upper()
+            add_new_payments(new_payments)