123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- from functools import partial
- import itertools
- import urlparse
- import json
- import collections
- from flask import current_app
- from flask.ext.wtf import Form
- from wtforms import Form as InsecureForm
- from wtforms import (TextField, DateField, DecimalField, IntegerField,
- SelectField, SelectMultipleField, FieldList, FormField, BooleanField)
- from wtforms.widgets import (TextInput, ListWidget, html_params, HTMLString,
- CheckboxInput, Select, TextArea)
- from wtforms.validators import (DataRequired, Optional, URL, Email, Length,
- NumberRange, ValidationError, StopValidation)
- from flask.ext.babel import lazy_gettext as _, gettext
- from babel.support import LazyProxy
- from ispformat.validator import validate_geojson
- from .constants import STEPS, IPV6_SUPPORT
- from .models import ISP
- from .utils import check_geojson_spatialite, filesize_fmt
- class InputListWidget(ListWidget):
- def __call__(self, field, **kwargs):
- kwargs.setdefault('id', field.id)
- html = ['<%s %s>' % (self.html_tag, html_params(**kwargs))]
- for subfield in field:
- html.append('<li>%s</li>' % (subfield()))
- html.append('</%s>' % self.html_tag)
- return HTMLString(''.join(html))
- class MultiCheckboxField(SelectMultipleField):
- """
- A multiple-select, except displays a list of checkboxes.
- Iterating the field will produce subfields, allowing custom rendering of
- the enclosed checkbox fields.
- """
- widget = ListWidget(prefix_label=False)
- option_widget = CheckboxInput()
- class MyFormField(FormField):
- @property
- def flattened_errors(self):
- return list(itertools.chain.from_iterable(self.errors.values()))
- class GeoJSONField(TextField):
- widget = TextArea()
- def process_formdata(self, valuelist):
- if valuelist and valuelist[0]:
- max_size = current_app.config['ISP_FORM_GEOJSON_MAX_SIZE']
- if len(valuelist[0]) > max_size:
- raise ValueError(_(u'JSON value too big, must be less than %(max_size)s',
- max_size=filesize_fmt(max_size)))
- try:
- self.data = json.loads(valuelist[0], object_pairs_hook=collections.OrderedDict)
- except Exception as e:
- raise ValueError(_(u'Not a valid JSON value'))
- elif valuelist and valuelist[0].strip() == '':
- self.data = None # if an empty string was passed, set data as None
- def _value(self):
- if self.raw_data:
- return self.raw_data[0]
- elif self.data is not None:
- return json.dumps(self.data)
- else:
- return ''
- def pre_validate(self, form):
- if self.data is not None:
- if not validate_geojson(self.data):
- raise StopValidation(_(u'Invalid GeoJSON, please check it'))
- if not check_geojson_spatialite(self.data):
- current_app.logger.error('Spatialite could not decode the following GeoJSON: %s', self.data)
- raise StopValidation(_(u'Unable to store GeoJSON in database'))
- class Unique(object):
- """ validator that checks field uniqueness """
- def __init__(self, model, field, message=None, allow_edit=False):
- self.model = model
- self.field = field
- if not message:
- message = _(u'this element already exists')
- self.message = message
- def __call__(self, form, field):
- default = field.default() if callable(field.default) else field.default
- if field.object_data != default and field.object_data == field.data:
- return
- check = self.model.query.filter(self.field == field.data).first()
- if check:
- raise ValidationError(self.message)
- TECHNOLOGIES_CHOICES = (
- ('ftth', _('FTTH')),
- ('fttb', _('FTTB')),
- ('dsl', _('DSL')),
- ('cube', _('Internet Cube')),
- ('wifi', _('Wi-Fi')),
- ('vpn', _('VPN')),
- )
- class CoveredArea(InsecureForm):
- name = TextField(_(u'name'), widget=partial(TextInput(), class_='input-medium', placeholder=_(u'Area')))
- technologies = SelectMultipleField(_(u'technologies'), choices=TECHNOLOGIES_CHOICES,
- widget=partial(Select(True), **{'class': 'selectpicker', 'data-title': _(u'Technologies deployed')}))
- area = GeoJSONField(_('area'), widget=partial(TextArea(), class_='geoinput'))
- def validate(self, *args, **kwargs):
- r = super(CoveredArea, self).validate(*args, **kwargs)
- if bool(self.name.data) != bool(self.technologies.data):
- self._fields['name'].errors += [_(u'You must fill both fields')]
- r = False
- return r
- class OtherWebsites(InsecureForm):
- name = TextField(_(u'name'), widget=partial(TextInput(), class_='input-small', placeholder=_(u'Name')))
- url = TextField(_(u'url'), widget=partial(TextInput(), class_='input-medium', placeholder=_(u'URL')),
- validators=[Optional(), URL(require_tld=True)])
- def generate_choices(choices):
- return [(k, LazyProxy(lambda k, s: u'%u - %s' % (k, s), k, choices[k], enable_cache=False))
- for k in choices]
- class ProjectForm(Form):
- name = TextField(_(u'full name'),
- description=[_(u'E.g. French Data Network')],
- validators=[DataRequired(), Length(min=2),
- Unique(ISP, ISP.name)])
- shortname = TextField(_(u'short name'),
- description=[_(u'E.g. FDN')],
- validators=[Optional(), Length(min=2, max=15), Unique(ISP, ISP.shortname)])
- description = TextField(_(u'description'),
- description=[None, _(u'Short text describing the project')])
- logo_url = TextField(_(u'logo url'),
- validators=[Optional(), URL(require_tld=True)])
- website = TextField(_(u'website'),
- validators=[Optional(), URL(require_tld=True)])
- other_websites = FieldList(MyFormField(OtherWebsites,
- widget=partial(InputListWidget(), class_='formfield')),
- min_entries=1, widget=InputListWidget(),
- description=[None, _(u'Additional websites that you host (e.g. wiki, etherpad...)')])
- contact_email = TextField(_(u'contact email'), validators=[Optional(), Email()],
- description=[None, _(u'General contact email address')])
- main_ml = TextField(_(u'main mailing list'), validators=[Optional(), Email()],
- description=[None, _(u'Address of your main mailing list')])
- creation_date = DateField(_(u'creation date'),
- validators=[Optional()],
- widget=partial(TextInput(), placeholder=_(u'YYYY-mm-dd')),
- description=[None, _(u'Date at which the legal structure for your project was created')])
- chatrooms = FieldList(TextField(_(u'chatrooms')), min_entries=1, widget=InputListWidget(),
- description=[None, _(u'In URI form, e.g. <code>irc://irc.isp.net/#isp</code> or ' +
- '<code>xmpp:isp@chat.isp.net?join</code>')])
- covered_areas = FieldList(MyFormField(CoveredArea, _('Covered Areas'),
- widget=partial(InputListWidget(), class_='formfield')),
- min_entries=1, widget=InputListWidget(),
- description=[None, _(u'Descriptive name of the covered areas and technologies deployed')])
- latitude = DecimalField(_(u'latitude'),
- validators=[Optional(), NumberRange(min=-90, max=90)],
- description=[None, _(u'Coordinates of your registered office or usual meeting location. '
- '<strong>Required in order to appear on the map.</strong>')])
- longitude = DecimalField(_(u'longitude'),
- validators=[Optional(), NumberRange(min=-180, max=180)])
- step = SelectField(_(u'progress step'),
- choices=generate_choices(STEPS), coerce=int)
- member_count = IntegerField(_(u'members'),
- validators=[Optional(), NumberRange(min=0)],
- description=[None, _(u'Number of members')])
- tech_email = TextField(_('Email'),
- validators=[Email(), DataRequired()], description=[None,
- _('Technical contact, in case of problems with your submission')])
- ########################################
- # Fields introduced by ISP format V0.2 #
- ########################################
- asn = IntegerField(_(u'asn'), validators=[Optional(), NumberRange(min=0)],
- description=[None, _('Autonomous System (AS) number.')])
- arcep_code = TextField(_(u'arcep code'),
- validators=[Optional()], description=[None,
- _(u'<a href="https://en.wikipedia.org/wiki/Autorit%%C3%%A9_de_R%%C3%%A9gulation_des_Communications_%%C3%%89lectroniques_et_des_Postes">Arcep</a> identifier')])
- #description=[None, _(u'The identifier assigned by the ARCEP')])
- xdsl = IntegerField(_(u'xdsl subscribers'),
- validators=[Optional(), NumberRange(min=0)],
- description=[None, _(u'Number of xdsl subscribers')])
- vpn = IntegerField(_(u'vpn subscribers'),
- validators=[Optional(), NumberRange(min=0)],
- description=[None, _(u'Number of vpn subscribers')])
- wifi = IntegerField(_(u'wifi subscribers'),
- validators=[Optional(), NumberRange(min=0)],
- description=[None, _(u'Number of wifi subscribers')])
- fiber = IntegerField(_(u'fiber subscribers'),
- validators=[Optional(), NumberRange(min=0)],
- description=[None, _(u'Number of fiber subscribers')])
- statutes = TextField(_(u'statutes url',
- validators=[Optional(), URL(require_tld=True)],
- description=[None, _(u'URL pointing to the organisation statutes')]))
- internal_rules = TextField(_(u'internal rules url',
- validators=[Optional(), URL(require_tld=True)],
- description=[_(u'URL pointing to the organisation internal rules'), _(u'URL pointing to the organisation internal rules')]))
- internet_cube = BooleanField(_(u'internet cube'),
- validators=[Optional()],
- description=[None, _(u'Participating to the internetcube project (collective purchases, install parties)')])
- nlnog_participant = BooleanField(_(u'nlnog participant'),
- validators=[Optional()],
- description=[None, _(u'Participating to the NL-NOG RING? See <code>https://ring.nlnog.net/</code>')])
- ipv6_servers = SelectField(_(u'ipv6 servers'),
- choices=generate_choices(IPV6_SUPPORT),
- coerce=int,
- validators=[Optional()],
- description=[None, _(u'Servers can be reached in IPV6?')])
- ipv6_subscribers = SelectField(_(u'ipv6 subscribers'),
- choices=generate_choices(IPV6_SUPPORT), validators=[Optional()],
- coerce=int,
- description=[None, _(u'Subscribers are provided with IPV6 connectivity?')])
- def validate(self, *args, **kwargs):
- r = super(ProjectForm, self).validate(*args, **kwargs)
- if (self.latitude.data is None) != (self.longitude.data is None):
- self._fields['longitude'].errors += [_(u'You must fill both fields')]
- r = False
- return r
- def validate_covered_areas(self, field):
- if len(filter(lambda e: e['name'], field.data)) == 0:
- # not printed, whatever..
- raise ValidationError(_(u'You must specify at least one area'))
- geojson_size = sum([len(ca.area.raw_data[0]) for ca in self.covered_areas if ca.area.raw_data])
- max_size = current_app.config['ISP_FORM_GEOJSON_MAX_SIZE_TOTAL']
- if geojson_size > max_size:
- # TODO: XXX This is not printed !
- raise ValidationError(gettext(u'The size of all GeoJSON data combined must not exceed %(max_size)s',
- max_size=filesize_fmt(max_size)))
- def to_json(self, json=None):
- if json is None:
- json = {}
- json['name'] = self.name.data
- def optobj(k, d):
- filtered_none = {k: v for k,v in d.iteritems() if v != None}
- if filtered_none != {}:
- json[k] = filtered_none
- def optstr(k, v):
- if k in json or v:
- json[k] = v
- def optlist(k, v):
- if k in json or len(v):
- json[k] = v
- def transform_covered_areas(cas):
- for ca in cas:
- if not ca['name']:
- continue
- if 'area' in ca and ca['area'] is None:
- del ca['area']
- yield ca
- optstr('shortname', self.shortname.data)
- optstr('description', self.description.data)
- optstr('logoURL', self.logo_url.data)
- optstr('website', self.website.data)
- optstr('otherWebsites', dict(((w['name'], w['url']) for w in self.other_websites.data if w['name'])))
- optstr('email', self.contact_email.data)
- optstr('mainMailingList', self.main_ml.data)
- optstr('creationDate', self.creation_date.data.isoformat() if self.creation_date.data else None)
- optstr('progressStatus', self.step.data)
- optstr('memberCount', self.member_count.data)
- optlist('chatrooms', filter(bool, self.chatrooms.data)) # remove empty strings
- optstr('coordinates', {'latitude': self.latitude.data, 'longitude': self.longitude.data}
- if self.latitude.data else {})
- optlist('coveredAreas', list(transform_covered_areas(self.covered_areas.data)))
- optstr('asn', self.asn.data)
- optstr('arcepCode', self.arcep_code.data)
- optstr('internetcube', self.internet_cube.data)
- optstr('nlnogParticipant', self.nlnog_participant.data)
- optobj('publicDocuments', {
- 'statutes': self.statutes.data,
- 'internalRules': self.internal_rules.data})
- optobj('subscriberCount', {
- 'xdsl': self.xdsl.data,
- 'vpn': self.vpn.data,
- 'wifi': self.wifi.data,
- 'fiber': self.fiber.data})
- optobj('ipv6Support', {
- 'infra': self.ipv6_servers.data,
- 'subscribers': self.ipv6_subscribers.data})
- return json
- @classmethod
- def edit_json(cls, isp):
- json = isp.json
- obj = type('abject', (object,), {})()
- def set_attr(attr, itemk=None, d=json):
- if itemk is None:
- itemk = attr
- if itemk in d:
- setattr(obj, attr, d[itemk])
- set_attr('name')
- set_attr('shortname')
- set_attr('description')
- set_attr('logo_url', 'logoURL')
- set_attr('website')
- set_attr('contact_email', 'email')
- set_attr('main_ml', 'mainMailingList')
- set_attr('creation_date', 'creationDate')
- if hasattr(obj, 'creation_date'):
- obj.creation_date = ISP.str2date(obj.creation_date)
- set_attr('step', 'progressStatus')
- set_attr('member_count', 'memberCount')
- set_attr('subscriber_count', 'subscriberCount')
- set_attr('chatrooms', 'chatrooms')
- if 'coordinates' in json:
- set_attr('latitude', d=json['coordinates'])
- set_attr('longitude', d=json['coordinates'])
- if 'otherWebsites' in json:
- setattr(obj, 'other_websites', [{'name': n, 'url': w} for n, w in json['otherWebsites'].iteritems()])
- set_attr('covered_areas', 'coveredAreas')
- # V0.2
- set_attr('asn')
- set_attr('arcep_code', 'arcepCode')
- set_attr('internet_cube', 'internetcube')
- set_attr('nlnog_participant', 'nlnogParticipant')
- # TODO: refactor that. I don't know python enough to find it but I'm
- # sure there's a clever trick to make set_attr support nested structures.
- if 'publicDocuments' in json:
- pubd = json['publicDocuments']
- set_attr('statutes', 'statutes', pubd)
- set_attr('internal_rules', 'internalRules', pubd)
- if 'ipv6Support' in json:
- ipd = json['ipv6Support']
- set_attr('ipv6_servers', 'infra', ipd)
- set_attr('ipv6_sibscribers', 'subscribers', ipd)
- if 'subscriberCount' in json:
- scd = json['subscriberCount']
- set_attr('xdsl', None, scd)
- set_attr('vpn', None, scd)
- set_attr('wifi', None, scd)
- set_attr('fiber', None, scd)
- obj.tech_email = isp.tech_email
- return cls(obj=obj)
- class URLField(TextField):
- def _value(self):
- if isinstance(self.data, basestring):
- return self.data
- elif self.data is None:
- return ''
- else:
- return urlparse.urlunsplit(self.data)
- def process_formdata(self, valuelist):
- if valuelist:
- try:
- self.data = urlparse.urlsplit(valuelist[0])
- except:
- self.data = None
- raise ValidationError(_(u'Invalid URL'))
- def is_url_unique(url):
- if isinstance(url, basestring):
- url = urlparse.urlsplit(url)
- t = list(url)
- t[2] = ''
- u1 = urlparse.urlunsplit(t)
- t[0] = 'http' if t[0] == 'https' else 'https'
- u2 = urlparse.urlunsplit(t)
- if ISP.query.filter(ISP.json_url.startswith(u1) | ISP.json_url.startswith(u2)).count() > 0:
- return False
- return True
- class ProjectJSONForm(Form):
- json_url = URLField(_(u'base url'), description=[_(u'E.g. https://isp.com/'),
- _(u'A ressource implementing our JSON-Schema specification ' +
- 'must exist at path /isp.json')])
- tech_email = TextField(_(u'Email'), validators=[Email()], description=[None,
- _(u'Technical contact, in case of problems')])
- def validate_json_url(self, field):
- if not field.data.netloc:
- raise ValidationError(_(u'Invalid URL'))
- if field.data.scheme not in ('http', 'https'):
- raise ValidationError(_(u'Invalid URL (must be HTTP(S))'))
- if not field.object_data and not is_url_unique(field.data):
- raise ValidationError(_(u'This URL is already in our database'))
- class RequestEditToken(Form):
- tech_email = TextField(_(u'Tech Email'), validators=[Email()], description=[None,
- _(u'The Technical contact you provided while registering')])
|