# -*- coding: utf-8 -*- from __future__ import unicode_literals import datetime import settings import MySQLdb as mdb import sys import os import codecs MYSQL_QUERIES = { # "bank": "SELECT DISTINCT b.rowid as b_rowid, ba.ref as ba_ref, ba.label as ba_label, ba.account_number as ba_account_number, b.datev as b_datev, b.dateo as b_dateo, b.label as b_label, b.num_chq as b_num_chq, -b.amount as _b_amount, b.amount as b_amount, b.num_releve as b_num_releve, b.datec as b_datec, bu.url_id as bu_url_id, s.nom as s_nom, s.code_compta as s_code_compta, s.code_compta_fournisseur as s_code_compta_fournisseur, bca.label as bca_label, bca.rowid as bca_rowid, bcl.lineid as bcl_lineid FROM (llx_bank_account as ba, llx_bank as b) LEFT JOIN llx_bank_url as bu ON (bu.fk_bank = b.rowid AND bu.type = 'company') LEFT JOIN llx_societe as s ON bu.url_id = s.rowid LEFT JOIN llx_bank_class as bcl ON bcl.lineid = b.rowid LEFT JOIN llx_bank_categ as bca ON bca.rowid = bcl.fk_categ WHERE ba.rowid = b.fk_account AND ba.entity = 1 and b.num_releve <> '' ORDER BY b.datev, b.num_releve", "bank": "SELECT DISTINCT b.rowid as b_rowid, ba.ref as ba_ref, ba.label as ba_label, ba.account_number as ba_account_number, b.datev as b_datev, b.dateo as b_dateo, b.label as b_label, b.num_chq as b_num_chq, -b.amount as _b_amount, b.amount as b_amount, b.num_releve as b_num_releve, b.datec as b_datec, bu.url_id as bu_url_id, s.nom as s_nom, s.code_compta as s_code_compta, s.code_compta_fournisseur as s_code_compta_fournisseur, bca.label as bca_label, bca.rowid as bca_rowid, bcl.lineid as bcl_lineid, ccs.code as ccs_code, ccs.libelle as ccs_label FROM (llx_bank_account as ba, llx_bank as b) LEFT JOIN llx_bank_url as bu ON (bu.fk_bank = b.rowid AND bu.type = 'company') LEFT JOIN llx_societe as s ON bu.url_id = s.rowid LEFT JOIN llx_bank_class as bcl ON bcl.lineid = b.rowid LEFT JOIN llx_bank_categ as bca ON bca.rowid = bcl.fk_categ LEFT JOIN llx_paiementcharge as p ON p.fk_bank = b.rowid LEFT JOIN llx_chargesociales as cs ON cs.rowid = p.fk_charge LEFT JOIN llx_c_chargesociales as ccs ON cs.fk_type = ccs.id WHERE ba.rowid = b.fk_account AND ba.entity = 1 and b.num_releve <> '' ORDER BY b.datev, b.num_releve;", "sells": "SELECT DISTINCT s.rowid as s_rowid, s.nom as s_nom, s.address as s_address, s.zip as s_zip, s.town as s_town, c.code as c_code, s.phone as s_phone, s.siren as s_siren, s.siret as s_siret, s.ape as s_ape, s.idprof4 as s_idprof4, s.code_compta as s_code_compta, s.code_compta_fournisseur as s_code_compta_fournisseur, s.tva_intra as s_tva_intra, f.rowid as f_rowid, f.facnumber as f_facnumber, f.datec as f_datec, f.datef as f_datef, f.date_lim_reglement as f_date_lim_reglement, f.total as f_total, f.total_ttc as f_total_ttc, f.tva as f_tva, f.paye as f_paye, f.fk_statut as f_fk_statut, f.note_private as f_note_private, f.note_public as f_note_public, fd.rowid as fd_rowid, fd.label as fd_label, fd.description as fd_description, fd.subprice as fd_subprice, fd.tva_tx as fd_tva_tx, fd.qty as fd_qty, fd.total_ht as fd_total_ht, fd.total_tva as fd_total_tva, fd.total_ttc as fd_total_ttc, fd.date_start as fd_date_start, fd.date_end as fd_date_end, fd.special_code as fd_special_code, fd.product_type as fd_product_type, fd.fk_product as fd_fk_product, p.ref as p_ref, p.label as p_label, p.accountancy_code_sell as p_accountancy_code_sell, a.account_number as a_account_number FROM llx_societe as s LEFT JOIN llx_c_pays as c on s.fk_pays = c.rowid, llx_facture as f LEFT JOIN llx_facture_extrafields as extra ON f.rowid = extra.fk_object , llx_facturedet as fd LEFT JOIN llx_product as p on (fd.fk_product = p.rowid) LEFT JOIN llx_accountingaccount as a ON fd.fk_code_ventilation = a.rowid WHERE f.fk_soc = s.rowid AND f.rowid = fd.fk_facture AND f.entity = 1", "suppliers": "SELECT DISTINCT s.rowid as s_rowid, s.nom as s_nom, s.address as s_address, s.zip as s_zip, s.town as s_town, s.code_compta_fournisseur as s_code_supplier, c.code as c_code, s.phone as s_phone, s.siren as s_siren, s.siret as s_siret, s.ape as s_ape, s.idprof4 as s_idprof4, s.idprof5 as s_idprof5, s.idprof6 as s_idprof6, s.tva_intra as s_tva_intra, f.rowid as f_rowid, f.ref as f_ref, f.ref_supplier as f_ref_supplier, f.datec as f_datec, f.datef as f_datef, f.total_ht as f_total_ht, f.total_ttc as f_total_ttc, f.total_tva as f_total_tva, f.paye as f_paye, f.fk_statut as f_fk_statut, f.note_public as f_note_public, fd.rowid as fd_rowid, fd.description as fd_description, fd.tva_tx as fd_tva_tx, fd.qty as fd_qty, fd.remise_percent as fd_remise_percent, fd.total_ht as fd_total_ht, fd.total_ttc as fd_total_ttc, fd.tva as fd_tva, fd.product_type as fd_product_type, fd.fk_product as fd_fk_product, p.ref as p_ref, p.label as p_label, p.accountancy_code_buy as p_accountancy_code_buy, a.account_number as a_account_number FROM llx_societe as s LEFT JOIN llx_c_pays as c ON s.fk_pays = c.rowid, llx_facture_fourn as f LEFT JOIN llx_facture_fourn_extrafields as extra ON f.rowid = extra.fk_object , llx_facture_fourn_det as fd LEFT JOIN llx_product as p on (fd.fk_product = p.rowid) LEFT JOIN llx_accountingaccount as a ON fd.fk_code_ventilation = a.rowid WHERE f.fk_soc = s.rowid AND f.rowid = fd.fk_facture_fourn AND f.entity = 1", "social": "SELECT DISTINCT cc.libelle as cc_libelle, c.rowid as c_rowid, c.libelle as c_libelle, c.date_ech as c_date_ech, c.periode as c_periode, c.amount as c_amount, c.paye as c_paye, p.rowid as p_rowid, p.datep as p_datep, p.amount as p_amount, p.num_paiement as p_num_paiement, cc.accountancy_code as cc_acc_code, cc.code as cc_code FROM llx_c_chargesociales as cc, llx_chargesociales as c LEFT JOIN llx_paiementcharge as p ON p.fk_charge = c.rowid WHERE c.fk_type = cc.id AND c.entity = 1", } class Entry(object): accounting_years = settings.get('ACCOUNTING_YEARS') pc_default_tier = settings.get('PC_REFS')['default_tier'] pc_default_client = settings.get('PC_REFS')['default_client'] pc_default_supplier = settings.get('PC_REFS')['default_supplier'] pc_default_income = settings.get('PC_REFS')['default_income'] pc_default_expense = settings.get('PC_REFS')['default_expense'] pc_default_bank = settings.get('PC_REFS')['default_bank'] def __init__(self, cells): self.cells = cells for key in cells: value = self.cells[key] if value is None: self.cells[key] = "" elif isinstance(value, str): self.cells[key] = value.decode('iso8859-1') def get_accounting_year(self, field): value = self.cells[field] for (year, dbegin, dend) in Entry.accounting_years: if value >= dbegin and value <= dend: return year return str(value.year) class BankEntry(Entry): # ['ba_ref', # 's_nom', # 'b_rowid', # 'b_datev', # 'b_num_releve', # 'bu_url_id', # 'ba_label', # 's_code_compta', # 'b_label', # 'b_dateo', # '_b_amount', # 'b_num_chq', # 'b_datec', # 'b_amount', # 's_code_compta_fournisseur'] @staticmethod def clean(row): return True #return row['b_num_releve'] != "" def addEntry(self, entry): pass def get_year(self): return self.get_accounting_year('b_datev') def getMissingPC(self): pc_missing = [] if self.get_s_code() == "": pc_missing.append("tiers: %s" % (self.cells['s_nom'])) if self.cells['ba_account_number'] == "": pc_missing.append("banque: %s" % (self.cells['ba_ref'])) return pc_missing def get_s_code(self): supplier_or_client_by_bca_rowid = { 1: 'client', 2: 'client', 3: 'supplier', 4: 'supplier', 5: 'client', '': None } #entry_type = entry_type_by_bca_rowid[self.cells['bca_rowid']] entry_type = None if entry_type is None: if self.cells['b_label'] == "(SupplierInvoicePayment)": entry_type = "supplier" elif self.cells['b_label'] == "(CustomerInvoicePayment)": entry_type = "client" elif self.cells['b_label'] == "Règlement client": entry_type = "client" elif self.cells['b_label'] == "Solde initial": entry_type = "custom" #elif self.cells['b_label'] == "Souscription part sociale": # entry_type = "custom" elif self.cells['b_label'] == "(SocialContributionPayment)": entry_type = "social" else: entry_type = "custom" third_code = "" if entry_type == "client": third_code = self.cells['s_code_compta'] if third_code == "": third_code = self.pc_default_client print "WARNING: Utilisation du tiers par défaut" print self.cells elif entry_type == "supplier": third_code = self.cells['s_code_compta_fournisseur'] if third_code == "": third_code = self.pc_default_supplier elif entry_type == "social": third_code = SocialEntry.get_third_code(self.cells['ccs_code']) elif entry_type == "custom": third_code = self.get_custom_code() if third_code == "": print "WARNING: Utilisation du tiers par défaut" print self.cells third_code = self.pc_default_tiers else: print "ERROR: entry_type must be defined" return third_code def get_custom_code(self): fn = settings.get('PC_REFS')['fn_custom_code'] return fn(self.cells) def get_ledger(self): s = "" s += "%(date)s %(description)s\n" % { 'date': self.cells['b_datev'].strftime("%Y/%m/%d"), 'description': self.cells['s_nom'] + " - " + self.cells['b_label'] } amount = self.cells['b_amount'] third_code = self.get_s_code() s += " %(account)s \t %(amount)s\n" % { 'account': settings.get_ledger_account(third_code), 'amount': amount } ba_code = self.cells['ba_account_number'] if ba_code == "": ba_code = self.pc_default_bank s += " %(account)s \t %(amount)s\n" % { 'account': settings.get_ledger_account(ba_code), 'amount': -amount } return s class EntryCollection(object): def __init__(self, index, entry_class): self.collection = {} self.index = index self.entry_class = entry_class def addCollectionEntry(self, row): id = row[self.index] if id not in self.collection: self.collection[id] = self.entry_class(row) self.collection[id].addEntry(row) def get_collection(self): return self.collection.values() def get_by_year(self): by_year = {} for item in self.get_collection(): item_year = item.get_year() if item_year not in by_year: by_year[item_year] = [] by_year[item_year].append(item) return by_year def check_pc(self): pc_missing = set() for item in self.get_collection(): pc_missing.update(item.getMissingPC()) return pc_missing class Sell(Entry): @staticmethod def clean(row): return True def __init__(self, row): super(Sell, self).__init__(row) self.entries = {} def addEntry(self, row): id = row['fd_rowid'] self.entries[id] = SellEntry(row) def get_year(self): return self.get_accounting_year('f_datef') def getMissingPC(self): pc_missing = [] if self.cells['s_code_compta'] == "": pc_missing.append("tiers: %s %s" % (self.cells['s_nom'], self.cells['s_ape'])) for entry in self.entries.values(): if entry.get_product_account_code() == self.pc_default_produit: if self.cells['fd_description'] != "": description = self.cells['fd_description'].splitlines()[0] else: description = self.cells['fd_description'] pc_missing.append("produit: %s %s" % (self.cells['s_nom'], description)) return pc_missing def check(self): total_ttc = self.cells['f_total_ttc'] total_tva = self.cells['f_tva'] total_ht = self.cells['f_total'] for entry in self.entries.values(): total_ttc -= entry.cells['fd_total_ttc'] total_tva -= entry.cells['fd_total_tva'] total_ht -= entry.cells['fd_total_ht'] if total_ttc > 1e-10: print "Erreur dans l'écriture %s: total ttc = %s" % (self.cells['f_facnumber'],total_ttc) if total_ht > 1e-10: print "Erreur dans l'écriture %s: total ht = %s" % (self.cells['f_facnumber'],total_ht) if total_tva > 1e-10: print "Erreur dans l'écriture %s: total tva = %s" % (self.cells['f_facnumber'],total_tva) def get_ledger(self): self.check() s = "" s += "%(date)s %(description)s\n" % { 'date': self.cells['f_datef'].strftime("%Y/%m/%d"), 'description': self.cells['f_facnumber'] + " - " + self.cells['s_nom'], } s_code = self.cells['s_code_compta'] if s_code == "": s_code = self.pc_default_client s += " %(compte_tiers)s %(amount_ttc)s\n" % { 'compte_tiers': settings.get_ledger_account(s_code), 'amount_ttc': -self.cells['f_total_ttc'], } if float(self.cells['f_tva']) != 0: tva_account = settings.get('PC_REFS')['tva_collecte'] s += " %(compte_tva_collecte)s %(amount_tva)s\n" % { 'compte_tva_collecte': settings.get_ledger_account(tva_account), 'amount_tva': self.cells['f_tva'], } for entry in self.entries.values(): p_code = entry.get_product_account_code() s += " %(compte_produit)s %(amount_ht)s\n" % { 'compte_produit': settings.get_ledger_account(p_code), 'amount_ht': entry.cells['fd_total_ht'] } return s class SellEntry(Entry): # ['f_date_lim_reglement', # 's_phone', # 'f_total_ttc', # 'c_code', # 's_zip', # 's_siren', # 's_tva_intra', # 'f_total', # 'f_note_public', # 's_siret', # 'fd_date_start', # 'fd_label', # 'f_facnumber', # 'p_accountancy_code_sell', # 'fd_date_end', # 'f_datec', # 's_idprof4', # 'fd_rowid', # 'fd_total_tva', # 'fd_total_ttc', # 's_nom', # 'f_paye', # 'fd_special_code', # 'f_datef', # 'p_ref', # 's_address', # 's_town', # 'fd_description', # 'fd_product_type', # 's_rowid', # 'fd_subprice', # 'fd_fk_product', # 's_ape', # 's_code_compta_fournisseur', # 'p_label', # 'fd_total_ht', # 'f_tva', # 'fd_qty', # 'fd_tva_tx', # 's_code_compta', # 'f_rowid', # 'f_fk_statut', # 'f_note_private'] def get_product_account_code(self): p_code = self.cells['p_accountancy_code_sell'] if p_code == "": p_code = self.cells['a_account_number'] if p_code == "": p_code = self.pc_default_produit return p_code class Supplier(Entry): @staticmethod def clean(row): return True def __init__(self, row): super(Supplier, self).__init__(row) self.entries = {} def addEntry(self, row): id = row['fd_rowid'] self.entries[id] = SupplierEntry(row) def check(self): total_ttc = self.cells['f_total_ttc'] total_tva = self.cells['f_total_tva'] total_ht = self.cells['f_total_ht'] for entry in self.entries.values(): total_ttc -= entry.cells['fd_total_ttc'] total_tva -= entry.cells['fd_tva'] total_ht -= entry.cells['fd_total_ht'] if total_ttc > 1e-10: print "Erreur dans l'écriture %s: total ttc = %s" % (self.cells['f_ref_supplier'],total_ttc) if total_ht > 1e-10: print "Erreur dans l'écriture %s: total ht = %s" % (self.cells['f_ref_supplier'],total_ht) if total_tva > 1e-10: print "Erreur dans l'écriture %s: total tva = %s" % (self.cells['f_ref_supplier'],total_tva) def get_year(self): return self.get_accounting_year('f_datef') def getMissingPC(self): pc_missing = [] if self.cells['s_code_supplier'] == "": pc_missing.append("tiers: %s %s" % (self.cells['s_nom'], self.cells['s_ape'])) for entry in self.entries.values(): if entry.get_product_account_code() == self.pc_default_charge: pc_missing.append("achat: %s - %s" % (self.cells['f_ref_supplier'], self.cells['fd_description'].splitlines()[0])) return pc_missing def get_ledger(self): self.check() s = "" s += "%(date)s %(description)s\n" % { 'date': self.cells['f_datef'].strftime("%Y/%m/%d"), 'description': self.cells['f_ref_supplier'] + " - " + self.cells['s_nom'], } s_code = self.cells['s_code_supplier'] if s_code == "": s_code = self.pc_default_supplier s += " %(compte_tiers)s %(amount_ttc)s\n" % { 'compte_tiers': settings.get_ledger_account(s_code), 'amount_ttc': self.cells['f_total_ttc'], } if self.cells['f_total_tva'] != 0: if s_code.startswith("2"): tva_account = settings.get('PC_REFS')['tva_deductible'] else: tva_account = settings.get('PC_REFS')['tva_deductible_immo'] s += " %(compte_tva)s %(amount_tva)s\n" % { 'compte_tva': settings.get_ledger_account(tva_account), 'amount_tva': -self.cells['f_total_tva'], } for entry in self.entries.values(): p_code = entry.get_product_account_code() s += " %(compte_produit)s %(amount_ht)s\n" % { 'compte_produit': settings.get_ledger_account(p_code), 'amount_ht': -entry.cells['fd_total_ht'] } return s class SupplierEntry(Entry): # ['f_total_ttc', # 's_phone', # 'c_code', # 'f_total_tva', # 's_zip', # 's_siren', # 's_tva_intra', # 'f_ref', # 'f_note_public', # 's_siret', # 'f_ref_supplier', # 'f_datec', # 's_idprof5', # 's_idprof4', # 's_idprof6', # 'p_accountancy_code_buy', # 'fd_rowid', # 'f_total_ht', # 'fd_total_ttc', # 's_nom', # 'fd_tva', # 'f_paye', # 'f_datef', # 'p_ref', # 's_address', # 's_town', # 'fd_description', # 'fd_product_type', # 's_rowid', # 'fd_fk_product', # 's_ape', # 'p_label', # 'fd_total_ht', # 'f_fk_statut', # 'fd_qty', # 'fd_tva_tx', # 'f_rowid', # 'fd_remise_percent'] def get_product_account_code(self): p_code = self.cells['p_accountancy_code_buy'] if p_code == "": p_code = self.cells['a_account_number'] if p_code == "": p_code = self.pc_default_charge return p_code class SocialEntry(Entry): @staticmethod def get_third_code(code): third_code = None if code in settings.get('SOCIAL_REFS'): third_code = settings.get('SOCIAL_REFS')[code] else: third_code = SocialEntry.pc_default_supplier return third_code @staticmethod def get_s_code(code): s_code = None if code is None: s_code = SocialEntry.pc_default_charge else: s_code = code return s_code @staticmethod def clean(row): return True def addEntry(self, entry): pass def get_year(self): return self.get_accounting_year('c_periode') def getMissingPC(self): pc_missing = [] if self.get_s_code(self.cells['cc_acc_code']) == self.pc_default_charge: pc_missing.append("charges: %s" % (self.cells['cc_libelle'])) if self.get_third_code(self.cells['cc_code']) == self.pc_default_supplier: pc_missing.append("tiers: %s (%s)" % (self.cells['c_libelle'], self.cells['cc_code'])) return pc_missing def get_ledger(self): s = "" s += "%(date)s %(description)s\n" % { 'date': self.cells['c_periode'].strftime("%Y/%m/%d"), 'description': self.cells['c_libelle'] + " - " + self.cells['cc_libelle'] } amount = self.cells['c_amount'] third_code = self.get_third_code(self.cells['cc_code']) s_code = self.get_s_code(self.cells['cc_acc_code']) s += " %(account)s \t %(amount)s\n" % { 'account': settings.get_ledger_account(third_code), 'amount': amount } s += " %(account)s \t %(amount)s\n" % { 'account': settings.get_ledger_account(s_code), 'amount': -amount } return s class Writer(object): output_files = settings.get('OUTPUT_FILES') output_dir = settings.get('OUTPUT_DIR') @staticmethod def write(journal, entries): filename = Writer.output_files[journal] output = os.path.join(Writer.output_dir, filename) entries_by_year = entries.get_by_year() for year in entries_by_year: output_file = output.replace("%year%", year) output_dir = os.path.dirname(output_file) if not os.path.exists(output_dir): os.makedirs(os.path.dirname(output_file)) elif not os.path.isdir(output_dir): print "Error: %s is not a dir\n" % (output_dir) raise os.error() f = codecs.open(output_file, 'w', 'utf-8') for entry in entries_by_year[year]: f.write(entry.get_ledger()) f.write("\n") f.close() @staticmethod def write_hreport_plan(): pc_names = settings.get('PC_NAMES') pc_descriptions = settings.get('PC_DESCRIPTIONS') filename = Writer.output_files['pc'] output_file = os.path.join(Writer.output_dir, filename) f = codecs.open(output_file, 'w', 'utf-8') for pc in sorted(pc_names.keys()): name = pc_names[pc] if pc in pc_descriptions: desc = pc_descriptions[pc] else: desc = name s = "%s %s %s\n" %(name.ljust(80), pc.ljust(12), desc) f.write(s) f.close() class DolibarrSQL(object): def __init__(self, mysql_host, mysql_port, mysql_database, mysql_user, mysql_password): self.mysql_database = mysql_database self.mysql_host = mysql_host self.mysql_password = mysql_password self.mysql_user = mysql_user self.mysql_port = mysql_port self.queries = MYSQL_QUERIES def connect(self): try: self.cnx = mdb.connect(self.mysql_host, self.mysql_user, self.mysql_password, self.mysql_database, port = self.mysql_port) self.cursor = self.cnx.cursor() except mdb.Error, e: print "Error %d: %s" % (e.args[0], e.args[1]) sys.exit(1) def disconnect(self): try: self.cnx.close() except mdb.Error, e: print "Error %d: %s" % (e.args[0], e.args[1]) sys.exit(1) def get_bank_entries(self): return self.get_entries(self.queries['bank'], 'b_rowid', BankEntry) def get_sell_entries(self): return self.get_entries(self.queries['sells'], 'f_facnumber', Sell) def get_supplier_entries(self): return self.get_entries(self.queries['suppliers'], 'f_rowid', Supplier) def get_social_entries(self): return self.get_entries(self.queries['social'], 'c_rowid', SocialEntry) def get_entries(self, query, index, entry_class): cur = self.cnx.cursor(cursorclass=mdb.cursors.DictCursor) cur.execute(query) rows = cur.fetchall() collection = EntryCollection(index, entry_class) for row in rows: if entry_class.clean(row): collection.addCollectionEntry(row) return collection