forms.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. from functools import partial
  2. import itertools
  3. import urllib.parse
  4. import json
  5. import collections
  6. from flask import current_app
  7. from flask_wtf import Form
  8. from wtforms import Form as InsecureForm
  9. from wtforms import (TextField, DateField, DecimalField, IntegerField,
  10. SelectField, SelectMultipleField, FieldList, FormField)
  11. from wtforms.widgets import (TextInput, ListWidget, html_params, HTMLString,
  12. CheckboxInput, Select, TextArea)
  13. from wtforms.validators import (DataRequired, Optional, URL, Email, Length,
  14. NumberRange, ValidationError, StopValidation)
  15. from flask_babel import lazy_gettext as _, gettext
  16. from babel.support import LazyProxy
  17. from ispformat.validator import validate_geojson
  18. from .constants import STEPS
  19. from .models import ISP
  20. from .utils import check_geojson_spatialite, filesize_fmt
  21. class InputListWidget(ListWidget):
  22. def __call__(self, field, **kwargs):
  23. kwargs.setdefault('id', field.id)
  24. html = ['<%s %s>' % (self.html_tag, html_params(**kwargs))]
  25. for subfield in field:
  26. html.append('<li>%s</li>' % (subfield()))
  27. html.append('</%s>' % self.html_tag)
  28. return HTMLString(''.join(html))
  29. class MultiCheckboxField(SelectMultipleField):
  30. """
  31. A multiple-select, except displays a list of checkboxes.
  32. Iterating the field will produce subfields, allowing custom rendering of
  33. the enclosed checkbox fields.
  34. """
  35. widget = ListWidget(prefix_label=False)
  36. option_widget = CheckboxInput()
  37. class MyFormField(FormField):
  38. @property
  39. def flattened_errors(self):
  40. return list(itertools.chain.from_iterable(list(self.errors.values())))
  41. class GeoJSONField(TextField):
  42. widget = TextArea()
  43. def process_formdata(self, valuelist):
  44. if valuelist and valuelist[0]:
  45. max_size = current_app.config['ISP_FORM_GEOJSON_MAX_SIZE']
  46. if len(valuelist[0]) > max_size:
  47. raise ValueError(_('JSON value too big, must be less than %(max_size)s',
  48. max_size=filesize_fmt(max_size)))
  49. try:
  50. self.data = json.loads(valuelist[0], object_pairs_hook=collections.OrderedDict)
  51. except Exception as e:
  52. raise ValueError(_('Not a valid JSON value'))
  53. elif valuelist and valuelist[0].strip() == '':
  54. self.data = None # if an empty string was passed, set data as None
  55. def _value(self):
  56. if self.raw_data:
  57. return self.raw_data[0]
  58. elif self.data is not None:
  59. return json.dumps(self.data)
  60. else:
  61. return ''
  62. def pre_validate(self, form):
  63. if self.data is not None:
  64. if not validate_geojson(self.data):
  65. raise StopValidation(_('Invalid GeoJSON, please check it'))
  66. if not check_geojson_spatialite(self.data):
  67. current_app.logger.error('Spatialite could not decode the following GeoJSON: %s', self.data)
  68. raise StopValidation(_('Unable to store GeoJSON in database'))
  69. class Unique(object):
  70. """ validator that checks field uniqueness """
  71. def __init__(self, model, field, message=None, allow_edit=False):
  72. self.model = model
  73. self.field = field
  74. if not message:
  75. message = _('this element already exists')
  76. self.message = message
  77. def __call__(self, form, field):
  78. default = field.default() if callable(field.default) else field.default
  79. if field.object_data != default and field.object_data == field.data:
  80. return
  81. check = self.model.query.filter(self.field == field.data).first()
  82. if check:
  83. raise ValidationError(self.message)
  84. TECHNOLOGIES_CHOICES = (
  85. ('ftth', _('FTTH')),
  86. ('fttb', _('FTTB')),
  87. ('dsl', _('DSL')),
  88. ('cube', _('Internet Cube')),
  89. ('wifi', _('Wi-Fi')),
  90. ('vpn', _('VPN')),
  91. )
  92. class CoveredArea(InsecureForm):
  93. name = TextField(_('name'), widget=partial(TextInput(), class_='input-medium', placeholder=_('Area')))
  94. technologies = SelectMultipleField(_('technologies'), choices=TECHNOLOGIES_CHOICES,
  95. widget=partial(Select(True), **{'class': 'selectpicker', 'data-title': _('Technologies deployed')}))
  96. area = GeoJSONField(_('area'), widget=partial(TextArea(), class_='geoinput'))
  97. def validate(self, *args, **kwargs):
  98. r = super(CoveredArea, self).validate(*args, **kwargs)
  99. if bool(self.name.data) != bool(self.technologies.data):
  100. self._fields['name'].errors += [_('You must fill both fields')]
  101. r = False
  102. return r
  103. class OtherWebsites(InsecureForm):
  104. name = TextField(_('name'), widget=partial(TextInput(), class_='input-small', placeholder=_('Name')))
  105. url = TextField(_('url'), widget=partial(TextInput(), class_='input-medium', placeholder=_('URL')),
  106. validators=[Optional(), URL(require_tld=True)])
  107. STEP_CHOICES = [(k, LazyProxy(lambda k, s: '%u - %s' % (k, s), k, STEPS[k], enable_cache=False)) for k in STEPS]
  108. class ProjectForm(Form):
  109. name = TextField(_('full name'), description=[_('E.g. French Data Network')],
  110. validators=[DataRequired(), Length(min=2), Unique(ISP, ISP.name)])
  111. shortname = TextField(_('short name'), description=[_('E.g. FDN')],
  112. validators=[Optional(), Length(min=2, max=15), Unique(ISP, ISP.shortname)])
  113. description = TextField(_('description'), description=[None, _('Short text describing the project')])
  114. logo_url = TextField(_('logo url'), validators=[Optional(), URL(require_tld=True)])
  115. website = TextField(_('website'), validators=[Optional(), URL(require_tld=True)])
  116. other_websites = FieldList(MyFormField(OtherWebsites, widget=partial(InputListWidget(), class_='formfield')),
  117. min_entries=1, widget=InputListWidget(),
  118. description=[None, _('Additional websites that you host (e.g. wiki, etherpad...)')])
  119. contact_email = TextField(_('contact email'), validators=[Optional(), Email()],
  120. description=[None, _('General contact email address')])
  121. main_ml = TextField(_('main mailing list'), validators=[Optional(), Email()],
  122. description=[None, _('Address of your main mailing list')])
  123. creation_date = DateField(_('creation date'), validators=[Optional()], widget=partial(TextInput(), placeholder=_('YYYY-mm-dd')),
  124. description=[None, _('Date at which the legal structure for your project was created')])
  125. chatrooms = FieldList(TextField(_('chatrooms')), min_entries=1, widget=InputListWidget(),
  126. description=[None, _('In URI form, e.g. <code>irc://irc.isp.net/#isp</code> or ' +
  127. '<code>xmpp:isp@chat.isp.net?join</code>')])
  128. covered_areas = FieldList(MyFormField(CoveredArea, _('Covered Areas'), widget=partial(InputListWidget(), class_='formfield')),
  129. min_entries=1, widget=InputListWidget(),
  130. description=[None, _('Descriptive name of the covered areas and technologies deployed')])
  131. latitude = DecimalField(_('latitude'), validators=[Optional(), NumberRange(min=-90, max=90)],
  132. description=[None, _('Coordinates of your registered office or usual meeting location. '
  133. '<strong>Required in order to appear on the map.</strong>')])
  134. longitude = DecimalField(_('longitude'), validators=[Optional(), NumberRange(min=-180, max=180)])
  135. step = SelectField(_('progress step'), choices=STEP_CHOICES, coerce=int)
  136. member_count = IntegerField(_('members'), validators=[Optional(), NumberRange(min=0)],
  137. description=[None, _('Number of members')])
  138. subscriber_count = IntegerField(_('subscribers'), validators=[Optional(), NumberRange(min=0)],
  139. description=[None, _('Number of subscribers to an internet access')])
  140. tech_email = TextField(_('Email'), validators=[Email(), DataRequired()], description=[None,
  141. _('Technical contact, in case of problems with your submission')])
  142. def validate(self, *args, **kwargs):
  143. r = super(ProjectForm, self).validate(*args, **kwargs)
  144. if (self.latitude.data is None) != (self.longitude.data is None):
  145. self._fields['longitude'].errors += [_('You must fill both fields')]
  146. r = False
  147. return r
  148. def validate_covered_areas(self, field):
  149. if len([e for e in field.data if e['name']]) == 0:
  150. # not printed, whatever..
  151. raise ValidationError(_('You must specify at least one area'))
  152. geojson_size = sum([len(ca.area.raw_data[0]) for ca in self.covered_areas if ca.area.raw_data])
  153. max_size = current_app.config['ISP_FORM_GEOJSON_MAX_SIZE_TOTAL']
  154. if geojson_size > max_size:
  155. # TODO: XXX This is not printed !
  156. raise ValidationError(gettext('The size of all GeoJSON data combined must not exceed %(max_size)s',
  157. max_size=filesize_fmt(max_size)))
  158. def to_json(self, json=None):
  159. if json is None:
  160. json = {}
  161. json['name'] = self.name.data
  162. def optstr(k, v):
  163. if k in json or v:
  164. json[k] = v
  165. def optlist(k, v):
  166. if k in json or len(v):
  167. json[k] = v
  168. def transform_covered_areas(cas):
  169. for ca in cas:
  170. if not ca['name']:
  171. continue
  172. if 'area' in ca and ca['area'] is None:
  173. del ca['area']
  174. yield ca
  175. optstr('shortname', self.shortname.data)
  176. optstr('description', self.description.data)
  177. optstr('logoURL', self.logo_url.data)
  178. optstr('website', self.website.data)
  179. optstr('otherWebsites', dict(((w['name'], w['url']) for w in self.other_websites.data if w['name'])))
  180. optstr('email', self.contact_email.data)
  181. optstr('mainMailingList', self.main_ml.data)
  182. optstr('creationDate', self.creation_date.data.isoformat() if self.creation_date.data else None)
  183. optstr('progressStatus', self.step.data)
  184. optstr('memberCount', self.member_count.data)
  185. optstr('subscriberCount', self.subscriber_count.data)
  186. optlist('chatrooms', list(filter(bool, self.chatrooms.data))) # remove empty strings
  187. optstr('coordinates', {'latitude': self.latitude.data, 'longitude': self.longitude.data}
  188. if self.latitude.data else {})
  189. optlist('coveredAreas', list(transform_covered_areas(self.covered_areas.data)))
  190. return json
  191. @classmethod
  192. def edit_json(cls, isp):
  193. json = isp.json
  194. obj = type('abject', (object,), {})()
  195. def set_attr(attr, itemk=None, d=json):
  196. if itemk is None:
  197. itemk = attr
  198. if itemk in d:
  199. setattr(obj, attr, d[itemk])
  200. set_attr('name')
  201. set_attr('shortname')
  202. set_attr('description')
  203. set_attr('logo_url', 'logoURL')
  204. set_attr('website')
  205. set_attr('contact_email', 'email')
  206. set_attr('main_ml', 'mainMailingList')
  207. set_attr('creation_date', 'creationDate')
  208. if hasattr(obj, 'creation_date'):
  209. obj.creation_date = ISP.str2date(obj.creation_date)
  210. set_attr('step', 'progressStatus')
  211. set_attr('member_count', 'memberCount')
  212. set_attr('subscriber_count', 'subscriberCount')
  213. set_attr('chatrooms', 'chatrooms')
  214. if 'coordinates' in json:
  215. set_attr('latitude', d=json['coordinates'])
  216. set_attr('longitude', d=json['coordinates'])
  217. if 'otherWebsites' in json:
  218. setattr(obj, 'other_websites', [{'name': n, 'url': w} for n, w in json['otherWebsites'].items()])
  219. set_attr('covered_areas', 'coveredAreas')
  220. obj.tech_email = isp.tech_email
  221. return cls(obj=obj)
  222. class URLField(TextField):
  223. def _value(self):
  224. if isinstance(self.data, str):
  225. return self.data
  226. elif self.data is None:
  227. return ''
  228. else:
  229. return urllib.parse.urlunsplit(self.data)
  230. def process_formdata(self, valuelist):
  231. if valuelist:
  232. try:
  233. self.data = urllib.parse.urlsplit(valuelist[0])
  234. except:
  235. self.data = None
  236. raise ValidationError(_('Invalid URL'))
  237. def is_url_unique(url):
  238. if isinstance(url, str):
  239. url = urllib.parse.urlsplit(url)
  240. t = list(url)
  241. t[2] = ''
  242. u1 = urllib.parse.urlunsplit(t)
  243. t[0] = 'http' if t[0] == 'https' else 'https'
  244. u2 = urllib.parse.urlunsplit(t)
  245. if ISP.query.filter(ISP.json_url.startswith(u1) | ISP.json_url.startswith(u2)).count() > 0:
  246. return False
  247. return True
  248. class ProjectJSONForm(Form):
  249. json_url = URLField(_('base url'), description=[_('E.g. https://isp.com/'),
  250. _('A ressource implementing our JSON-Schema specification ' +
  251. 'must exist at path /isp.json')])
  252. tech_email = TextField(_('Email'), validators=[Email()], description=[None,
  253. _('Technical contact, in case of problems')])
  254. def validate_json_url(self, field):
  255. if not field.data.netloc:
  256. raise ValidationError(_('Invalid URL'))
  257. if field.data.scheme not in ('http', 'https'):
  258. raise ValidationError(_('Invalid URL (must be HTTP(S))'))
  259. if not field.object_data and not is_url_unique(field.data):
  260. raise ValidationError(_('This URL is already in our database'))
  261. class RequestEditToken(Form):
  262. tech_email = TextField(_('Tech Email'), validators=[Email()], description=[None,
  263. _('The Technical contact you provided while registering')])