views.py 25 KB


  1. from collections import OrderedDict
  2. from django_tables2 import RequestConfig
  3. from django.conf import settings
  4. from django.contrib import messages
  5. from django.contrib.contenttypes.models import ContentType
  6. from django.core.exceptions import ValidationError
  7. from django.db import transaction, IntegrityError
  8. from django.db.models import ProtectedError
  9. from django.forms import CharField, ModelMultipleChoiceField, MultipleHiddenInput, TypedChoiceField
  10. from django.http import HttpResponse
  11. from django.shortcuts import get_object_or_404, redirect, render
  12. from django.template import TemplateSyntaxError
  13. from django.urls import reverse
  14. from django.utils.html import escape
  15. from django.utils.http import is_safe_url
  16. from django.utils.safestring import mark_safe
  17. from django.views.generic import View
  18. from extras.models import CustomField, CustomFieldValue, ExportTemplate, UserAction
  19. from .error_handlers import handle_protectederror
  20. from .forms import ConfirmationForm
  21. from .paginator import EnhancedPaginator
  22. class CustomFieldQueryset:
  23. """
  24. Annotate custom fields on objects within a QuerySet.
  25. """
  26. def __init__(self, queryset, custom_fields):
  27. self.queryset = queryset
  28. self.custom_fields = custom_fields
  29. def __iter__(self):
  30. for obj in self.queryset:
  31. values_dict = {cfv.field_id: cfv.value for cfv in obj.custom_field_values.all()}
  32. obj.custom_fields = OrderedDict([(field, values_dict.get(field.pk)) for field in self.custom_fields])
  33. yield obj
  34. class GetReturnURLMixin(object):
  35. """
  36. Provides logic for determining where a user should be redirected after processing a form.
  37. """
  38. default_return_url = None
  39. def get_return_url(self, request, obj):
  40. query_param = request.GET.get('return_url')
  41. if query_param and is_safe_url(url=query_param, host=request.get_host()):
  42. return query_param
  43. elif obj.pk and hasattr(obj, 'get_absolute_url'):
  44. return obj.get_absolute_url()
  45. elif self.default_return_url is not None:
  46. return reverse(self.default_return_url)
  47. return reverse('home')
  48. class ObjectListView(View):
  49. """
  50. List a series of objects.
  51. queryset: The queryset of objects to display
  52. filter: A django-filter FilterSet that is applied to the queryset
  53. filter_form: The form used to render filter options
  54. table: The django-tables2 Table used to render the objects list
  55. template_name: The name of the template
  56. """
  57. queryset = None
  58. filter = None
  59. filter_form = None
  60. table = None
  61. template_name = None
  62. def get(self, request):
  63. model = self.queryset.model
  64. object_ct = ContentType.objects.get_for_model(model)
  65. if self.filter:
  66. self.queryset = self.filter(request.GET, self.queryset).qs
  67. # If this type of object has one or more custom fields, prefetch any relevant custom field values
  68. custom_fields = CustomField.objects.filter(obj_type=ContentType.objects.get_for_model(model))\
  69. .prefetch_related('choices')
  70. if custom_fields:
  71. self.queryset = self.queryset.prefetch_related('custom_field_values')
  72. # Check for export template rendering
  73. if request.GET.get('export'):
  74. et = get_object_or_404(ExportTemplate, content_type=object_ct, name=request.GET.get('export'))
  75. queryset = CustomFieldQueryset(self.queryset, custom_fields) if custom_fields else self.queryset
  76. try:
  77. response = et.to_response(context_dict={'queryset': queryset},
  78. filename='netbox_{}'.format(model._meta.verbose_name_plural))
  79. return response
  80. except TemplateSyntaxError:
  81. messages.error(request, u"There was an error rendering the selected export template ({})."
  82. .format(et.name))
  83. # Fall back to built-in CSV export
  84. elif 'export' in request.GET and hasattr(model, 'to_csv'):
  85. output = '\n'.join([obj.to_csv() for obj in self.queryset])
  86. response = HttpResponse(
  87. output,
  88. content_type='text/csv'
  89. )
  90. response['Content-Disposition'] = 'attachment; filename="netbox_{}.csv"'\
  91. .format(self.queryset.model._meta.verbose_name_plural)
  92. return response
  93. # Provide a hook to tweak the queryset based on the request immediately prior to rendering the object list
  94. self.queryset = self.alter_queryset(request)
  95. # Compile user model permissions for access from within the template
  96. perm_base_name = '{}.{{}}_{}'.format(model._meta.app_label, model._meta.model_name)
  97. permissions = {p: request.user.has_perm(perm_base_name.format(p)) for p in ['add', 'change', 'delete']}
  98. # Construct the table based on the user's permissions
  99. table = self.table(self.queryset)
  100. if 'pk' in table.base_columns and (permissions['change'] or permissions['delete']):
  101. table.base_columns['pk'].visible = True
  102. # Apply the request context
  103. paginate = {
  104. 'klass': EnhancedPaginator,
  105. 'per_page': request.GET.get('per_page', settings.PAGINATE_COUNT)
  106. }
  107. RequestConfig(request, paginate).configure(table)
  108. context = {
  109. 'table': table,
  110. 'permissions': permissions,
  111. 'filter_form': self.filter_form(request.GET, label_suffix='') if self.filter_form else None,
  112. 'export_templates': ExportTemplate.objects.filter(content_type=object_ct),
  113. }
  114. context.update(self.extra_context())
  115. return render(request, self.template_name, context)
  116. def alter_queryset(self, request):
  117. # .all() is necessary to avoid caching queries
  118. return self.queryset.all()
  119. def extra_context(self):
  120. return {}
  121. class ObjectEditView(GetReturnURLMixin, View):
  122. """
  123. Create or edit a single object.
  124. model: The model of the object being edited
  125. form_class: The form used to create or edit the object
  126. template_name: The name of the template
  127. default_return_url: The name of the URL used to display a list of this object type
  128. """
  129. model = None
  130. form_class = None
  131. template_name = 'utilities/obj_edit.html'
  132. def get_object(self, kwargs):
  133. # Look up object by slug or PK. Return None if neither was provided.
  134. if 'slug' in kwargs:
  135. return get_object_or_404(self.model, slug=kwargs['slug'])
  136. elif 'pk' in kwargs:
  137. return get_object_or_404(self.model, pk=kwargs['pk'])
  138. return self.model()
  139. def alter_obj(self, obj, request, url_args, url_kwargs):
  140. # Allow views to add extra info to an object before it is processed. For example, a parent object can be defined
  141. # given some parameter from the request URL.
  142. return obj
  143. def get(self, request, *args, **kwargs):
  144. obj = self.get_object(kwargs)
  145. obj = self.alter_obj(obj, request, args, kwargs)
  146. # Parse initial data manually to avoid setting field values as lists
  147. initial_data = {k: request.GET[k] for k in request.GET}
  148. form = self.form_class(instance=obj, initial=initial_data)
  149. return render(request, self.template_name, {
  150. 'obj': obj,
  151. 'obj_type': self.model._meta.verbose_name,
  152. 'form': form,
  153. 'return_url': self.get_return_url(request, obj),
  154. })
  155. def post(self, request, *args, **kwargs):
  156. obj = self.get_object(kwargs)
  157. obj = self.alter_obj(obj, request, args, kwargs)
  158. form = self.form_class(request.POST, request.FILES, instance=obj)
  159. if form.is_valid():
  160. obj_created = not form.instance.pk
  161. obj = form.save()
  162. msg = u'Created ' if obj_created else u'Modified '
  163. msg += self.model._meta.verbose_name
  164. if hasattr(obj, 'get_absolute_url'):
  165. msg = u'{} <a href="{}">{}</a>'.format(msg, obj.get_absolute_url(), escape(obj))
  166. else:
  167. msg = u'{} {}'.format(msg, escape(obj))
  168. messages.success(request, mark_safe(msg))
  169. if obj_created:
  170. UserAction.objects.log_create(request.user, obj, msg)
  171. else:
  172. UserAction.objects.log_edit(request.user, obj, msg)
  173. if '_addanother' in request.POST:
  174. return redirect(request.path)
  175. return_url = form.cleaned_data.get('return_url')
  176. if return_url is not None and is_safe_url(url=return_url, host=request.get_host()):
  177. return redirect(return_url)
  178. else:
  179. return redirect(self.get_return_url(request, obj))
  180. return render(request, self.template_name, {
  181. 'obj': obj,
  182. 'obj_type': self.model._meta.verbose_name,
  183. 'form': form,
  184. 'return_url': self.get_return_url(request, obj),
  185. })
  186. class ObjectDeleteView(GetReturnURLMixin, View):
  187. """
  188. Delete a single object.
  189. model: The model of the object being edited
  190. template_name: The name of the template
  191. default_return_url: Name of the URL to which the user is redirected after deleting the object
  192. """
  193. model = None
  194. template_name = 'utilities/obj_delete.html'
  195. def get_object(self, kwargs):
  196. # Look up object by slug if one has been provided. Otherwise, use PK.
  197. if 'slug' in kwargs:
  198. return get_object_or_404(self.model, slug=kwargs['slug'])
  199. else:
  200. return get_object_or_404(self.model, pk=kwargs['pk'])
  201. def get(self, request, **kwargs):
  202. obj = self.get_object(kwargs)
  203. form = ConfirmationForm(initial=request.GET)
  204. return render(request, self.template_name, {
  205. 'obj': obj,
  206. 'form': form,
  207. 'obj_type': self.model._meta.verbose_name,
  208. 'return_url': self.get_return_url(request, obj),
  209. })
  210. def post(self, request, **kwargs):
  211. obj = self.get_object(kwargs)
  212. form = ConfirmationForm(request.POST)
  213. if form.is_valid():
  214. try:
  215. obj.delete()
  216. except ProtectedError as e:
  217. handle_protectederror(obj, request, e)
  218. return redirect(obj.get_absolute_url())
  219. msg = u'Deleted {} {}'.format(self.model._meta.verbose_name, obj)
  220. messages.success(request, msg)
  221. UserAction.objects.log_delete(request.user, obj, msg)
  222. return_url = form.cleaned_data.get('return_url')
  223. if return_url is not None and is_safe_url(url=return_url, host=request.get_host()):
  224. return redirect(return_url)
  225. else:
  226. return redirect(self.get_return_url(request, obj))
  227. return render(request, self.template_name, {
  228. 'obj': obj,
  229. 'form': form,
  230. 'obj_type': self.model._meta.verbose_name,
  231. 'return_url': self.get_return_url(request, obj),
  232. })
  233. class BulkAddView(View):
  234. """
  235. Create new objects in bulk.
  236. form: Form class
  237. model_form: The ModelForm used to create individual objects
  238. template_name: The name of the template
  239. default_return_url: Name of the URL to which the user is redirected after creating the objects
  240. """
  241. form = None
  242. model_form = None
  243. template_name = None
  244. default_return_url = 'home'
  245. def get(self, request):
  246. form = self.form()
  247. return render(request, self.template_name, {
  248. 'obj_type': self.model_form._meta.model._meta.verbose_name,
  249. 'form': form,
  250. 'return_url': reverse(self.default_return_url),
  251. })
  252. def post(self, request):
  253. model = self.model_form._meta.model
  254. form = self.form(request.POST)
  255. if form.is_valid():
  256. # Read the pattern field and target from the form's pattern_map
  257. pattern_field, pattern_target = form.pattern_map
  258. pattern = form.cleaned_data[pattern_field]
  259. model_form_data = form.cleaned_data
  260. new_objs = []
  261. try:
  262. with transaction.atomic():
  263. # Validate and save each object individually
  264. for value in pattern:
  265. model_form_data[pattern_target] = value
  266. model_form = self.model_form(model_form_data)
  267. if model_form.is_valid():
  268. obj = model_form.save()
  269. new_objs.append(obj)
  270. else:
  271. for error in model_form.errors.as_data().values():
  272. form.add_error(None, error)
  273. # Abort the creation of all objects if errors exist
  274. if form.errors:
  275. raise ValidationError("Validation of one or more model forms failed.")
  276. except ValidationError:
  277. pass
  278. if not form.errors:
  279. msg = u"Added {} {}".format(len(new_objs), model._meta.verbose_name_plural)
  280. messages.success(request, msg)
  281. UserAction.objects.log_bulk_create(request.user, ContentType.objects.get_for_model(model), msg)
  282. if '_addanother' in request.POST:
  283. return redirect(request.path)
  284. return redirect(self.default_return_url)
  285. return render(request, self.template_name, {
  286. 'form': form,
  287. 'obj_type': model._meta.verbose_name,
  288. 'return_url': reverse(self.default_return_url),
  289. })
  290. class BulkImportView(View):
  291. """
  292. Import objects in bulk (CSV format).
  293. form: Form class
  294. table: The django-tables2 Table used to render the list of imported objects
  295. template_name: The name of the template
  296. default_return_url: The name of the URL to use for the cancel button
  297. """
  298. form = None
  299. table = None
  300. template_name = None
  301. default_return_url = None
  302. def get(self, request):
  303. return render(request, self.template_name, {
  304. 'form': self.form(),
  305. 'return_url': self.default_return_url,
  306. })
  307. def post(self, request):
  308. form = self.form(request.POST)
  309. if form.is_valid():
  310. new_objs = []
  311. try:
  312. with transaction.atomic():
  313. for obj in form.cleaned_data['csv']:
  314. self.save_obj(obj)
  315. new_objs.append(obj)
  316. obj_table = self.table(new_objs)
  317. if new_objs:
  318. msg = u'Imported {} {}'.format(len(new_objs), new_objs[0]._meta.verbose_name_plural)
  319. messages.success(request, msg)
  320. UserAction.objects.log_import(request.user, ContentType.objects.get_for_model(new_objs[0]), msg)
  321. return render(request, "import_success.html", {
  322. 'table': obj_table,
  323. 'return_url': self.default_return_url,
  324. })
  325. except IntegrityError as e:
  326. form.add_error('csv', "Record {}: {}".format(len(new_objs) + 1, e.__cause__))
  327. return render(request, self.template_name, {
  328. 'form': form,
  329. 'return_url': self.default_return_url,
  330. })
  331. def save_obj(self, obj):
  332. obj.save()
  333. class BulkEditView(View):
  334. """
  335. Edit objects in bulk.
  336. cls: The model of the objects being edited
  337. parent_cls: The model of the parent object (if any)
  338. filter: FilterSet to apply when deleting by QuerySet
  339. form: The form class used to edit objects in bulk
  340. template_name: The name of the template
  341. default_return_url: Name of the URL to which the user is redirected after editing the objects (can be overridden by
  342. POSTing return_url)
  343. """
  344. cls = None
  345. parent_cls = None
  346. filter = None
  347. form = None
  348. template_name = None
  349. default_return_url = 'home'
  350. def get(self):
  351. return redirect(self.default_return_url)
  352. def post(self, request, **kwargs):
  353. # Attempt to derive parent object if a parent class has been given
  354. if self.parent_cls:
  355. parent_obj = get_object_or_404(self.parent_cls, **kwargs)
  356. else:
  357. parent_obj = None
  358. # Determine URL to redirect users upon modification of objects
  359. posted_return_url = request.POST.get('return_url')
  360. if posted_return_url and is_safe_url(url=posted_return_url, host=request.get_host()):
  361. return_url = posted_return_url
  362. elif parent_obj:
  363. return_url = parent_obj.get_absolute_url()
  364. else:
  365. return_url = reverse(self.default_return_url)
  366. # Are we editing *all* objects in the queryset or just a selected subset?
  367. if request.POST.get('_all') and self.filter is not None:
  368. pk_list = [obj.pk for obj in self.filter(request.GET, self.cls.objects.only('pk')).qs]
  369. else:
  370. pk_list = [int(pk) for pk in request.POST.getlist('pk')]
  371. if '_apply' in request.POST:
  372. form = self.form(self.cls, request.POST)
  373. if form.is_valid():
  374. custom_fields = form.custom_fields if hasattr(form, 'custom_fields') else []
  375. standard_fields = [field for field in form.fields if field not in custom_fields and field != 'pk']
  376. # Update standard fields. If a field is listed in _nullify, delete its value.
  377. nullified_fields = request.POST.getlist('_nullify')
  378. fields_to_update = {}
  379. for field in standard_fields:
  380. if field in form.nullable_fields and field in nullified_fields:
  381. if isinstance(form.fields[field], CharField):
  382. fields_to_update[field] = ''
  383. else:
  384. fields_to_update[field] = None
  385. elif form.cleaned_data[field] not in (None, ''):
  386. fields_to_update[field] = form.cleaned_data[field]
  387. updated_count = self.cls.objects.filter(pk__in=pk_list).update(**fields_to_update)
  388. # Update custom fields for objects
  389. if custom_fields:
  390. objs_updated = self.update_custom_fields(pk_list, form, custom_fields, nullified_fields)
  391. if objs_updated and not updated_count:
  392. updated_count = objs_updated
  393. if updated_count:
  394. msg = u'Updated {} {}'.format(updated_count, self.cls._meta.verbose_name_plural)
  395. messages.success(self.request, msg)
  396. UserAction.objects.log_bulk_edit(request.user, ContentType.objects.get_for_model(self.cls), msg)
  397. return redirect(return_url)
  398. else:
  399. initial_data = request.POST.copy()
  400. initial_data['pk'] = pk_list
  401. form = self.form(self.cls, initial=initial_data)
  402. selected_objects = self.cls.objects.filter(pk__in=pk_list)
  403. if not selected_objects:
  404. messages.warning(request, u"No {} were selected.".format(self.cls._meta.verbose_name_plural))
  405. return redirect(return_url)
  406. return render(request, self.template_name, {
  407. 'form': form,
  408. 'selected_objects': selected_objects,
  409. 'return_url': return_url,
  410. })
  411. def update_custom_fields(self, pk_list, form, fields, nullified_fields):
  412. obj_type = ContentType.objects.get_for_model(self.cls)
  413. objs_updated = False
  414. for name in fields:
  415. field = form.fields[name].model
  416. # Setting the field to null
  417. if name in form.nullable_fields and name in nullified_fields:
  418. # Delete all CustomFieldValues for instances of this field belonging to the selected objects.
  419. CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list).delete()
  420. objs_updated = True
  421. # Updating the value of the field
  422. elif form.cleaned_data[name] not in [None, u'']:
  423. # Check for zero value (bulk editing)
  424. if isinstance(form.fields[name], TypedChoiceField) and form.cleaned_data[name] == 0:
  425. serialized_value = field.serialize_value(None)
  426. else:
  427. serialized_value = field.serialize_value(form.cleaned_data[name])
  428. # Gather any pre-existing CustomFieldValues for the objects being edited.
  429. existing_cfvs = CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list)
  430. # Determine which objects have an existing CFV to update and which need a new CFV created.
  431. update_list = [cfv['obj_id'] for cfv in existing_cfvs.values()]
  432. create_list = list(set(pk_list) - set(update_list))
  433. # Creating/updating CFVs
  434. if serialized_value:
  435. existing_cfvs.update(serialized_value=serialized_value)
  436. CustomFieldValue.objects.bulk_create([
  437. CustomFieldValue(field=field, obj_type=obj_type, obj_id=pk, serialized_value=serialized_value)
  438. for pk in create_list
  439. ])
  440. # Deleting CFVs
  441. else:
  442. existing_cfvs.delete()
  443. objs_updated = True
  444. return len(pk_list) if objs_updated else 0
  445. class BulkDeleteView(View):
  446. """
  447. Delete objects in bulk.
  448. cls: The model of the objects being deleted
  449. parent_cls: The model of the parent object (if any)
  450. filter: FilterSet to apply when deleting by QuerySet
  451. form: The form class used to delete objects in bulk
  452. template_name: The name of the template
  453. default_return_url: Name of the URL to which the user is redirected after deleting the objects (can be overriden by
  454. POSTing return_url)
  455. """
  456. cls = None
  457. parent_cls = None
  458. filter = None
  459. form = None
  460. template_name = 'utilities/confirm_bulk_delete.html'
  461. default_return_url = 'home'
  462. def post(self, request, **kwargs):
  463. # Attempt to derive parent object if a parent class has been given
  464. if self.parent_cls:
  465. parent_obj = get_object_or_404(self.parent_cls, **kwargs)
  466. else:
  467. parent_obj = None
  468. # Determine URL to redirect users upon deletion of objects
  469. posted_return_url = request.POST.get('return_url')
  470. if posted_return_url and is_safe_url(url=posted_return_url, host=request.get_host()):
  471. return_url = posted_return_url
  472. elif parent_obj:
  473. return_url = parent_obj.get_absolute_url()
  474. else:
  475. return_url = reverse(self.default_return_url)
  476. # Are we deleting *all* objects in the queryset or just a selected subset?
  477. if request.POST.get('_all') and self.filter is not None:
  478. pk_list = [obj.pk for obj in self.filter(request.GET, self.cls.objects.only('pk')).qs]
  479. else:
  480. pk_list = [int(pk) for pk in request.POST.getlist('pk')]
  481. form_cls = self.get_form()
  482. if '_confirm' in request.POST:
  483. form = form_cls(request.POST)
  484. if form.is_valid():
  485. # Delete objects
  486. queryset = self.cls.objects.filter(pk__in=pk_list)
  487. try:
  488. deleted_count = queryset.delete()[1][self.cls._meta.label]
  489. except ProtectedError as e:
  490. handle_protectederror(list(queryset), request, e)
  491. return redirect(return_url)
  492. msg = u'Deleted {} {}'.format(deleted_count, self.cls._meta.verbose_name_plural)
  493. messages.success(request, msg)
  494. UserAction.objects.log_bulk_delete(request.user, ContentType.objects.get_for_model(self.cls), msg)
  495. return redirect(return_url)
  496. else:
  497. form = form_cls(initial={'pk': pk_list, 'return_url': return_url})
  498. selected_objects = self.cls.objects.filter(pk__in=pk_list)
  499. if not selected_objects:
  500. messages.warning(request, u"No {} were selected for deletion.".format(self.cls._meta.verbose_name_plural))
  501. return redirect(return_url)
  502. return render(request, self.template_name, {
  503. 'form': form,
  504. 'parent_obj': parent_obj,
  505. 'obj_type_plural': self.cls._meta.verbose_name_plural,
  506. 'selected_objects': selected_objects,
  507. 'return_url': return_url,
  508. })
  509. def get_form(self):
  510. """
  511. Provide a standard bulk delete form if none has been specified for the view
  512. """
  513. class BulkDeleteForm(ConfirmationForm):
  514. pk = ModelMultipleChoiceField(queryset=self.cls.objects.all(), widget=MultipleHiddenInput)
  515. if self.form:
  516. return self.form
  517. return BulkDeleteForm