import_payments_from_csv.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. # -*- coding: utf-8 -*-
  2. """
  3. Import payments from a CSV file from a bank. The payments will automatically be
  4. parsed, and there'll be an attempt to automatically match payments with members.
  5. The matching is performed using the label of the payment.
  6. - First, try to find a string such as 'ID-42' where 42 is the member's ID
  7. - Second (if no ID found), try to find a member username (with no ambiguity with
  8. respect to other usernames)
  9. - Third (if no username found), try to find a member family name (with no
  10. ambiguity with respect to other family names)
  11. This script will check if a payment has already been registered with the same
  12. properties (date, label, price) to avoid creating duplicate payments inside coin.
  13. By default, only a dry-run is performed to let you see what will happen! You
  14. should run this command with --commit if you agree with the dry-run.
  15. """
  16. from __future__ import unicode_literals
  17. # Standard python libs
  18. import csv
  19. import datetime
  20. import json
  21. import logging
  22. import os
  23. import re
  24. import unidecode
  25. # Django specific imports
  26. from argparse import RawTextHelpFormatter
  27. from django.core.management.base import BaseCommand, CommandError
  28. # Coin specific imports
  29. from coin.members.models import Member
  30. from coin.billing.models import Payment
  31. # Parser / import / matcher configuration
  32. # The CSV delimiter
  33. DELIMITER=str(';')
  34. # The date format in the CSV
  35. DATE_FORMAT="%d/%m/%Y"
  36. # The default regex used to match the label of a payment with a member ID
  37. ID_REGEX=r"(?i)(\b|_)ID[\s\-\_\/]*(\d+)(\b|_)"
  38. # If the label of the payment contains one of these, the payment won't be
  39. # matched to a member when importing it.
  40. KEYWORDS_TO_NOTMATCH=[ "REM CHQ" ]
  41. class Command(BaseCommand):
  42. help = __doc__
  43. def create_parser(self, *args, **kwargs):
  44. parser = super(Command, self).create_parser(*args, **kwargs)
  45. parser.formatter_class = RawTextHelpFormatter
  46. return parser
  47. def add_arguments(self, parser):
  48. parser.add_argument(
  49. 'filename',
  50. type=str,
  51. help="The CSV filename to be parsed"
  52. )
  53. parser.add_argument(
  54. '--commit',
  55. action='store_true',
  56. dest='commit',
  57. default=False,
  58. help='Agree with the proposed change and commit them'
  59. )
  60. def handle(self, *args, **options):
  61. assert options["filename"] != ""
  62. if not os.path.isfile(options["filename"]):
  63. raise CommandError("This file does not exists.")
  64. os.system("iconv -f ISO-8859-1 -t UTF-8 %s > %s.utf8.csv" % (options["filename"], options["filename"]))
  65. options["filename"] = options["filename"] + '.utf8.csv'
  66. payments = self.convert_csv_to_dicts(self.clean_csv(self.load_csv(options["filename"])))
  67. payments = self.try_to_match_payment_with_members(payments)
  68. new_payments = self.filter_already_known_payments(payments)
  69. new_payments = self.unmatch_payment_with_keywords(new_payments)
  70. number_of_already_known_payments = len(payments)-len(new_payments)
  71. number_of_new_payments = len(new_payments)
  72. if (number_of_new_payments > 0) :
  73. print("======================================================")
  74. print(" > New payments found")
  75. print(json.dumps(new_payments, indent=4, separators=(',', ': ')))
  76. print("======================================================")
  77. print("Number of already known payments found : " + str(number_of_already_known_payments))
  78. print("Number of new payments found : " + str(number_of_new_payments))
  79. print("Number of new payments matched : " + str(len([p for p in new_payments if p["member_matched"]])))
  80. print("Number of payments not matched : " + str(len([p for p in new_payments if not p["member_matched"]])))
  81. print("======================================================")
  82. if number_of_new_payments == 0:
  83. print("Nothing to do, everything looks up to date !")
  84. return
  85. if not options["commit"]:
  86. print("Please carefully review the matches, then if everything \n" \
  87. "looks alright, use --commit to register these new payments.")
  88. else:
  89. self.add_new_payments(new_payments)
  90. def is_date(self, text):
  91. try:
  92. datetime.datetime.strptime(text, DATE_FORMAT)
  93. return True
  94. except ValueError:
  95. return False
  96. def is_money_amount(self, text):
  97. try:
  98. float(text.replace(",","."))
  99. return True
  100. except ValueError:
  101. return False
  102. def load_csv(self, filename):
  103. with open(filename, "r") as f:
  104. return list(csv.reader(f, delimiter=DELIMITER))
  105. def clean_csv(self, data):
  106. output = []
  107. for i, row in enumerate(data):
  108. for j in range(len(row)):
  109. row[j] = row[j].decode('utf-8')
  110. if len(row) < 4:
  111. continue
  112. if not self.is_date(row[0]):
  113. logging.warning("Ignoring the following row (bad format for date in the first column) :")
  114. logging.warning(str(row))
  115. continue
  116. if self.is_money_amount(row[2]):
  117. logging.warning("Ignoring row %s (not a payment)" % str(i))
  118. logging.warning(str(row))
  119. continue
  120. if not self.is_money_amount(row[3]):
  121. logging.warning("Ignoring the following row (bad format for money amount in colun three) :")
  122. logging.warning(str(row))
  123. continue
  124. # Clean the date
  125. row[0] = datetime.datetime.strptime(row[0], DATE_FORMAT).strftime("%Y-%m-%d")
  126. # Clean the label ...
  127. row[4] = row[4].replace('\r', ' ')
  128. row[4] = row[4].replace('\n', ' ')
  129. output.append(row)
  130. return output
  131. def convert_csv_to_dicts(self, data):
  132. output = []
  133. for row in data:
  134. payment = {}
  135. payment["date"] = row[0]
  136. payment["label"] = row[4]
  137. payment["amount"] = float(row[3].replace(",","."))
  138. output.append(payment)
  139. return output
  140. def try_to_match_payment_with_members(self, payments):
  141. #members = Member.objects.filter(status="member")
  142. members = Member.objects.all()
  143. idregex = re.compile(ID_REGEX)
  144. for payment in payments:
  145. payment_label = payment["label"].upper()
  146. # First, attempt to match the member ID
  147. idmatches = idregex.findall(payment_label)
  148. if len(idmatches) == 1:
  149. i = int(idmatches[0][1])
  150. member_matches = [ member.username for member in members if member.pk==i ]
  151. if len(member_matches) == 1:
  152. payment["member_matched"] = member_matches[0]
  153. #print("Matched by ID to "+member_matches[0])
  154. continue
  155. # Second, attempt to find the username
  156. usernamematch = None
  157. for member in members:
  158. username = self.flatten(member.username)
  159. matches = re.compile(r"(?i)(\b|_)"+re.escape(username)+r"(\b|_)") \
  160. .findall(payment_label)
  161. # If not found, try next
  162. if len(matches) == 0:
  163. continue
  164. # If we already had a match, abort the whole search because we
  165. # have multiple usernames matched !
  166. if usernamematch != None:
  167. usernamematch = None
  168. break
  169. usernamematch = member.username
  170. if usernamematch != None:
  171. payment["member_matched"] = usernamematch
  172. #print("Matched by username to "+usernamematch)
  173. continue
  174. # Third, attempt to match by family name
  175. familynamematch = None
  176. for member in members:
  177. if member.last_name == "":
  178. continue
  179. # "Flatten" accents in the last name... (probably the CSV
  180. # don't contain 'special' chars like accents
  181. member_last_name = self.flatten(member.last_name)
  182. matches = re.compile(r"(?i)(\b|_)"+re.escape(member_last_name)+r"(\b|_)") \
  183. .findall(payment_label)
  184. # If not found, try next
  185. if len(matches) == 0:
  186. continue
  187. # If this familyname was matched several time, abort the whole search
  188. #if len(matches) > 1:
  189. # print("Several matches ! Aborting !")
  190. # familynamematch = None
  191. # break
  192. # If we already had a match, abort the whole search because we
  193. # have multiple familynames matched !
  194. if familynamematch != None:
  195. familynamematch = None
  196. break
  197. familynamematch = member_last_name
  198. usernamematch = member.username
  199. if familynamematch != None:
  200. payment["member_matched"] = usernamematch
  201. #print("Matched by familyname to "+familynamematch)
  202. continue
  203. #print("Could not match")
  204. payment["member_matched"] = None
  205. return payments
  206. def unmatch_payment_with_keywords(self, payments):
  207. matchers = {}
  208. for keyword in KEYWORDS_TO_NOTMATCH:
  209. matchers[keyword] = re.compile(r"(?i)(\b|_|-)"+re.escape(keyword)+r"(\b|_|-)")
  210. for i, payment in enumerate(payments):
  211. # If no match found, don't filter anyway
  212. if payment["member_matched"] == None:
  213. continue
  214. for keyword, matcher in matchers.items():
  215. matches = matcher.findall(payment["label"])
  216. # If not found, try next
  217. if len(matches) == 0:
  218. continue
  219. print("Ignoring possible match for payment '%s' because " \
  220. "it contains the keyword %s" \
  221. % (payment["label"], keyword))
  222. payments[i]["member_matched"] = None
  223. break
  224. return payments
  225. def filter_already_known_payments(self, payments):
  226. new_payments = []
  227. known_payments = Payment.objects.all()
  228. for payment in payments:
  229. found_match = False
  230. for known_payment in known_payments:
  231. if (str(known_payment.date) == payment["date"].encode('utf-8')) \
  232. and (known_payment.label == payment["label"]) \
  233. and (float(known_payment.amount) == float(payment["amount"])):
  234. found_match = True
  235. break
  236. if not found_match:
  237. new_payments.append(payment)
  238. return new_payments
  239. def add_new_payments(self, new_payments):
  240. for new_payment in new_payments:
  241. # Get the member if there's a member matched
  242. member = None
  243. if new_payment["member_matched"]:
  244. member = Member.objects.filter(username=new_payment["member_matched"])
  245. assert len(member) == 1
  246. member = member[0]
  247. print("Adding new payment : ")
  248. print(new_payment)
  249. # Create the payment
  250. payment = Payment.objects.create(amount=float(new_payment["amount"]),
  251. label=new_payment["label"],
  252. date=new_payment["date"],
  253. member=member)
  254. def flatten(self, some_string):
  255. return unidecode.unidecode(some_string).upper()