#!/usr/bin/env python # -*- coding: utf-8 -*- from flask import Flask, request, session, g, redirect, url_for, abort, \ render_template, flash from gettext import translation import locale import os current_locale, encoding = locale.getdefaultlocale() locale_path = 'translations/' language = translation('messages', locale_path, [current_locale]) gettext = language.ugettext #from flask_openid import OpenID #from flaskext.babel import Babel, gettext, ngettext import sqlite3 from datetime import date, time, timedelta, datetime import time from contextlib import closing import hashlib import smtplib import string import re from collections import OrderedDict from settings import * app = Flask(__name__) app.config.from_object(__name__) #oid = OpenID(app) #babel = Babel(app) def connect_db(): return sqlite3.connect(app.config['DATABASE']) @app.before_request def before_request(): g.db = connect_db() g.db.execute("PRAGMA foreign_keys = ON") @app.teardown_request def teardown_request(exception): g.db.close() @app.route('/') def home(): return render_template('index.html', active_button="home") def query_db(query, args=(), one=False): cur = g.db.execute(query, args) rv = [dict((cur.description[idx][0], value) for idx, value in enumerate(row)) for row in cur.fetchall()] return (rv[0] if rv else None) if one else rv def init_db(): with closing(connect_db()) as db: with app.open_resource('schema.sql') as f: db.cursor().executescript(f.read()) db.commit() #---------------- # Login / Logout def valid_login(email, password): # get user key user_key = query_db('select key from users where email = ?', (email,), one=True) if not user_key: # no such user return None user_key = user_key['key'] # try password return query_db('select * from users where email = ? and password = ?', [email, crypt(password, user_key)], one=True) def connect_user(user): session['user'] = user del session['user']['password'] del session['user']['key'] def disconnect_user(): session.pop('user', None) def crypt(passwd, user_key): # the per-user salt should not be stored in the db # storing the passwd... but this is better than nothing per_user_salt = hashlib.sha1(user_key).hexdigest() salt_passwd = '%s%s%s' % (app.config['PASSWD_SALT'], per_user_salt, passwd) return hashlib.sha1(salt_passwd).hexdigest() def keygen(): return hashlib.sha1(os.urandom(24)).hexdigest() def get_userid(): user = session.get('user') if user is None: return -1 elif user.get('id') < 0: return -1 else: return user.get('id') #@oid.loginhandler @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': user = valid_login(request.form['username'], request.form['password']) if user is None: if request.form['openid']: return oid.try_login(request.form['openid'], ask_for=['email', 'fullname', 'nickname']) else: flash(u'Email ou mot de passe invalide.', 'error') else: connect_user(user) flash(u'Vous êtes connecté. Bienvenue, %s !' % user['name'], 'success') if request.args.get('continue'): return redirect(request.args['continue']) return redirect(url_for('home')) return render_template('login.html') #@oid.after_login def create_or_login(resp): openid_url = resp.identity_url user = query_db('select * from users where openid = ?', [openid_url], one=True) if user is not None: flash(gettext(u'Successfully signed in')) connect_user(user) return redirect(oid.get_next_url()) return redirect(url_for('home')) @app.route('/logout') def logout(): disconnect_user() flash(u'Vous avez été déconnecté.', 'info') if request.args.get('continue') and not "admin" in request.args.get('continue'): return redirect(request.args['continue']) return redirect(url_for('home')) #----------------- # Change password @app.route('/password/lost', methods=['GET', 'POST']) def password_lost(): info = None if request.method == 'POST': user = query_db('select * from users where email = ?', [request.form['email']], one=True) if user is None: flash('Cet utilisateur n\'existe pas !', 'error') else: key = 'v%s' % keygen() # start with v: valid key g.db.execute('update users set key = ? where id = ?', [key, user['id']]) g.db.commit() link = request.url_root + url_for('login_key', userid=user['id'], key=key) BODY = string.join(( "From: %s" % EMAIL, "To: %s" % user['email'], "Subject: [Cavote] %s" % gettext(u"Lost password"), "Date: %s" % time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()).decode('utf-8'), "Content-type: text/plain; charset=utf-8", "X-Mailer: %s" % VERSION, "", gettext(u"It seems that you have lost your password."), gettext(u"This link will log you without password."), gettext(u"Don't forget to define a new one as soon a possible!"), gettext(u"This link will only work one time."), "", link, "", gettext(u"If you think this mail is not for you, please ignore and delete it.") ), "\r\n") server = smtplib.SMTP(SMTP_SERVER) server.sendmail(EMAIL, [user['email']], BODY.encode('utf-8')) server.quit() flash(u"Un mail a été envoyé à " + user['email'], 'info') return render_template('password_lost.html') @app.route('/login//') def login_key(userid, key): user = query_db('select * from users where id = ? and key = ?', [userid, key], one=True) if user is None or user['key'][0] != "v": abort(404) else: connect_user(user) flash(u"Veuillez mettre à jour votre mot de passe", 'info') return redirect(url_for('user_password', userid=user['id'])) #--------------- # User settings @app.route('/user/') def user(userid): if int(userid) != get_userid(): abort(401) groups = query_db('select * from groups join user_group on id=id_group where id_user = ?', (userid,)) return render_template('user.html', groups=groups) @app.route('/user/settings/', methods=['GET', 'POST']) def user_edit(userid): if int(userid) != get_userid(): abort(401) if request.method == 'POST': if query_db('select * from users where email=? and id!=?', [request.form['email'], userid], one=True) is None: if query_db('select * from users where name=? and id!=?', [request.form['name'], userid], one=True) is None: g.db.execute('update users set email = ?, openid = ?, name = ?, organization = ? where id = ?', [request.form['email'], request.form['openid'], request.form['name'], request.form['organization'], session['user']['id']]) g.db.commit() disconnect_user() user = query_db('select * from users where id=?', [userid], one=True) if user is None: flash(u'Une erreur s\'est produite.', 'error') return redirect(url_for('login')) connect_user(user) flash(u'Votre profil a été mis à jour !', 'success') else: flash(u'Le nom ' + request.form['name'] + u' est déjà pris ! Veuillez en choisir un autre.', 'error') else: flash(u'Il existe déjà un compte pour cette adresse e-mail : ' + request.form['email'], 'error') return render_template('user_edit.html') @app.route('/user/password/', methods=['GET', 'POST']) def user_password(userid): if int(userid) != get_userid(): abort(401) if request.method == 'POST': if request.form['password'] == request.form['password2']: # new (invalid) key key = 'i%s' % keygen() # start with i: invalid key print "\n\nchange key for %s\n" % key # FIXME TMP g.db.execute('update users set password = ?, key = ? where id = ?', [crypt(request.form['password'], key), key, session['user']['id']]) g.db.commit() flash(u'Votre mot de passe a été mis à jour.', 'success') else: flash(u'Les mots de passe sont différents.', 'error') return render_template('user_edit.html') #------------ # User admin @app.route('/admin/users') def admin_users(): if not session.get('user').get('is_admin'): abort(401) tuples = query_db('select *, groups.name as groupname \ from (select *, id as userid, name as username \ from users join user_group on id=id_user order by id desc) \ join groups on id_group=groups.id') users = dict() for t in tuples: if t['userid'] in users: users[t['userid']]['groups'].append(t["groupname"]) else: users[t['userid']] = dict() users[t['userid']]['userid'] = t['userid'] users[t['userid']]['email'] = t['email'] users[t['userid']]['username'] = t['username'] users[t['userid']]['is_admin'] = t['is_admin'] users[t['userid']]['groups'] = [t['groupname']] return render_template('admin_users.html', users=users.values()) @app.route('/admin/users/add', methods=['GET', 'POST']) def admin_user_add(): if not session.get('user').get('is_admin'): abort(401) if request.method == 'POST': if request.form['email']: if query_db('select * from users where email=?', [request.form['email']], one=True) is None: if request.form['username']: if query_db('select * from users where name=?', [request.form['username']], one=True) is None: admin = 0 if 'admin' in request.form: admin = 1 key = 'v%s' % keygen() g.db.execute('insert into users (email, openid, name, organization, password, is_admin, key) \ values (?, ?, ?, ?, ?, ?, ?)', [request.form['email'], request.form['openid'], request.form['username'], request.form['organization'], '', admin, key]) g.db.commit() user = query_db('select * from users where email = ?', [request.form["email"]], one=True) if user: groups = request.form.getlist('groups') groups.append('1') for group in groups: if query_db('select id from groups where id = ?', group, one=True) is None: flash(u'Le groupe portant l\'id %s n\'existe pas.' % group, 'warning') else: g.db.execute('insert into user_group values (?, ?)', [user['id'], group]) g.db.commit() link = request.url_root + url_for('login_key', userid=user['id'], key=user['key']) BODY = string.join(( "From: %s" % EMAIL, "To: %s" % user['email'], "Subject: [Cavote] %s" % gettext(u"Welcome"), "Date: %s" % time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()).decode('utf-8'), "Content-type: text/plain; charset=utf-8", "X-Mailer: %s" % VERSION, "", "%(text)s %(user)s!" % {"text": gettext(u"Hi"), "user": user['name']}, "%(text2)s %(title)s." % {"text2": gettext(u"Welcome on"), "title": TITLE}, "%(text3)s %(email)s." % {"text3": gettext(u"Your account address is"), "email": user['email']}, "", gettext(u"To log in for the first time and set your password, please follow this link :"), link, "" ), "\r\n") server = smtplib.SMTP(SMTP_SERVER) server.sendmail(EMAIL, [user['email']], BODY.encode('utf-8')) server.quit() flash(u'Le nouvel utilisateur a été créé avec succès', 'success') return redirect(url_for('admin_users')) else: flash(u'Une erreur s\'est produite.', 'error') else: flash(u'Le nom ' + request.form['username'] + u' est déjà pris ! Veuillez en choisir un autre.', 'error') else: flash(u"Vous devez spécifier un nom d'utilisateur.", 'error') else: flash(u'Il existe déjà un compte pour cette adresse e-mail : ' + request.form['email'], 'error') else: flash(u"Vous devez spécifier une adresse email.", 'error') groups = query_db('select * from groups where system=0') return render_template('admin_user_new.html', groups=groups) @app.route('/admin/users/edit/', methods=['GET', 'POST']) def admin_user_edit(iduser): if not session.get('user').get('is_admin'): abort(401) user = query_db('select * from users where id = ?', [iduser], one=True) user['groups'] = query_db('select groups.* from groups join user_group on groups.id = user_group.id_group where id_user = ?', [iduser]) if user is None: abort(404) if request.method == 'POST': if query_db('select * from users where email=? and id!=?', [request.form['email'], iduser], one=True) is None: if query_db('select * from users where name=? and id!=?', [request.form['name'], iduser], one=True) is None: admin = 0 if 'admin' in request.form: admin = 1 g.db.execute('update users set email = ?, name = ?, organization = ?, openid= ?, is_admin = ? where id = ?', [request.form['email'], request.form['name'], request.form['organization'], request.form['openid'], admin, iduser]) g.db.commit() groups = request.form.getlist('groups') groups.append('1') for group in user['groups']: if not group['id'] in groups: g.db.execute('delete from user_group where id_user = ? and id_group = ?', [iduser, group['id']]) g.db.commit() for group in groups: group = query_db('select id from groups where id = ?', [group], one=True) if group is None: flash(u'Le groupe portant l\'id %s n\'existe pas.' % group, 'warning') else: if not group in user['groups']: g.db.execute('insert into user_group values (?, ?)', [user['id'], group['id']]) g.db.commit() user = query_db('select * from users where id = ?', [iduser], one=True) user['groups'] = query_db('select groups.* from groups \ join user_group on groups.id = user_group.id_group \ where id_user = ?', [iduser]) flash(u'Le profil a été mis à jour !', 'success') else: flash(u'Le nom ' + request.form['name'] + u' est déjà pris ! Veuillez en choisir un autre.', 'error') else: flash(u'Il existe déjà un compte pour cette adresse e-mail : ' + request.form['email'], 'error') groups = query_db('select * from groups where system=0') return render_template('admin_user_edit.html', user=user, groups=groups) @app.route('/admin/users/delete/') def admin_user_del(iduser): if not session.get('user').get('is_admin'): abort(401) user = query_db('select * from users where id = ?', [iduser], one=True) if user is None: abort(404) g.db.execute('delete from users where id = ?', [iduser]) g.db.commit() return redirect(url_for('admin_users')) #------------- # Roles admin @app.route('/admin/groups') def admin_groups(): if not session.get('user').get('is_admin'): abort(401) groups = query_db('select groups.*, count(user_group.id_user) as nb_users \ from (select groups.*, count(votes.id) as nb_votes \ from groups left join votes on votes.id_group = groups.id \ group by groups.id) as groups \ left join user_group on user_group.id_group = groups.id group by groups.id') return render_template('admin_groups.html', groups=groups) @app.route('/admin/groups/add', methods=['POST']) def admin_group_add(): if not session.get('user').get('is_admin'): abort(401) if request.method == 'POST': if request.form['name']: g.db.execute('insert into groups (name) values (?)', [request.form['name']]) g.db.commit() else: flash(u"Vous devez spécifier un nom.", "error") return redirect(url_for('admin_groups')) @app.route('/admin/groups/delete/') def admin_group_del(idgroup): if not session.get('user').get('is_admin'): abort(401) group = query_db('select * from groups where id = ?', [idgroup], one=True) if group is None: abort(404) if group['system']: abort(401) g.db.execute('delete from groups where id = ?', [idgroup]) g.db.commit() return redirect(url_for('admin_groups')) #------------ # Votes list @app.route('/votes/') def votes(votes): today = date.today() active_button = votes max_votes = 'select id_group, count(*) as max_votes from user_group group by id_group' basequery = 'select votes.*, max_votes from votes \ left join (' + max_votes + ') as max_votes on votes.id_group = max_votes.id_group' nb_votes = 'select id_vote, count(*) as nb_votes \ from (select id_user, id_vote \ from user_vote \ group by id_user, id_vote) \ group by id_vote' basequery = 'select * from (' + basequery + ') \ left join (' + nb_votes + ') on id = id_vote' basequery = 'select *, votes.id as voteid, groups.name as groupname from (' + basequery + ') as votes \ join groups on groups.id = id_group \ where is_hidden=0' if votes == 'all': votes = query_db(basequery + ' and is_open=1 order by date_end') elif votes == 'archive': votes = query_db(basequery + ' and is_terminated=1 order by date_end desc') elif votes == 'current': votes = query_db(basequery + ' and is_terminated=0 order by date_end') elif votes == 'waiting': basequery = 'select votes.* from user_group \ join (' + basequery + ') as votes on votes.id_group = user_group.id_group \ where user_group.id_user = ?' already_voted = 'select id_vote from user_vote \ where id_user = ?' votes = query_db(basequery + ' and votes.id not in (' + already_voted + ') and is_terminated=0', [get_userid(), get_userid()]) else: abort(404) for vote in votes: if not vote.get('nb_votes'): vote['nb_votes'] = 0 if vote.get('max_votes'): vote['percent'] = int((float(vote['nb_votes']) / float(vote['max_votes'])) * 100) return render_template('votes.html', votes=votes, active_button=active_button) #------ # Vote def can_see_vote(idvote, iduser=-1): vote = query_db('select * from votes where id=?', [idvote], one=True) if vote is None: return False if not vote['is_public']: user = query_db('select * from users where id=?', [iduser], one=True) if query_db('select * from user_group where id_user = ? and id_group = ?', [iduser, vote['id']], one=True) is None: return False return True def can_vote(idvote, iduser=-1): vote = query_db('select * from votes where id=?', [idvote], one=True) if vote is None: return False if vote['is_terminated'] == 0: if iduser > 0: if can_see_vote(idvote, iduser): if not has_voted(idvote, iduser): if query_db('select * from user_group where id_user = ? and id_group = ?', [iduser, vote['id_group']], one=True): return True return False def has_voted(idvote, iduser=-1): vote = query_db('select * from user_vote \ where id_vote = ? and id_user = ?', [idvote, iduser], one=True) return (vote is not None) @app.route('/vote/', methods=['GET', 'POST']) def vote(idvote): vote = query_db('select votes.*, groups.name as groupname, users.name as author from votes \ join groups on groups.id=votes.id_group \ join users on users.id=votes.id_author \ where votes.id=?', [idvote], one=True) if vote is None: abort(404) if can_see_vote(idvote, get_userid()): choices = query_db('select name, id from choices where id_vote=?', [idvote]) values = query_db('select weight,name from values_ where id_cardinal=? order by weight ASC', [vote['id_cardinal']]) values.insert(0, {'weight':None,'name':u'=X̄ (Pas d’Opinion)'}) for v in values: v['class'] = "value_" + re.sub('[^a-zA-Z0-9-]', '_', v['name']) weights = OrderedDict([(v['weight'], v) for v in values]) if request.method == 'POST': if can_vote(idvote, get_userid()): userid = session.get('user').get('id') if not vote['is_anonymous'] else None # ACTION: store user's choices for choice in choices: if choice['name'] is None: continue weight = request.form.get('value-'+str(choice['id']),None) if weight is not None and len(weight) is 0: weight = None if weight is not None: weight = int(weight) g.db.execute('insert into user_choice (id_user, id_choice, id_cardinal, weight) \ values (?, ?, ?, ?)', [userid, choice['id'], vote['id_cardinal'], weight]) g.db.commit() # ACTION: randomize storage when anonymous votes if vote['is_anonymous']: g.db.execute('delete from user_choice_buffer_anonymous') g.db.execute('insert into user_choice_buffer_anonymous select * \ from user_choice where id_choice in (%s)' % ','.join(['?'] * len(choices)) , tuple(c['id'] for c in choices)) g.db.execute('delete from user_choice where id_choice in (%s)' % ','.join(['?'] * len(choices)) , tuple(c['id'] for c in choices)) g.db.execute('insert into user_choice select * \ from user_choice_buffer_anonymous \ order by random()') g.db.execute('delete from user_choice_buffer_anonymous') g.db.commit() comment = request.form.get('comment', None) g.db.execute('insert into user_vote (id_user, id_vote, comment) \ values (?, ?, ?)' , [session.get('user').get('id'), vote['id'], comment]) g.db.commit() else: abort(401) # ACTION: count quorum numbers max_votes = query_db('select id_group, count(*) as nb \ from user_group where id_group = ? \ group by id_group', [vote['id_group']], one=True) if max_votes is None: vote['percent'] = 0 vote['nb_votes'] = 0 else: vote['max_votes'] = max_votes['nb'] votes = query_db('select id_vote, count(*) as nb \ from (select id_user, id_vote from user_vote \ group by id_user, id_vote) \ where id_vote = ? group by id_vote', [idvote], one=True) if votes is None: vote['percent'] = 0 vote['nb_votes'] = 0 else: vote['nb_votes'] = votes['nb'] vote['percent'] = int((float(vote['nb_votes']) / float(vote['max_votes'])) * 100) # ACTION: query users' choices joined with users' identity if not anonymous user_choices = query_db('select user_choice.id_user as userid, users.name as username, \ choices.id as choiceid, choices.name as choice_name, \ user_choice.weight as weight, user_vote.comment as comment \ from choices \ join user_choice on choices.id = user_choice.id_choice \ left join users on userid = users.id \ left join user_vote on userid = user_vote.id_user and choices.id_vote = user_vote.id_vote \ where choices.id_vote = ? \ order by user_vote.date,choices.id', [idvote]) # ACTION: aggregate user choices per vote vote['blank'] = 0 results = OrderedDict() for c in choices: choice_values = [(w, { 'nb':0 , 'idx':i , 'name':weights[w]['name'] , 'class':weights[w]['class'] } ) for (i,w) in enumerate(weights, start=0)] choice_values = OrderedDict(choice_values) results[c['id']] = { 'id':c['id'] , 'name':c['name'] , 'sum':0 , 'average':0.0 , 'nb':0 , 'blank':0 , 'values_':choice_values } for uc in user_choices: results[uc['choiceid']]['nb'] += 1 results[uc['choiceid']]['values_'][uc['weight']]['nb'] += 1 if uc['weight'] is None: results[uc['choiceid']]['blank'] += 1 vote['blank'] += 1 else: results[uc['choiceid']]['sum'] += uc['weight'] for c in results: if results[c]['nb'] - results[c]['blank'] != 0: results[c]['average'] = results[c]['average'] + (float(results[c]['sum']) / float(results[c]['nb'] - results[c]['blank'])) previous_percent = 0 for w in weights: if results[c]['nb'] > 0: percent = float(results[c]['values_'][w]['nb'] * 100) / results[c]['nb'] else: percent = 0. results[c]['values_'][w]['percent'] = percent results[c]['values_'][w]['previous_percent'] = previous_percent previous_percent += percent results[c]['values_'] = results[c]['values_'].values() results = sorted(results.values(), key=lambda c: c['average'], reverse=True) len_results = len(results) if len_results % 2 == 0: medians = results[len_results/2-1:len_results/2+1] else: medians = [results[len_results/2]] results = { 'list':results , 'medians':[m['id'] for m in medians] , 'average':sum([r['sum'] for r in results])/len_results } # ACTION: list user results per user users = OrderedDict() if vote['is_anonymous']: user_votes = query_db('select users.name, id_user as userid, comment \ from user_vote \ join users on users.id = id_user where id_vote = ?', [idvote]) for uc in user_votes: users[uc['userid']] = { 'username':uc['name'] , 'comment':uc['comment'] , 'choices':{} , 'userid':uc['userid'] } else: for uc in user_choices: weight = uc['weight'] value = { 'weight':weight , 'name':weights[weight]['name'] , 'class':weights[weight]['class'] } if uc['userid'] in users: users[uc['userid']]['choices'][uc['choiceid']] = value else: users[uc['userid']] = { 'userid':uc['userid'] , 'username':uc['username'] , 'comment':uc['comment'] , 'choices':{uc['choiceid']:value} } attachments = query_db('select * from attachments where id_vote=?', [idvote]) if query_db('select * from user_group where id_group = ? and id_user = ?' , [vote['id_group'], get_userid()], one=True) and not vote['is_terminated']: flash(u'Ce vote vous concerne !', 'info') return render_template('vote.html', vote=vote, attachments=attachments , values=values, choices=choices, results=results, users=users.values() , can_vote=can_vote(idvote, get_userid())) flash(u'Vous n\'avez pas le droit de voir ce vote, désolé.') return redirect(url_for('home')) @app.route('/vote/deletechoices//') def vote_deletechoices(idvote, iduser): if int(iduser) != get_userid(): abort(401) vote = query_db('select votes.* from votes \ where votes.id=?', [idvote], one=True) if not vote['is_terminated'] and not vote['is_anonymous']: g.db.execute('delete from user_choice where id_user = ? and id_choice \ in (select id from choices where id_vote = ?)' , [iduser, idvote]) g.db.commit() g.db.execute('delete from user_vote where id_user = ? and id_vote \ in (select id_vote from choices where id_vote = ?)' , [iduser, idvote]) g.db.commit() return redirect(url_for('vote', idvote=idvote)) #------------- # Votes admin @app.route('/admin/votes/list') def admin_votes(): if not session.get('user').get('is_admin'): abort(401) votes = query_db('select *, votes.id as voteid, groups.name as groupname from votes \ join groups on groups.id=votes.id_group \ where is_hidden=0 order by id desc') return render_template('admin_votes.html', votes=votes , today=date.today().strftime("%Y-%m-%d") , can_delete_votes=CAN_DELETE_VOTES ) @app.route('/admin/votes/add', methods=['GET', 'POST']) def admin_vote_add(): if not session.get('user').get('is_admin'): abort(401) cardinals= OrderedDict([(len(values), {'name':name,'values':values,'first':first}) for (name, first, values) in CARDINALS]) if request.method == 'POST': if request.form['title']: if query_db('select * from votes where title = ?', [request.form['title']], one=True) is None: date_begin = date.today() date_end = date.today() + timedelta(days=int(request.form['days'])) transparent = 0 public = 0 anonymous = 0 if 'transparent' in request.form: transparent = 1 if 'public' in request.form: public = 1 if 'anonymous' in request.form: anonymous = 1 try: quorum = float(request.form.get('quorum')) except ValueError: quorum = 0 if not (0 <= quorum and quorum <= 1): flash(u'Une erreur est survenue !', 'error') group = query_db('select id from groups where name = ?', [request.form['group']], one=True) if group is None: group[id] = 1 try: cardinal = int(request.form.get('cardinal')) except ValueError: cardinal = None if cardinal in cardinals: cardinal_name = cardinals[cardinal]['name'] cardinal_values = cardinals[cardinal]['values'] weight = cardinals[cardinal]['first'] if not cardinals[cardinal]['first'] is None else -(cardinal/2) if query_db('select * from cardinals where id = ?', [cardinal], one=True) is None: g.db.execute('insert into cardinals (id, name, first) values (?, ?, ?)', [len(cardinal_values), cardinal_name, weight]) g.db.commit() for name in cardinal_values: g.db.execute('insert into values_ (id_cardinal, name, weight) values (?, ?, ?)' , [cardinal, name, weight]) g.db.commit() weight += 1 g.db.execute('insert into votes (title, description, category, \ date_begin, date_end, quorum, is_transparent, is_public, \ is_anonymous, id_group, id_author, id_cardinal) \ values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', [ request.form['title'], request.form['description'], request.form['category'] , date_begin, date_end, quorum, transparent, public, anonymous , group['id'], session['user']['id'], cardinal ]) g.db.commit() vote = query_db('select * from votes where title = ? and date_begin = ? order by id desc' , [request.form['title'], date_begin], one=True) if vote is None: flash(u'Une erreur est survenue !', 'error') return redirect(url_for('home')) else: if request.form['pattern'] in PATTERNS: pattern = PATTERNS[request.form['pattern']] for choice in pattern: g.db.execute('insert into choices (name, id_vote) values (?, ?)', [choice, vote['id']]) g.db.commit() flash(u"Le vote a été créé", 'info') return redirect(url_for('admin_vote_edit', voteid=vote['id'])) else: flash(u'Le titre que vous avez choisi est déjà pris.', 'error') else: flash(u'Vous devez spécifier un titre.', 'error') groups = query_db('select * from groups') return render_template('admin_vote_new.html', groups=groups, cardinals=cardinals , quorums=QUORUMS, patterns=PATTERNS) @app.route('/admin/votes/edit/', methods=['GET', 'POST']) def admin_vote_edit(voteid): if not session.get('user').get('is_admin'): abort(401) vote = query_db('select * from votes where id = ?', [voteid], one=True) if vote is None: abort(404) if request.method == 'POST': if request.form['title']: if request.form['days'] > 0: date_end = datetime.strptime(vote['date_begin'], "%Y-%m-%d") + timedelta(days=int(request.form['days'])) date_end = date_end.strftime("%Y-%m-%d") transparent = 0 public = 0 if 'transparent' in request.form: transparent = 1 if 'public' in request.form: public = 1 isopen = 0 isterminated = 0 print "POST" if request.form['status'] == 'Ouvert': choices = query_db('select id_vote, count(*) as nb, groups.name as group_name \ from choices \ join votes on votes.id = choices.id_vote \ join groups on groups.id = votes.id_group \ where id_vote = ? \ group by id_vote', [voteid], one=True) if choices is not None and choices['nb'] >= 1: isopen = 1 previousvote = query_db('select id, is_open, id_group from votes where id = ?', [voteid], one=True) if (previousvote is None or previousvote['is_open'] == 0) and 'mail_notice' in request.form: users_to_vote = query_db('select users.email, users.name from users \ join user_group on users.id=user_group.id_user \ where user_group.id_group = ?', [previousvote['id_group']]) for user in users_to_vote: link = request.url_root + url_for('vote', idvote=voteid) BODY = string.join(( u"From: %s" % EMAIL, u"To: %s" % user['email'], u"Subject: [vote] [%s] %s" % (choices['group_name'], request.form['title']), u"Date: %s" % time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()).decode('utf-8'), u"Content-type: text/plain; charset=utf-8", u"X-Mailer: %s" % VERSION, "", u"%(text0)s%(group)s" % \ { "text0":gettext(u"A new vote concerns you within the group: ") \ , "group":choices['group_name'] }, \ link, "", gettext(u"If you think this mail is not for you, please ignore and delete it."), gettext(u"For more informations, you can contact:"), EMAIL ), "\r\n") server = smtplib.SMTP(SMTP_SERVER) server.sendmail(EMAIL, [user['email']], BODY.encode('utf-8')) server.quit() else: flash(u'Vous devez proposer au moins un choix pour ouvrir le vote.', 'error') elif request.form['status'] == u'Terminé': isterminated = 1 if vote['is_open']: isopen = 1 g.db.execute('update votes set title = ?, description = ?, category = ?, quorum = ?, \ is_transparent = ?, is_public = ?, is_open = ?, is_terminated = ?, \ date_end = ?, reminder_last_days = ? where id = ?', [ request.form['title'], request.form['description'], request.form['category'], request.form['quorum'] , transparent, public, isopen, isterminated, date_end, request.form['reminder'], voteid ]) g.db.commit() vote = query_db('select * from votes where id = ?', [voteid], one=True) flash(u"Le vote a bien été mis à jour.", "success") else: flash(u'Vous devez spécifier un titre.', 'error') vote['duration'] = (datetime.strptime(vote['date_end'], "%Y-%m-%d") - datetime.strptime(vote['date_begin'], "%Y-%m-%d")).days group = query_db('select name from groups where id = ?', [vote['id_group']], one=True) choices = query_db('select * from choices where id_vote = ?', [voteid]) values_ = query_db('select * from cardinals where id = ?', [vote['id_cardinal']], one=True)['name'] attachments = query_db('select * from attachments where id_vote = ?', [voteid]) if date.today().strftime("%Y-%m-%d") > vote['date_end'] and not vote['is_terminated']: flash(u'La deadline du vote est expirée, vous devriez terminer le vote.') return render_template('admin_vote_edit.html', vote=vote, group=group, values_=values_, choices=choices, attachments=attachments, quorums=QUORUMS) @app.route('/admin/votes/delete/') def admin_vote_del(idvote): if not session.get('user').get('is_admin'): abort(401) if not CAN_DELETE_VOTES: flash(u'La configuration interdit la suppression des votes.', 'error') else: if vote is None: abort(404) g.db.execute('update votes set is_hidden=1 where id = ?', [idvote]) g.db.commit() return redirect(url_for('admin_votes')) @app.route('/admin/votes/addchoice/', methods=['POST']) def admin_vote_addchoice(voteid): if not session.get('user').get('is_admin'): abort(401) vote = query_db('select * from votes where id = ?', [voteid], one=True) if vote is None: abort(404) g.db.execute('insert into choices (name, id_vote) values (?, ?)', [request.form['title'], voteid]) g.db.commit() return redirect(url_for('admin_vote_edit', voteid=voteid)) @app.route('/admin/votes/editchoice//', methods=['POST', 'DELETE']) def admin_vote_editchoice(voteid, choiceid): if not session.get('user').get('is_admin'): abort(401) choice = query_db('select * from choices where id = ? and id_vote = ?', [choiceid, voteid], one=True) if choice is None: abort(404) if request.method == 'POST': g.db.execute('update choices set name=? where id = ? and id_vote = ?', [request.form['title'], choiceid, voteid]) g.db.commit() return redirect(url_for('admin_vote_edit', voteid=voteid)) @app.route('/admin/votes/deletechoice//') def admin_vote_deletechoice(voteid, choiceid): if not session.get('user').get('is_admin'): abort(401) choice = query_db('select * from choices where id = ? and id_vote = ?', [choiceid, voteid], one=True) if choice is None: abort(404) g.db.execute('delete from choices where id = ? and id_vote = ?', [choiceid, voteid]) g.db.commit() choices = query_db('select id_vote, count(*) as nb \ from choices where id_vote = ? \ group by id_vote', [voteid], one=True) if choices is None or choices['nb'] < 2: g.db.execute('update votes set is_open=0 where id = ?', [voteid]) g.db.commit() flash(u'Attention ! Il y a moins de deux choix. Le vote a été fermé.', 'error') return redirect(url_for('admin_vote_edit', voteid=voteid)) @app.route('/admin/votes/addattachment/', methods=['POST']) def admin_vote_addattachment(voteid): if not session.get('user').get('is_admin'): abort(401) vote = query_db('select * from votes where id = ?', [voteid], one=True) if vote is None: abort(404) g.db.execute('insert into attachments (url, id_vote) values (?, ?)', [request.form['url'], voteid]) g.db.commit() return redirect(url_for('admin_vote_edit', voteid=voteid)) @app.route('/admin/votes/deleteattachment//') def admin_vote_deleteattachment(voteid, attachmentid): if not session.get('user').get('is_admin'): abort(401) attachment = query_db('select * from attachments where id = ? and id_vote = ?', [attachmentid, voteid], one=True) if attachment is None: abort(404) g.db.execute('delete from attachments where id = ? and id_vote = ?', [attachmentid, voteid]) g.db.commit() return redirect(url_for('admin_vote_edit', voteid=voteid)) #------ # Main if __name__ == '__main__': app.run()