#!/usr/bin/python3

# Style Guide for Python Code https://www.python.org/dev/peps/pep-0008/
# Docstring Conventions https://www.python.org/dev/peps/pep-0257/

import psycopg2.sql as sql  # >= 2.7


class Instance:
    """Query helpers for the llx_* tables reachable through one connection.

    NOTE: every method runs on the single cursor created in __init__ and
    most of them return that cursor. Running another query therefore
    discards any rows not yet fetched from a previously returned cursor.
    """

    def __init__(self, conn):
        """Create an instance

        Arguments:
        conn - Database connection (psycopg2)"""
        self._conn = conn
        self._cur = conn.cursor()

    @staticmethod
    def _adherent_condition(activeonly):
        """Return the WHERE condition on alias ``adh`` for llx_adherent."""
        if activeonly:
            return sql.SQL('adh.statut = 1')
        # Always-true placeholder so the WHERE clause stays syntactically valid.
        return sql.SQL('1 = 1')

    def _execute(self, sqlquery):
        """Execute a composed query on the shared cursor and return the cursor."""
        # psycopg2 >= 2.7 accepts sql.Composable objects directly.
        self._cur.execute(sqlquery)
        return self._cur

    def get_adherent(self, fields=None, extrafields=None, activeonly=True):
        """Select members from llx_adherent joined with their extrafields.

        Arguments:
        fields - column names of llx_adherent to select
                 (default: firstname, lastname, email)
        extrafields - column names of llx_adherent_extrafields to select
                      (default: none)
        activeonly - when true, only rows with statut = 1 are returned

        Returns the shared cursor, ready to be iterated or fetched from.
        """
        if fields is None:
            fields = ["firstname", "lastname", "email"]
        if extrafields is None:
            extrafields = []

        # Column names are quoted as identifiers, never spliced in as raw text.
        sqlfields = sql.SQL(',').join(
            [sql.SQL('adh.') + sql.Identifier(name) for name in fields]
            + [sql.SQL('adhx.') + sql.Identifier(name) for name in extrafields]
        )
        sqljoin = sql.SQL(
            'LEFT OUTER JOIN llx_adherent_extrafields AS adhx '
            'ON (adh.rowid = adhx.fk_object)'
        )
        sqlquery = sql.SQL(
            "SELECT {0} FROM llx_adherent AS adh {1} WHERE {2}"
        ).format(sqlfields, sqljoin, self._adherent_condition(activeonly))
        return self._execute(sqlquery)

    def get_adherent_count(self, activeonly=True):
        """Return the number of rows in llx_adherent.

        Arguments:
        activeonly - when true, only rows with statut = 1 are counted
        """
        # The table must be aliased as ``adh`` because the condition refers
        # to ``adh.statut``; the single placeholder is index 0.
        sqlquery = sql.SQL(
            "SELECT COUNT(*) FROM llx_adherent AS adh WHERE {0}"
        ).format(self._adherent_condition(activeonly))
        self._execute(sqlquery)
        return self._cur.fetchone()[0]

    def get_subscriber(self, productRef, fields=None, activeonly=True):
        """Select third parties having a contract line for a product.

        Only contract lines with statut = 4 are considered.

        Arguments:
        productRef - LIKE pattern matched against llx_product.ref
        fields - column names of llx_societe to select (default: email)
        activeonly - when true, only contracts with statut = 1 are kept

        Returns the shared cursor, ready to be iterated or fetched from.
        """
        if fields is None:
            fields = ['email']
        sqlfields = sql.SQL(',').join(
            sql.SQL('llx_societe.') + sql.Identifier(field) for field in fields
        )
        sqlcond = sql.SQL(
            """llx_societe.rowid=llx_contrat.fk_soc
            AND llx_contrat.rowid=llx_contratdet.fk_contrat
            AND llx_product.rowid=llx_contratdet.fk_product
            AND llx_product.ref LIKE {0}
            AND llx_contratdet.statut=4"""
        ).format(sql.Literal(productRef))
        if activeonly:
            # Leading space is required: without it the clause is glued to
            # the preceding "statut=4" and yields "4AND".
            sqlcond += sql.SQL(' AND llx_contrat.statut=1')
        sqlquery = sql.SQL(
            """SELECT {0}
            FROM llx_societe, llx_contrat, llx_contratdet, llx_product
            WHERE {1}
            ORDER BY llx_product.ref, llx_contrat.rowid ASC"""
        ).format(sqlfields, sqlcond)
        return self._execute(sqlquery)

    def get_bank_accounts(self, fields=None):
        """Select rows of llx_bank_account with courant = 1 and clos = 0.

        Arguments:
        fields - column names to select
                 (default: bank, proprio, iban_prefix)

        Returns the shared cursor, ready to be iterated or fetched from.
        """
        if fields is None:
            fields = ['bank', 'proprio', 'iban_prefix']
        # Quote column names as identifiers, like the other queries do,
        # instead of formatting them into the SQL text.
        sqlfields = sql.SQL(',').join(sql.Identifier(field) for field in fields)
        sqlquery = sql.SQL(
            'SELECT {0} FROM llx_bank_account WHERE courant = 1 AND clos = 0'
        ).format(sqlfields)
        return self._execute(sqlquery)

    def get_users(self, fields=None, active_user=True, active_member=None):
        """Select users from llx_user, optionally filtered by member state.

        Arguments:
        fields - column names of llx_user to select
                 (default: firstname, lastname, email)
        active_user - true: only users with statut=1; false: only users
                      with statut=0; None: no filter on the user status
        active_member - true: only users linked to a member with statut=1;
                        false: users with no member link or linked to a
                        member with statut=0; None: no filter on membership

        Returns the shared cursor, ready to be iterated or fetched from.
        """
        if fields is None:
            fields = ["firstname", "lastname", "email"]

        sqlconds = []
        if active_user is not None:
            sqlconds.append(
                sql.SQL('usr.statut=1') if active_user else sql.SQL('usr.statut=0')
            )
        if active_member is not None:
            if active_member:
                sqlconds.append(sql.SQL('member.statut=1'))
            else:
                # Parenthesised so the OR survives being AND-ed with other
                # conditions.
                sqlconds.append(
                    sql.SQL('( usr.fk_member IS NULL OR member.statut=0 )')
                )

        sqlcond = sql.SQL('')
        if sqlconds:
            sqlcond = sql.SQL(' WHERE ') + sql.SQL(' AND ').join(sqlconds)

        sqlfields = sql.SQL(',').join(
            sql.SQL('usr.') + sql.Identifier(name) for name in fields
        )
        # LEFT JOIN rather than a cross join: users are still returned when
        # llx_adherent is empty or when they have no linked member, and no
        # users x members product is built.
        sqlquery = (
            sql.SQL('SELECT DISTINCT ON (usr.rowid) ')
            + sqlfields
            + sql.SQL(
                ' FROM llx_user AS usr'
                ' LEFT JOIN llx_adherent AS member'
                ' ON usr.fk_member=member.rowid'
            )
            + sqlcond
        )
        return self._execute(sqlquery)