import_payments_from_csv.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. # -*- coding: utf-8 -*-
  2. """
  3. Import payments from a CSV file from a bank. The payments will automatically be
  4. parsed, and there'll be an attempt to automatically match payments with members.
  5. The matching is performed using the label of the payment.
  6. - First, try to find a string such as 'ID-42' where 42 is the member's ID
  7. - Second (if no ID found), try to find a member username (with no ambiguity with
  8. respect to other usernames)
  9. - Third (if no username found), try to find a member family name (with no
  10. ambiguity with respect to other family names)
  11. This script will check if a payment has already been registered with same
  12. properties (date, label, price) to avoid creating duplicate payments inside coin.
  13. By default, only a dry-run is performed to let you see what will happen! You
  14. should run this command with --commit if you agree with the dry-run.
  15. """
  16. from __future__ import unicode_literals
  17. # Standard python libs
  18. import csv
  19. import datetime
  20. import logging
  21. import re
  22. import unidecode
  23. # Coin specific imports
  24. from coin.members.models import Member
  25. from coin.billing.models import Payment
# Parser / import / matcher configuration

# The CSV delimiter. It is wrapped in str() because this module enables
# unicode_literals; presumably csv.reader needs a native string delimiter
# on Python 2 (TODO confirm).
DELIMITER = str(';')

# The date format in the CSV (day/month/four-digit year), as understood by
# datetime.strptime.
DATE_FORMAT = "%d/%m/%Y"

# The default regex used to match the label of a payment with a member ID.
# Case-insensitive: the letters "ID", then any number of separators
# (whitespace, '-', '_' or '/'), then the digits of the ID (second group).
# The whole token must be delimited by a word boundary or an underscore.
ID_REGEX = r"(?i)(\b|_)ID[\s\-\_\/]*(\d+)(\b|_)"

# If the label of the payment contains one of these, the payment won't be
# matched to a member when importing it.
KEYWORDS_TO_NOTMATCH = ["REM CHQ"]
  36. ################################################################################
  37. def process(f):
  38. raw_csv = list(csv.reader(f, delimiter=DELIMITER))
  39. cleaned_csv = clean_csv(raw_csv)
  40. payments = convert_csv_to_dicts(cleaned_csv)
  41. payments = try_to_match_payment_with_members(payments)
  42. new_payments = filter_already_known_payments(payments)
  43. new_payments = unmatch_payment_with_keywords(new_payments)
  44. return new_payments
  45. def is_date(text):
  46. try:
  47. datetime.datetime.strptime(text, DATE_FORMAT)
  48. return True
  49. except ValueError:
  50. return False
  51. def is_money_amount(text):
  52. try:
  53. float(text.replace(",","."))
  54. return True
  55. except ValueError:
  56. return False
  57. def load_csv(filename):
  58. with open(filename, "r") as f:
  59. return list(csv.reader(f, delimiter=DELIMITER))
  60. def clean_csv(data):
  61. output = []
  62. for i, row in enumerate(data):
  63. if len(row) < 4:
  64. continue
  65. if not is_date(row[0]):
  66. logging.warning("Ignoring the following row (bad format for date in the first column) :")
  67. logging.warning(str(row))
  68. continue
  69. if is_money_amount(row[2]):
  70. logging.warning("Ignoring row %s (not a payment)" % str(i))
  71. logging.warning(str(row))
  72. continue
  73. if not is_money_amount(row[3]):
  74. logging.warning("Ignoring the following row (bad format for money amount in colun three) :")
  75. logging.warning(str(row))
  76. continue
  77. # Clean the date
  78. row[0] = datetime.datetime.strptime(row[0], DATE_FORMAT).strftime("%Y-%m-%d")
  79. # Clean the label ...
  80. row[4] = row[4].replace('\r', ' ')
  81. row[4] = row[4].replace('\n', ' ')
  82. output.append(row)
  83. return output
  84. def convert_csv_to_dicts(data):
  85. output = []
  86. for row in data:
  87. payment = {}
  88. payment["date"] = row[0]
  89. payment["label"] = row[4]
  90. payment["amount"] = float(row[3].replace(",","."))
  91. output.append(payment)
  92. return output
  93. def try_to_match_payment_with_members(payments):
  94. members = Member.objects.all()
  95. idregex = re.compile(ID_REGEX)
  96. for payment in payments:
  97. payment_label = payment["label"].upper()
  98. # First, attempt to match the member ID
  99. idmatches = idregex.findall(payment_label)
  100. if len(idmatches) == 1:
  101. i = int(idmatches[0][1])
  102. member_matches = [ member.username for member in members if member.pk==i ]
  103. if len(member_matches) == 1:
  104. payment["member_matched"] = member_matches[0]
  105. #print("Matched by ID to "+member_matches[0])
  106. continue
  107. # Second, attempt to find the username
  108. usernamematch = None
  109. for member in members:
  110. username = flatten(member.username)
  111. matches = re.compile(r"(?i)(\b|_)"+re.escape(username)+r"(\b|_)") \
  112. .findall(payment_label)
  113. # If not found, try next
  114. if len(matches) == 0:
  115. continue
  116. # If we already had a match, abort the whole search because we
  117. # have multiple usernames matched !
  118. if usernamematch != None:
  119. usernamematch = None
  120. break
  121. usernamematch = member.username
  122. if usernamematch != None:
  123. payment["member_matched"] = usernamematch
  124. #print("Matched by username to "+usernamematch)
  125. continue
  126. # Third, attempt to match by family name
  127. familynamematch = None
  128. for member in members:
  129. if member.last_name == "":
  130. continue
  131. # "Flatten" accents in the last name... (probably the CSV
  132. # don't contain 'special' chars like accents
  133. member_last_name = flatten(member.last_name)
  134. matches = re.compile(r"(?i)(\b|_)"+re.escape(member_last_name)+r"(\b|_)") \
  135. .findall(payment_label)
  136. # If not found, try next
  137. if len(matches) == 0:
  138. continue
  139. # If this familyname was matched several time, abort the whole search
  140. #if len(matches) > 1:
  141. # print("Several matches ! Aborting !")
  142. # familynamematch = None
  143. # break
  144. # If we already had a match, abort the whole search because we
  145. # have multiple familynames matched !
  146. if familynamematch != None:
  147. familynamematch = None
  148. break
  149. familynamematch = member_last_name
  150. usernamematch = member.username
  151. if familynamematch != None:
  152. payment["member_matched"] = usernamematch
  153. #print("Matched by familyname to "+familynamematch)
  154. continue
  155. #print("Could not match")
  156. payment["member_matched"] = None
  157. return payments
  158. def unmatch_payment_with_keywords(payments):
  159. matchers = {}
  160. for keyword in KEYWORDS_TO_NOTMATCH:
  161. matchers[keyword] = re.compile(r"(?i)(\b|_|-)"+re.escape(keyword)+r"(\b|_|-)")
  162. for i, payment in enumerate(payments):
  163. # If no match found, don't filter anyway
  164. if payment["member_matched"] == None:
  165. continue
  166. for keyword, matcher in matchers.items():
  167. matches = matcher.findall(payment["label"])
  168. # If not found, try next
  169. if len(matches) == 0:
  170. continue
  171. #print("Ignoring possible match for payment '%s' because " \
  172. # "it contains the keyword %s" \
  173. # % (payment["label"], keyword))
  174. payments[i]["member_matched"] = None
  175. break
  176. return payments
  177. def filter_already_known_payments(payments):
  178. new_payments = []
  179. known_payments = Payment.objects.all()
  180. for payment in payments:
  181. found_match = False
  182. for known_payment in known_payments:
  183. if (str(known_payment.date) == payment["date"].encode('utf-8')) \
  184. and (known_payment.label == payment["label"]) \
  185. and (float(known_payment.amount) == float(payment["amount"])):
  186. found_match = True
  187. break
  188. if not found_match:
  189. new_payments.append(payment)
  190. return new_payments
  191. def add_new_payments(new_payments):
  192. for new_payment in new_payments:
  193. # Get the member if there's a member matched
  194. member = None
  195. if new_payment["member_matched"]:
  196. member = Member.objects.filter(username=new_payment["member_matched"])
  197. assert len(member) == 1
  198. member = member[0]
  199. print("Adding new payment : ")
  200. print(new_payment)
  201. # Create the payment
  202. payment = Payment.objects.create(amount=float(new_payment["amount"]),
  203. label=new_payment["label"],
  204. date=new_payment["date"],
  205. member=member)
  206. def flatten(some_string):
  207. return unidecode.unidecode(some_string).upper()