views.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. from collections import OrderedDict
  2. from django_tables2 import RequestConfig
  3. from django.contrib import messages
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.core.exceptions import ImproperlyConfigured
  6. from django.core.urlresolvers import reverse
  7. from django.db import transaction, IntegrityError
  8. from django.db.models import ProtectedError
  9. from django.forms import ModelMultipleChoiceField, MultipleHiddenInput, TypedChoiceField
  10. from django.http import HttpResponse, HttpResponseRedirect
  11. from django.shortcuts import get_object_or_404, redirect, render
  12. from django.template import TemplateSyntaxError
  13. from django.utils.http import is_safe_url
  14. from django.views.generic import View
  15. from extras.forms import CustomFieldForm
  16. from extras.models import CustomField, CustomFieldValue, ExportTemplate, UserAction
  17. from .error_handlers import handle_protectederror
  18. from .forms import ConfirmationForm
  19. from .paginator import EnhancedPaginator
  20. class annotate_custom_fields:
  21. def __init__(self, queryset, custom_fields):
  22. self.queryset = queryset
  23. self.custom_fields = custom_fields
  24. def __iter__(self):
  25. for obj in self.queryset:
  26. values_dict = {cfv.field_id: cfv.value for cfv in obj.custom_field_values.all()}
  27. obj.custom_fields = OrderedDict([(field, values_dict.get(field.pk)) for field in self.custom_fields])
  28. yield obj
  29. class ObjectListView(View):
  30. queryset = None
  31. filter = None
  32. filter_form = None
  33. table = None
  34. edit_permissions = []
  35. template_name = None
  36. redirect_on_single_result = True
  37. def get(self, request, *args, **kwargs):
  38. model = self.queryset.model
  39. object_ct = ContentType.objects.get_for_model(model)
  40. if self.filter:
  41. self.queryset = self.filter(request.GET, self.queryset).qs
  42. # If this type of object has one or more custom fields, prefetch any relevant custom field values
  43. custom_fields = CustomField.objects.filter(obj_type=ContentType.objects.get_for_model(model))\
  44. .prefetch_related('choices')
  45. if custom_fields:
  46. self.queryset = self.queryset.prefetch_related('custom_field_values')
  47. # Check for export template rendering
  48. if request.GET.get('export'):
  49. et = get_object_or_404(ExportTemplate, content_type=object_ct, name=request.GET.get('export'))
  50. queryset = annotate_custom_fields(self.queryset, custom_fields) if custom_fields else self.queryset
  51. try:
  52. response = et.to_response(context_dict={'queryset': queryset},
  53. filename='netbox_{}'.format(model._meta.verbose_name_plural))
  54. return response
  55. except TemplateSyntaxError:
  56. messages.error(request, "There was an error rendering the selected export template ({})."
  57. .format(et.name))
  58. # Fall back to built-in CSV export
  59. elif 'export' in request.GET and hasattr(model, 'to_csv'):
  60. output = '\n'.join([obj.to_csv() for obj in self.queryset])
  61. response = HttpResponse(
  62. output,
  63. content_type='text/csv'
  64. )
  65. response['Content-Disposition'] = 'attachment; filename="netbox_{}.csv"'\
  66. .format(self.queryset.model._meta.verbose_name_plural)
  67. return response
  68. # Attempt to redirect automatically if the search query returns a single result
  69. if self.redirect_on_single_result and self.queryset.count() == 1 and request.GET:
  70. try:
  71. return HttpResponseRedirect(self.queryset[0].get_absolute_url())
  72. except AttributeError:
  73. pass
  74. # Provide a hook to tweak the queryset based on the request immediately prior to rendering the object list
  75. self.queryset = self.alter_queryset(request)
  76. # Construct the table based on the user's permissions
  77. table = self.table(self.queryset)
  78. table.model = model
  79. if 'pk' in table.base_columns and any([request.user.has_perm(perm) for perm in self.edit_permissions]):
  80. table.base_columns['pk'].visible = True
  81. RequestConfig(request, paginate={'klass': EnhancedPaginator}).configure(table)
  82. context = {
  83. 'table': table,
  84. 'filter_form': self.filter_form(request.GET, label_suffix='') if self.filter_form else None,
  85. 'export_templates': ExportTemplate.objects.filter(content_type=object_ct),
  86. }
  87. context.update(self.extra_context())
  88. return render(request, self.template_name, context)
  89. def alter_queryset(self, request):
  90. # .all() is necessary to avoid caching queries
  91. return self.queryset.all()
  92. def extra_context(self):
  93. return {}
  94. class ObjectEditView(View):
  95. model = None
  96. form_class = None
  97. fields_initial = []
  98. template_name = 'utilities/obj_edit.html'
  99. success_url = None
  100. cancel_url = None
  101. def get_object(self, kwargs):
  102. # Look up object by slug if one has been provided. Otherwise, use PK.
  103. if 'slug' in kwargs:
  104. return get_object_or_404(self.model, slug=kwargs['slug'])
  105. else:
  106. return get_object_or_404(self.model, pk=kwargs['pk'])
  107. def get(self, request, *args, **kwargs):
  108. if kwargs:
  109. obj = self.get_object(kwargs)
  110. form = self.form_class(instance=obj)
  111. else:
  112. obj = None
  113. form = self.form_class(initial={k: request.GET.get(k) for k in self.fields_initial})
  114. return render(request, self.template_name, {
  115. 'obj': obj,
  116. 'obj_type': self.model._meta.verbose_name,
  117. 'form': form,
  118. 'cancel_url': obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else reverse(self.cancel_url),
  119. })
  120. def post(self, request, *args, **kwargs):
  121. # Validate object if editing an existing object
  122. obj = self.get_object(kwargs) if kwargs else None
  123. form = self.form_class(request.POST, instance=obj)
  124. if form.is_valid():
  125. obj = form.save(commit=False)
  126. obj_created = not obj.pk
  127. obj.save()
  128. if isinstance(form, CustomFieldForm):
  129. form.save_custom_fields()
  130. msg = u'Created ' if obj_created else u'Modified '
  131. msg += self.model._meta.verbose_name
  132. if hasattr(obj, 'get_absolute_url'):
  133. msg = u'{} <a href="{}">{}</a>'.format(msg, obj.get_absolute_url(), obj)
  134. else:
  135. msg = u'{} {}'.format(msg, obj)
  136. messages.success(request, msg)
  137. if obj_created:
  138. UserAction.objects.log_create(request.user, obj, msg)
  139. else:
  140. UserAction.objects.log_edit(request.user, obj, msg)
  141. if '_addanother' in request.POST:
  142. return redirect(request.path)
  143. elif self.success_url:
  144. return redirect(self.success_url)
  145. else:
  146. return redirect(obj.get_absolute_url())
  147. return render(request, self.template_name, {
  148. 'obj': obj,
  149. 'obj_type': self.model._meta.verbose_name,
  150. 'form': form,
  151. 'cancel_url': obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else reverse(self.cancel_url),
  152. })
  153. class ObjectDeleteView(View):
  154. model = None
  155. template_name = 'utilities/obj_delete.html'
  156. redirect_url = None
  157. def get_object(self, kwargs):
  158. # Look up object by slug if one has been provided. Otherwise, use PK.
  159. if 'slug' in kwargs:
  160. return get_object_or_404(self.model, slug=kwargs['slug'])
  161. else:
  162. return get_object_or_404(self.model, pk=kwargs['pk'])
  163. def get(self, request, *args, **kwargs):
  164. obj = self.get_object(kwargs)
  165. form = ConfirmationForm()
  166. return render(request, self.template_name, {
  167. 'obj': obj,
  168. 'form': form,
  169. 'obj_type': self.model._meta.verbose_name,
  170. 'cancel_url': obj.get_absolute_url(),
  171. })
  172. def post(self, request, *args, **kwargs):
  173. obj = self.get_object(kwargs)
  174. form = ConfirmationForm(request.POST)
  175. if form.is_valid():
  176. try:
  177. obj.delete()
  178. msg = u'Deleted {} {}'.format(self.model._meta.verbose_name, obj)
  179. messages.success(request, msg)
  180. UserAction.objects.log_delete(request.user, obj, msg)
  181. return redirect(self.redirect_url)
  182. except ProtectedError, e:
  183. handle_protectederror(obj, request, e)
  184. return redirect(obj.get_absolute_url())
  185. return render(request, self.template_name, {
  186. 'obj': obj,
  187. 'form': form,
  188. 'obj_type': self.model._meta.verbose_name,
  189. 'cancel_url': obj.get_absolute_url(),
  190. })
  191. class BulkImportView(View):
  192. form = None
  193. table = None
  194. template_name = None
  195. obj_list_url = None
  196. def get(self, request, *args, **kwargs):
  197. return render(request, self.template_name, {
  198. 'form': self.form(),
  199. 'obj_list_url': self.obj_list_url,
  200. })
  201. def post(self, request, *args, **kwargs):
  202. form = self.form(request.POST)
  203. if form.is_valid():
  204. new_objs = []
  205. try:
  206. with transaction.atomic():
  207. for obj in form.cleaned_data['csv']:
  208. self.save_obj(obj)
  209. new_objs.append(obj)
  210. obj_table = self.table(new_objs)
  211. if new_objs:
  212. msg = u'Imported {} {}'.format(len(new_objs), new_objs[0]._meta.verbose_name_plural)
  213. messages.success(request, msg)
  214. UserAction.objects.log_import(request.user, ContentType.objects.get_for_model(new_objs[0]), msg)
  215. return render(request, "import_success.html", {
  216. 'table': obj_table,
  217. })
  218. except IntegrityError as e:
  219. form.add_error('csv', "Record {}: {}".format(len(new_objs) + 1, e.__cause__))
  220. return render(request, self.template_name, {
  221. 'form': form,
  222. 'obj_list_url': self.obj_list_url,
  223. })
  224. def save_obj(self, obj):
  225. obj.save()
  226. class BulkEditView(View):
  227. cls = None
  228. form = None
  229. template_name = None
  230. default_redirect_url = None
  231. def get(self, request, *args, **kwargs):
  232. return redirect(self.default_redirect_url)
  233. def post(self, request, *args, **kwargs):
  234. posted_redirect_url = request.POST.get('redirect_url')
  235. if posted_redirect_url and is_safe_url(url=posted_redirect_url, host=request.get_host()):
  236. redirect_url = posted_redirect_url
  237. else:
  238. redirect_url = reverse(self.default_redirect_url)
  239. if request.POST.get('_all'):
  240. pk_list = [int(pk) for pk in request.POST.get('pk_all').split(',') if pk]
  241. else:
  242. pk_list = [int(pk) for pk in request.POST.getlist('pk')]
  243. if '_apply' in request.POST:
  244. if hasattr(self.form, 'custom_fields'):
  245. form = self.form(self.cls, request.POST)
  246. else:
  247. form = self.form(request.POST)
  248. if form.is_valid():
  249. custom_fields = form.custom_fields if hasattr(form, 'custom_fields') else []
  250. standard_fields = [field for field in form.fields if field not in custom_fields and field != 'pk']
  251. # Update objects
  252. updated_count = self.update_objects(pk_list, form, standard_fields)
  253. # Update custom fields for objects
  254. if custom_fields:
  255. objs_updated = self.update_custom_fields(pk_list, form, custom_fields)
  256. if objs_updated and not updated_count:
  257. updated_count = objs_updated
  258. if updated_count:
  259. msg = u'Updated {} {}'.format(updated_count, self.cls._meta.verbose_name_plural)
  260. messages.success(self.request, msg)
  261. UserAction.objects.log_bulk_edit(request.user, ContentType.objects.get_for_model(self.cls), msg)
  262. return redirect(redirect_url)
  263. else:
  264. if hasattr(self.form, 'custom_fields'):
  265. form = self.form(self.cls, initial={'pk': pk_list})
  266. else:
  267. form = self.form(initial={'pk': pk_list})
  268. selected_objects = self.cls.objects.filter(pk__in=pk_list)
  269. if not selected_objects:
  270. messages.warning(request, "No {} were selected.".format(self.cls._meta.verbose_name_plural))
  271. return redirect(redirect_url)
  272. return render(request, self.template_name, {
  273. 'form': form,
  274. 'selected_objects': selected_objects,
  275. 'cancel_url': redirect_url,
  276. })
  277. def update_objects(self, pk_list, form, fields):
  278. fields_to_update = {}
  279. for name in fields:
  280. # Check for zero value (bulk editing)
  281. if isinstance(form.fields[name], TypedChoiceField) and form.cleaned_data[name] == 0:
  282. fields_to_update[name] = None
  283. elif form.cleaned_data[name]:
  284. fields_to_update[name] = form.cleaned_data[name]
  285. return self.cls.objects.filter(pk__in=pk_list).update(**fields_to_update)
  286. def update_custom_fields(self, pk_list, form, fields):
  287. obj_type = ContentType.objects.get_for_model(self.cls)
  288. objs_updated = False
  289. for name in fields:
  290. if form.cleaned_data[name] not in [None, u'']:
  291. field = form.fields[name].model
  292. # Check for zero value (bulk editing)
  293. if isinstance(form.fields[name], TypedChoiceField) and form.cleaned_data[name] == 0:
  294. serialized_value = field.serialize_value(None)
  295. else:
  296. serialized_value = field.serialize_value(form.cleaned_data[name])
  297. # Gather any pre-existing CustomFieldValues for the objects being edited.
  298. existing_cfvs = CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list)
  299. # Determine which objects have an existing CFV to update and which need a new CFV created.
  300. update_list = [cfv['obj_id'] for cfv in existing_cfvs.values()]
  301. create_list = list(set(pk_list) - set(update_list))
  302. # Creating/updating CFVs
  303. if serialized_value:
  304. existing_cfvs.update(serialized_value=serialized_value)
  305. CustomFieldValue.objects.bulk_create([
  306. CustomFieldValue(field=field, obj_type=obj_type, obj_id=pk, serialized_value=serialized_value)
  307. for pk in create_list
  308. ])
  309. # Deleting CFVs
  310. else:
  311. existing_cfvs.delete()
  312. objs_updated = True
  313. return len(pk_list) if objs_updated else 0
  314. class BulkDeleteView(View):
  315. cls = None
  316. parent_cls = None
  317. form = None
  318. template_name = 'utilities/confirm_bulk_delete.html'
  319. default_redirect_url = None
  320. def post(self, request, *args, **kwargs):
  321. # Attempt to derive parent object if a parent class has been given
  322. if self.parent_cls:
  323. parent_obj = get_object_or_404(self.parent_cls, **kwargs)
  324. else:
  325. parent_obj = None
  326. # Determine URL to redirect users upon deletion of objects
  327. posted_redirect_url = request.POST.get('redirect_url')
  328. if posted_redirect_url and is_safe_url(url=posted_redirect_url, host=request.get_host()):
  329. redirect_url = posted_redirect_url
  330. elif parent_obj:
  331. redirect_url = parent_obj.get_absolute_url()
  332. elif self.default_redirect_url:
  333. redirect_url = reverse(self.default_redirect_url)
  334. else:
  335. raise ImproperlyConfigured('No redirect URL has been provided.')
  336. # Are we deleting *all* objects in the queryset or just a selected subset?
  337. if request.POST.get('_all'):
  338. pk_list = [x for x in request.POST.get('pk_all').split(',') if x]
  339. else:
  340. pk_list = request.POST.getlist('pk')
  341. form_cls = self.get_form()
  342. if '_confirm' in request.POST:
  343. form = form_cls(request.POST)
  344. if form.is_valid():
  345. # Delete objects
  346. queryset = self.cls.objects.filter(pk__in=pk_list)
  347. try:
  348. deleted_count = queryset.delete()[1][self.cls._meta.label]
  349. except ProtectedError, e:
  350. handle_protectederror(list(queryset), request, e)
  351. return redirect(redirect_url)
  352. msg = u'Deleted {} {}'.format(deleted_count, self.cls._meta.verbose_name_plural)
  353. messages.success(request, msg)
  354. UserAction.objects.log_bulk_delete(request.user, ContentType.objects.get_for_model(self.cls), msg)
  355. return redirect(redirect_url)
  356. else:
  357. form = form_cls(initial={'pk': pk_list})
  358. selected_objects = self.cls.objects.filter(pk__in=pk_list)
  359. if not selected_objects:
  360. messages.warning(request, "No {} were selected for deletion.".format(self.cls._meta.verbose_name_plural))
  361. return redirect(redirect_url)
  362. return render(request, self.template_name, {
  363. 'form': form,
  364. 'parent_obj': parent_obj,
  365. 'obj_type_plural': self.cls._meta.verbose_name_plural,
  366. 'selected_objects': selected_objects,
  367. 'cancel_url': redirect_url,
  368. })
  369. def get_form(self):
  370. """Provide a standard bulk delete form if none has been specified for the view"""
  371. class BulkDeleteForm(ConfirmationForm):
  372. pk = ModelMultipleChoiceField(queryset=self.cls.objects.all(), widget=MultipleHiddenInput)
  373. if self.form:
  374. return self.form
  375. return BulkDeleteForm