|
- from django.contrib import admin
- from django.db import models
- from django.db.models import Q
- from django.forms import ModelForm, BaseInlineFormSet
- from django.utils import timezone
- from django.urls import reverse, path
- from django.utils.html import format_html
- from django.conf.urls import url
- from django.template.response import TemplateResponse
- from django.core.serializers import serialize
- from django.http import HttpResponse
- from django.db.models.functions import Cast
- from django.contrib.postgres.aggregates import StringAgg
- from django.db import connection, transaction
- from django.core.cache import cache
- from django.contrib.humanize.templatetags.humanize import naturaltime
- from django.contrib.contenttypes.models import ContentType
- from django.http import HttpResponseRedirect
- from django.core.exceptions import ValidationError
- from djgeojson.views import GeoJSONLayerView
- from urllib.parse import urlencode
- from functools import partial, update_wrapper
- from datetime import timedelta
- from ipaddress import IPv4Address
- from djadhere.utils import get_active_filter
- from adhesions.models import Adhesion
- from banking.models import PaymentUpdate
- from .models import Service, ServiceType, IPPrefix, IPResource, IPResourceState, \
- ServiceAllocation, Antenna, AntennaAllocation, Allocation, \
- Route, Tunnel, Switch, Port
- from .utils.notifications import notify_allocation
- from .utils.ip_conversion import ipv4_to_ipv6
- from .forms import AntennaForm, StopAllocationForm
- ### Filters
class ResourceInUseFilter(admin.SimpleListFilter):
    """List filter separating available resources from unavailable ones.

    A resource is treated as available when it is neither reserved nor in use.
    """

    title = 'disponibilité'
    parameter_name = 'available'

    def lookups(self, request, model_admin):
        """Return the (value, label) choices offered by this filter."""
        return (
            (1, 'Disponible'),
            (0, 'Non disponible'),
        )

    def queryset(self, request, queryset):
        """Restrict ``queryset`` according to the selected choice.

        Returns ``None`` when no recognised choice is selected.
        """
        choice = self.value()
        is_available = Q(reserved=False, in_use=False)
        if choice == '0':  # not available
            return queryset.exclude(is_available)
        if choice == '1':  # available
            return queryset.filter(is_available)
        return None
class ResourcePingFilter(admin.SimpleListFilter):
    """List filter on the last known ping state of a resource."""

    title = 'ping'
    parameter_name = 'ping'

    def lookups(self, request, model_admin):
        """Return the (value, label) choices offered by this filter."""
        return (
            ('up', 'UP'),
            ('down', 'DOWN'),
            ('down-since', 'DOWN depuis…'),
            ('never-up', 'Jamais vu UP'),
        )

    def queryset(self, request, queryset):
        """Restrict ``queryset`` according to the selected ping choice.

        Returns ``None`` when no recognised choice is selected.
        """
        choice = self.value()
        if choice == 'up':
            return queryset.filter(last_state__state=IPResourceState.STATE_UP)
        if choice not in ('down', 'down-since', 'never-up'):
            return None
        # Everything whose last state is not UP, i.e. DOWN + UNKNOWN.
        not_up = queryset.exclude(last_state__state=IPResourceState.STATE_UP)
        if choice == 'down':
            return not_up
        # 'down-since' keeps resources that have a last_time_up value,
        # 'never-up' keeps those that have none.
        return not_up.filter(last_time_up__isnull=(choice == 'never-up'))
class ActiveServiceFilter(admin.SimpleListFilter):
    """List filter separating active services from inactive ones.

    Activity is decided by ``get_active_filter('allocation')``.
    """

    title = 'actif'
    parameter_name = 'active'

    def lookups(self, request, model_admin):
        """Return the (value, label) choices offered by this filter."""
        return (
            (1, 'Actif'),
            (0, 'Inactif'),
        )

    def queryset(self, request, queryset):
        """Restrict ``queryset`` according to the selected choice.

        Returns ``None`` when no recognised choice is selected.
        """
        choice = self.value()
        if choice not in ('0', '1'):
            return None
        active = get_active_filter('allocation')
        if choice == '1':  # active
            return queryset.filter(active)
        return queryset.exclude(active)  # inactive
class RouteFilter(admin.SimpleListFilter):
    """List filter on the route used by active service allocations."""

    title = 'route'
    parameter_name = 'route'

    def lookups(self, request, model_admin):
        """Return distinct (route pk, route name) pairs of active allocations."""
        active_allocations = ServiceAllocation.objects.filter(active=True)
        return active_allocations.values_list('route__pk', 'route__name').distinct()

    def queryset(self, request, queryset):
        """Keep rows linked to an active allocation on the selected route.

        The queryset is returned untouched when the selected value is missing
        or is not an integer.
        """
        try:
            route_pk = int(self.value())
        except (TypeError, ValueError):
            return queryset
        allocation_pks = ServiceAllocation.objects.filter(
            active=True, route__pk=route_pk,
        ).values_list('pk', flat=True)
        return queryset.filter(service_allocation__in=allocation_pks)
class AntennaPrefixFilter(admin.SimpleListFilter):
    """List filter on the IP prefix of resources actively allocated to antennas."""

    title = 'préfix'
    parameter_name = 'prefix'

    def lookups(self, request, model_admin):
        """Return distinct (pk, prefix) pairs of prefixes holding an allocated resource."""
        allocated_resources = AntennaAllocation.objects.filter(
            active=True,
        ).values_list('resource__pk', flat=True)
        return IPPrefix.objects.filter(
            ipresource__in=allocated_resources,
        ).values_list('pk', 'prefix').distinct()

    def queryset(self, request, queryset):
        """Keep rows having an active allocation inside the selected prefix.

        The queryset is returned untouched when the selected value is missing
        or is not an integer.
        """
        try:
            prefix_pk = int(self.value())
        except (TypeError, ValueError):
            return queryset
        allocation_pks = AntennaAllocation.objects.filter(
            active=True, resource__prefixes__pk=prefix_pk,
        ).values_list('pk', flat=True)
        return queryset.filter(allocation__in=allocation_pks)
class AntennaPositionFilter(admin.SimpleListFilter):
    """List filter on whether an antenna has a position set."""

    title = 'géolocalisation'
    parameter_name = 'position'

    def lookups(self, request, model_admin):
        """Return the (value, label) choices offered by this filter."""
        return (
            ('1', 'Connue'),
            ('0', 'Inconnue'),
        )

    def queryset(self, request, queryset):
        """Restrict ``queryset`` to antennas with ('1') or without ('0') a position."""
        choice = self.value()
        position_unknown = Q(position__isnull=True)
        if choice == '1':
            return queryset.exclude(position_unknown)
        if choice == '0':
            return queryset.filter(position_unknown)
        return queryset
class ActiveTunnelFilter(admin.SimpleListFilter):
    """List filter on tunnel status; a tunnel without ``ended`` is active."""

    title = 'status'
    parameter_name = 'active'

    def lookups(self, request, model_admin):
        """Return the (value, label) choices offered by this filter."""
        return (
            ('1', 'Actif'),
            ('0', 'Désactivé'),
        )

    def queryset(self, request, queryset):
        """Restrict ``queryset`` to active ('1') or ended ('0') tunnels."""
        choice = self.value()
        not_ended = Q(ended__isnull=True)
        if choice == '1':
            return queryset.filter(not_ended)
        if choice == '0':
            return queryset.exclude(not_ended)
        return queryset
- ### Inlines
class AllocationInlineFormSet(BaseInlineFormSet):
    """Inline formset that sends a notification when a service allocation is saved.

    ``self.request`` must be set on the formset before saving;
    ``AllocationInline.get_formset`` attaches it to the formset class.
    """

    def save_new(self, form, commit=True):
        """Save a new object and notify when it is a service allocation."""
        obj = super().save_new(form, commit)
        # isinstance() rather than an exact type comparison, so subclasses of
        # ServiceAllocation are notified as well.
        if isinstance(obj, ServiceAllocation):
            notify_allocation(self.request, obj)
        return obj

    def save_existing(self, form, instance, commit=True):
        """Notify about a changed service allocation, then save it."""
        if isinstance(instance, ServiceAllocation):
            # Reload the stored row so the notification receives the previous
            # state; the query is only issued when it is actually needed.
            old = type(instance).objects.get(pk=instance.pk)
            notify_allocation(self.request, instance, old)
        return super().save_existing(form, instance, commit)
class AllocationInline(admin.TabularInline):
    """Base tabular inline for allocations, newest start first, never deletable."""

    formset = AllocationInlineFormSet
    extra = 0
    show_change_link = True
    ordering = ('-start',)

    def resource_link(self, obj):
        """Render the allocated resource as a link to its admin change page."""
        change_url = reverse('admin:services_ipresource_change', args=[obj.resource.pk])
        return format_html('<a href="{}">{}</a>', change_url, str(obj.resource))
    resource_link.short_description = 'IP'

    def get_queryset(self, request):
        """Return the inline queryset with the resource joined in."""
        return super().get_queryset(request).select_related('resource')

    def get_formset(self, request, obj=None, **kwargs):
        """Return the formset class with the current request attached to it."""
        formset_class = super().get_formset(request, obj, **kwargs)
        # AllocationInlineFormSet reads self.request when notifying.
        formset_class.request = request
        return formset_class

    def has_delete_permission(self, request, obj=None):
        """Allocations can never be deleted from the inline."""
        return False
class NewAllocationMixin:
    """Inline mixin showing no existing row, at most one form (``max_num = 1``)."""

    verbose_name_plural = 'Nouvelle allocation'
    max_num = 1

    def get_queryset(self, request):
        """Return an empty queryset of the inline's model."""
        parent_qs = super().get_queryset(request)
        return parent_qs.model.objects.none()
class ActiveAllocationMixin:
    """Inline mixin listing only active allocations, with no extra form."""

    verbose_name_plural = 'Allocations actives'
    max_num = 0

    def get_queryset(self, request):
        """Return the parent queryset restricted by ``get_active_filter()``."""
        parent_qs = super().get_queryset(request)
        return parent_qs.filter(get_active_filter())
class InactiveAllocationMixin:
    """Inline mixin listing only allocations that are not active, with no extra form."""

    verbose_name_plural = 'Anciennes allocations'
    max_num = 0

    def get_queryset(self, request):
        """Return the parent queryset minus rows matching ``get_active_filter()``."""
        parent_qs = super().get_queryset(request)
        return parent_qs.exclude(get_active_filter())
class ServiceAllocationMixin:
    """Inline mixin holding the configuration shared by service allocation inlines."""

    model = ServiceAllocation
    fields = ('id', 'service', 'resource', 'route', 'start', 'end')
    raw_id_fields = ('resource',)
    autocomplete_fields = ('service',)

    def service_link(self, obj):
        """Render the allocation's service as a link to its admin change page."""
        change_url = reverse('admin:services_service_change', args=[obj.service.pk])
        return format_html('<a href="{}">{}</a>', change_url, str(obj.service))
    service_link.short_description = 'Service'

    def get_queryset(self, request):
        """Return the parent queryset with the route joined in."""
        return super().get_queryset(request).select_related('route')
- #class AntennaAllocationMixin:
- # model = AntennaAllocation
- # fields = ('id', 'antenna', 'resource', 'start', 'end')
- # raw_id_fields = ('resource',)
- # autocomplete_fields = ('antenna',)
- #
- # def get_queryset(self, request):
- # qs = super().get_queryset(request)
- # qs = qs.select_related('antenna')
- # return qs
class NewServiceAllocationInline(ServiceAllocationMixin, NewAllocationMixin, AllocationInline):
    """Inline used to create a service allocation; start and end are not shown."""

    fields = (
        'id',
        'service',
        'resource',
        'route',
    )
class ActiveServiceAllocationInline(ServiceAllocationMixin, ActiveAllocationMixin, AllocationInline):
    """Inline listing active service allocations, each with a link to stop it."""

    fields = ('id', 'service_link', 'resource_link', 'route', 'start', 'stop',)
    readonly_fields = ('service_link', 'start', 'resource_link', 'stop',)

    def stop(self, obj):
        """Render a link to the 'stop-allocation' admin view of the resource."""
        stop_url = reverse('admin:stop-allocation', kwargs={'resource': obj.resource.ip})
        return format_html('<a href="{}" class="deletelink">Terminer</a>', stop_url)
    stop.short_description = 'Terminer l’allocation'
class InactiveServiceAllocationInline(ServiceAllocationMixin, InactiveAllocationMixin, AllocationInline):
    """Inline listing past service allocations; every field except ``id`` is read-only."""

    fields = (
        'id',
        'service_link',
        'resource_link',
        'route',
        'start',
        'end',
    )
    readonly_fields = (
        'service_link',
        'resource_link',
        'route',
        'start',
        'end',
    )
- #class ActiveAntennaAllocationInline(AntennaAllocationMixin, ActiveAllocationMixin, AllocationInline):
- # pass
- #class InactiveAntennaAllocationInline(AntennaAllocationMixin, InactiveAllocationMixin, AllocationInline):
- # pass
class IPResourceStateInline(admin.TabularInline):
    """Read-only inline showing the dates of a resource's state changes."""

    model = IPResourceState
    verbose_name_plural = 'Historique des derniers changements d’état'
    fields = ['date']
    readonly_fields = ['date']
    ordering = ['-date']

    def has_add_permission(self, request, obj=None):
        """States can never be added from the inline.

        ``obj`` is accepted (and ignored) because Django passes the parent
        object to inline ``has_add_permission``; the default keeps callers
        that pass only ``request`` working.
        """
        return False

    def has_delete_permission(self, request, obj=None):
        """States can never be deleted from the inline."""
        return False
class PortInline(admin.TabularInline):
    """Base inline for switch ports: no extra form, no addition, no deletion."""

    model = Port
    max_num = 0

    def has_add_permission(self, request, obj=None):
        """Ports can never be added from the inline.

        ``obj`` is accepted (and ignored) because Django passes the parent
        object to inline ``has_add_permission``; the default keeps callers
        that pass only ``request`` working.
        """
        return False

    def has_delete_permission(self, request, obj=None):
        """Ports can never be deleted from the inline."""
        return False
class SwitchPortInline(PortInline):
    """Port inline shown on a switch; port number and link state are read-only."""

    fields = ('port', 'up', 'reserved', 'service', 'notes',)
    readonly_fields = ('port', 'up',)
    autocomplete_fields = ('service',)

    def get_queryset(self, request):
        """Return the parent queryset with switch, service and service type joined in."""
        related = ('switch', 'service', 'service__service_type')
        return super().get_queryset(request).select_related(*related)
class ServicePortInline(PortInline):
    """Port inline shown on a service; only the notes remain editable."""

    fields = (
        'switch',
        'port',
        'up',
        'notes',
    )
    readonly_fields = (
        'switch',
        'port',
        'up',
    )
- ### Forms
class ServiceForm(ModelForm):
    """Service form refusing to move an inactive service to another adhesion."""

    def clean_adhesion(self):
        """Validate and return the submitted adhesion.

        Raises:
            ValidationError: the instance already has an adhesion, the
                submitted one differs from it, and the service is not active.
        """
        submitted = self.cleaned_data['adhesion']
        # hasattr() is false when the instance has no adhesion yet.
        has_current = hasattr(self.instance, 'adhesion')
        if has_current and self.instance.adhesion.pk != submitted.pk:
            if not self.instance.is_active():
                raise ValidationError('Il n’est pas possible de ré-affecter à un autre adhérent un service inactif (i.e. sans allocations actives).')
        return submitted
- ### ModelAdmin
class ServiceAdmin(admin.ModelAdmin):
    """Admin for services, including transfer of a service to another adhesion."""

    list_display = ('id', 'get_adhesion_link', 'get_adherent_link', 'service_type', 'label', 'is_active',)
    list_select_related = ('adhesion', 'adhesion__user', 'adhesion__user__profile', 'adhesion__corporation', 'service_type',)
    list_filter = (
        ActiveServiceFilter,
        'loan_equipment',
        ('service_type', admin.RelatedOnlyFieldListFilter),
    )
    search_fields = ('=id', 'service_type__name', 'label', 'notes',)
    fields = ('adhesion', 'service_type', 'label', 'notes', 'loan_equipment', 'get_contribution_link', 'is_active',)
    readonly_fields = ('get_contribution_link', 'is_active',)
    raw_id_fields = ('adhesion',)
    form = ServiceForm

    def save_model(self, request, srv, form, change):
        """Save the service; an adhesion change is handled as a transfer.

        When an existing service gets a different adhesion, the existing row
        keeps its previous adhesion and receives a "transferred" label, a new
        service is created for the new adhesion, and every active allocation
        is ended and re-created on the new service. All of this runs in one
        transaction.
        """
        is_transfer = srv.pk and 'adhesion' in form.changed_data
        if not is_transfer:
            super().save_model(request, srv, form, change)
            return
        with transaction.atomic():
            stored = Service.objects.get(pk=srv.pk)
            new_adhesion = srv.adhesion
            original_label = srv.label
            # Put the previous adhesion back on the existing row and mark it.
            srv.adhesion = stored.adhesion
            srv.label = '%s (transféré à ADT%d le %s)' % (original_label, new_adhesion.pk, timezone.now().strftime('%d/%m/%Y'))
            srv.save()
            new_srv = Service.objects.create(
                adhesion=new_adhesion,
                service_type=srv.service_type,
                label=original_label,
                notes=srv.notes,
                loan_equipment=srv.loan_equipment,
            )
            for allocation in srv.active_allocations:
                allocation.end = timezone.now()
                allocation.save()
                ServiceAllocation.objects.create(
                    resource=allocation.resource,
                    service=new_srv,
                    route=allocation.route,
                )

    def get_queryset(self, request):
        """Return the parent queryset with allocations prefetched."""
        return super().get_queryset(request).prefetch_related('allocations',)

    def get_adhesion_link(self, service):
        """Delegate to the adhesion's own ``get_adhesion_link``."""
        return service.adhesion.get_adhesion_link()
    get_adhesion_link.short_description = Adhesion.get_adhesion_link.short_description

    def get_adherent_link(self, service):
        """Delegate to the adhesion's own ``get_adherent_link``."""
        return service.adhesion.get_adherent_link()
    get_adherent_link.short_description = Adhesion.get_adherent_link.short_description

    def get_contribution_link(self, obj):
        """Render the current payment as a link to the contribution page."""
        contribution = obj.contribution
        return format_html(
            '<a href="{}">{}</a>',
            contribution.get_absolute_url(),
            contribution.get_current_payment_display(),
        )
    get_contribution_link.short_description = 'Contribution financière'

    def get_inline_instances(self, request, obj=None):
        """Build the inlines that are relevant for ``obj``.

        The new-allocation inline is always present; the port, active and
        inactive inlines appear only when matching rows exist.
        """
        inline_classes = []
        if obj and obj.ports.exists():
            inline_classes.append(ServicePortInline)
        inline_classes.append(NewServiceAllocationInline)
        if obj and obj.active_allocations.exists():
            inline_classes.append(ActiveServiceAllocationInline)
        if obj and obj.inactive_allocations.exists():
            inline_classes.append(InactiveServiceAllocationInline)
        return [inline_class(self.model, self.admin_site) for inline_class in inline_classes]

    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def has_delete_permission(self, request, obj=None):
        """Allow deletion only for a service untouched for more than a year.

        Deletion is refused without an object, when the first validated
        payment update is missing, is not a STOP, or started less than a year
        ago, and when any allocation is open or ended less than a year ago.
        """
        if not obj:
            return False
        one_year_ago = timezone.now() - timedelta(days=365)
        update = obj.contribution.updates.filter(validated=True).first()
        # A payment was active less than a year ago.
        if not update:
            return False
        if update.payment_method != PaymentUpdate.STOP:
            return False
        if update.start > one_year_ago:
            return False
        # An allocation was active less than a year ago.
        return not any(
            allocation.end is None or allocation.end > one_year_ago
            for allocation in obj.allocations.all()
        )
class IPPrefixAdmin(admin.ModelAdmin):
    """Admin for IP prefixes: they can be created but not modified afterwards."""

    def has_delete_permission(self, request, obj=None):
        """Allow deleting a prefix only when no tunnel is assigned to it.

        The previous expression returned ``obj and obj.tunnel_set.exists()``,
        which granted deletion exactly when a tunnel used the prefix, the
        opposite of the documented rule. Without an object the answer is
        ``False`` (the old code returned a falsy ``None``).
        """
        if obj is None:
            return False
        # Deleting a prefix is forbidden while it is assigned to a tunnel.
        return not obj.tunnel_set.exists()

    def has_change_permission(self, request, obj=None):
        """Refuse changes to an existing prefix; allow the check without object."""
        return not obj

    # To prevent bypassing the has_delete_permission check, the bulk delete
    # action is disabled.
    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        if 'delete_selected' in actions:
            del actions['delete_selected']
        return actions
class IPResourceAdmin(admin.ModelAdmin):
    """Admin for IP resources.

    Every displayed field is read-only, resources can be neither added nor
    deleted, and a custom view allows ending the active allocation of a
    resource.
    """

    list_display = ('__str__', 'available_display', 'route', 'last_use', 'ping',)
    list_filter = (
        'category',
        ResourceInUseFilter,
        ResourcePingFilter,
        'reserved',
        ('prefixes', admin.RelatedOnlyFieldListFilter),
        RouteFilter,
    )
    search_fields = ('=ip', 'notes',)
    actions = ['contact_ip_owners']
    ordering = ['ip']
    inlines = [IPResourceStateInline]

    def get_fields(self, request, obj=None):
        """Return the displayed fields, which are exactly the read-only ones."""
        return self.get_readonly_fields(request, obj)

    def get_readonly_fields(self, request, obj=None):
        """Return the fields to show, chosen from the state of ``obj``."""
        fields = ['ip', 'ip6']
        if obj:
            if obj.reserved:
                fields += ['reserved']
            if not obj.in_use:
                fields += ['last_use']
            fields += ['last_state']
            if obj.last_state.state != IPResourceState.STATE_UP:
                fields += ['last_time_up']
            if obj.category == IPResource.CATEGORY_PUBLIC:
                fields += ['password']
            if obj.checkmk_label:
                fields += ['checkmk']
            if obj.notes:
                fields += ['notes']
        return fields

    def get_inline_instances(self, request, obj=None):
        """Return allocation inlines for public resources, then the static inlines.

        A public resource gets either the active-allocation inline or the
        new-allocation inline, plus the inactive one when past allocations
        exist.
        """
        super_inlines = super().get_inline_instances(request, obj)
        inlines = []
        if obj and obj.category == IPResource.CATEGORY_PUBLIC:
            if obj.allocations.filter(get_active_filter()).exists():
                inlines += [ActiveServiceAllocationInline]
            else:
                inlines += [NewServiceAllocationInline]
            if obj.allocations.exclude(get_active_filter()).exists():
                inlines += [InactiveServiceAllocationInline]
        return [inline(self.model, self.admin_site) for inline in inlines] + super_inlines

    def get_queryset(self, request):
        """Annotate the queryset with ``last_use``, ``downtime`` and ``route``.

        These annotations back the list columns of the same purpose and make
        them sortable.
        """
        qs = super().get_queryset(request)
        now = timezone.now()
        # last_use: "now" while in use, otherwise the latest allocation end of
        # the allocation kind selected by the resource category.
        qs = qs.annotate(
            last_use=models.Case(
                models.When(in_use=True, then=now),
                models.When(category=0, then=models.Max('service_allocation__end')),
                models.When(category=1, then=models.Max('antenna_allocation__end')),
                default=None,
            ))
        # downtime: (last state date - now) when UP, (now - last time up) when
        # DOWN, NULL otherwise.
        qs = qs.annotate(
            downtime=models.Case(
                models.When(last_state__state=IPResourceState.STATE_UP, then=models.F('last_state__date')-models.Value(now)),
                models.When(last_state__state=IPResourceState.STATE_DOWN, then=models.Value(now)-models.F('last_time_up')),
                default=None,
                output_field=models.DurationField(),
            ))
        # route: name of the route of one active service allocation, when the
        # resource is in use by a service.
        qs = qs.annotate(
            route=models.Case(
                models.When(
                    in_use_by_service=True,
                    then=models.Subquery(
                        ServiceAllocation.objects.filter(
                            Q(resource=models.OuterRef('pk')) & get_active_filter()
                        ).values('route__name')[:1]
                    ),
                ),
                output_field=models.CharField(),
            ))
        return qs

    def available_display(self, obj):
        """Return True when the resource is neither reserved nor in use."""
        return not obj.reserved and not obj.in_use
    available_display.short_description = 'Disponible'
    available_display.boolean = True

    def last_use(self, obj):
        """Return the annotated last use as a relative time, or '-' when unset."""
        if obj.last_use:
            return naturaltime(obj.last_use)
        else:
            return '-'
    last_use.short_description = 'Dernière utilisation'
    last_use.admin_order_field = 'last_use'

    def ping(self, obj):
        """Return the ping status label, linked to CheckMK when a URL exists."""
        if obj.last_state.state == IPResourceState.STATE_UP:
            label = 'UP'
        else:
            if obj.last_time_up:
                label = 'dernier ping : ' + naturaltime(obj.last_time_up)
            else:
                label = 'DOWN'
        if obj.checkmk_url:
            return format_html('<a href="{}">{}</a>', obj.checkmk_url, label)
        else:
            return label
    ping.short_description = 'ping'
    #ping.admin_order_field = 'last_state__date'
    ping.admin_order_field = 'downtime'

    def route(self, obj):
        """Return the annotated route name."""
        return obj.route
    route.short_description = 'route'
    route.admin_order_field = 'route'

    def ip6(self, obj):
        """Return the IPv6 value derived from the resource's IPv4 address."""
        return ipv4_to_ipv6(IPv4Address(obj.ip))
    ip6.short_description = 'Préfixe IPv6'
    ip6.admin_order_field = 'ip'

    def checkmk(self, obj):
        """Render a link to the resource's CheckMK URL."""
        return format_html('<a href="{}">{}</a>', obj.checkmk_url, 'voir')
    checkmk.short_description = 'CheckMK'

    def contact_ip_owners(self, request, queryset):
        """Admin action redirecting to the contact form for the selected IPs' owners.

        The selection is read from the POSTed action checkboxes; the
        ``queryset`` argument is not used.
        """
        selected = request.POST.getlist(admin.ACTION_CHECKBOX_NAME)
        services = ServiceAllocation.objects.filter(resource__ip__in=selected) \
            .filter(get_active_filter()) \
            .values_list('service__adhesion', flat=True)
        antennas = AntennaAllocation.objects.filter(resource__ip__in=selected) \
            .filter(get_active_filter()) \
            .values_list('antenna__contact', flat=True)
        # Merge both sources and drop duplicates.
        pk = ",".join(map(str, set(services) | set(antennas)))
        return HttpResponseRedirect(reverse('admin:contact-adherents') + "?pk=%s" % pk)
    contact_ip_owners.short_description = 'Contacter les adhérents'

    def stop_allocation(self, request, resource):
        """Confirmation view ending the active allocation of a resource.

        Raises:
            Http404: no resource matches the ``resource`` URL argument.
        """
        from django.http import Http404

        resource = self.get_object(request, resource)
        if resource is None:
            # Previously this crashed with AttributeError on None.
            raise Http404('Ressource IP introuvable.')
        allocation = resource.allocations.filter(get_active_filter()).first()
        if not allocation:  # the IP is not allocated
            return HttpResponseRedirect(reverse('admin:services_ipresource_change', args=[resource.pk]))
        form = StopAllocationForm(request.POST or None)
        if request.method == 'POST' and form.is_valid():
            self.message_user(request, 'Allocation stoppée.')
            allocation.end = timezone.now()
            allocation.save()
            notify_allocation(request, allocation)
            # A redirect parameter should be added to the URL so the user can
            # be sent back to either the IP or the service.
            return HttpResponseRedirect(reverse('admin:services_ipresource_change', args=[resource.pk]))
        context = self.admin_site.each_context(request)
        context.update({
            'opts': self.model._meta,
            'title': 'Stopper une allocation',
            'object': resource,
            'media': self.media,
            'form': form,
        })
        return TemplateResponse(request, "admin/services/ipresource/stop_allocation.html", context)

    def get_urls(self):
        """Prepend the stop-allocation view to the default admin URLs."""
        my_urls = [
            path('<resource>/stop/', self.admin_site.admin_view(self.stop_allocation), name='stop-allocation'),
        ]
        return my_urls + super().get_urls()

    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        if 'delete_selected' in actions:
            del actions['delete_selected']
        return actions

    def has_add_permission(self, request, obj=None):
        """Resources can never be added from the admin."""
        return False

    def has_delete_permission(self, request, obj=None):
        """Resources can never be deleted from the admin."""
        return False
class IPResourceStateAdmin(admin.ModelAdmin):
    """Fully read-only admin for resource states: no add, change or delete."""

    list_display = ('ip', 'date', 'state',)

    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def has_add_permission(self, request, obj=None):
        """States can never be added."""
        return False

    def has_change_permission(self, request, obj=None):
        """States can never be changed."""
        return False

    def has_delete_permission(self, request, obj=None):
        """States can never be deleted."""
        return False
class RouteAdmin(admin.ModelAdmin):
    """Admin for routes, listing the contacts and IPs attached to each route."""

    list_display = ('name',)
    search_fields = ('name',)

    def get_fieldsets(self, request, obj=None):
        """Return the name alone on creation, plus collapsed details on change."""
        fieldsets = [
            (None, {'fields': ['name']}),
        ]
        if obj:
            fieldsets += [
                ('Adhérent·e·s', {'fields': ['get_adh'], 'classes': ['collapse']}),
                ('E-mails', {'fields': ['get_email'], 'classes': ['collapse']}),
                ('SMS', {'fields': ['get_sms'], 'classes': ['collapse']}),
                ('IP', {'fields': ['get_ip'], 'classes': ['collapse']}),
            ]
        return tuple(fieldsets)

    def get_readonly_fields(self, request, obj=None):
        """Return the computed detail fields for an existing route, none otherwise."""
        return ('get_email', 'get_sms', 'get_ip', 'get_adh',) if obj else ()

    def get_email(self, route):
        """Return the route's e-mail addresses, one per line."""
        return '\n'.join(route.get_email())
    get_email.short_description = 'E-mails'

    def get_sms(self, route):
        """Return the route's phone numbers starting with 06, 07, +336 or +337."""
        sms_prefixes = ('06', '07', '+336', '+337')
        return '\n'.join(tel for tel in route.get_tel() if tel.startswith(sms_prefixes))
    get_sms.short_description = 'SMS'

    def get_ip(self, route):
        """Return the route's IPs, one per line."""
        return '\n'.join(route.get_ip())
    get_ip.short_description = 'IP'

    def get_adh(self, route):
        """Return one line per adhesion of the route, followed by its adherent."""
        return '\n'.join('%s %s' % (adh, adh.adherent) for adh in route.get_adh())
    get_adh.short_description = 'Adhérent·e·s'

    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def has_delete_permission(self, request, obj=None):
        """Allow deleting only an existing route that has no allocation."""
        if not obj:
            return False
        return not obj.allocations.exists()
class TunnelAdmin(admin.ModelAdmin):
    """Admin for tunnels, with an active/ended status column and filter."""

    list_display = ('name', 'description', 'created', 'active')
    list_filter = (ActiveTunnelFilter,)

    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def has_delete_permission(self, request, obj=None):
        """Allow deleting only an existing tunnel that has no allocation."""
        if not obj:
            return False
        return not obj.allocations.exists()

    def active(self, obj):
        """Return True while the tunnel has no ``ended`` value."""
        return not obj.ended
    active.short_description = 'Actif'
    active.boolean = True
class ServiceTypeAdmin(admin.ModelAdmin):
    """Admin for service types; they can never be deleted."""

    fields = ('name', 'contact')

    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def has_delete_permission(self, request, obj=None):
        """Service types can never be deleted."""
        return False
class ActiveAntennaLayer(GeoJSONLayerView):
    """GeoJSON layer restricted to antennas matching the active-allocation filter."""

    def get_queryset(self):
        """Return antennas selected by ``get_active_filter('allocation')``."""
        active = get_active_filter('allocation')
        return Antenna.objects.filter(active)
class AntennaAdmin(admin.ModelAdmin):
    """Admin for antennas, with an IP column and a map view of active antennas."""

    #inlines = (ActiveAntennaAllocationInline, InactiveAntennaAllocationInline,)
    list_filter = (
        AntennaPrefixFilter,
        AntennaPositionFilter,
        'mode',
        'ssid',
    )
    list_display_links = ('id', 'label')
    search_fields = ('=id', 'label', 'notes', 'ssid')
    raw_id_fields = ('contact',)
    form = AntennaForm

    def get_queryset(self, request):
        """On PostgreSQL, annotate each antenna with its active IPs as ``ip``."""
        qs = super().get_queryset(request)
        if connection.vendor == 'postgresql':
            qs = qs.annotate(
                ip=StringAgg(  # comma-joined IPs, concatenated by PostgreSQL itself
                    Cast(  # cast to text: StringAgg works on strings but the IPs are inet
                        models.Case(  # only the IPs of active allocations
                            models.When(
                                get_active_filter('allocation'),
                                then='allocation__resource__ip'
                            ),
                        ),
                        models.TextField()
                    ),
                    delimiter=', '
                )
            )
        return qs

    def get_list_display(self, request):
        """Return the list columns, binding the request into ``ssid_display``."""
        # ssid_display needs request to access query string and preserve filters
        ssid_display = partial(self.ssid_display, request=request)
        update_wrapper(ssid_display, self.ssid_display)
        return ('id', 'label', 'mode', ssid_display, 'position_display', 'ip_display')

    def position_display(self, obj):
        """Return True when the antenna has a position."""
        return obj.position is not None
    position_display.short_description = 'Géolocalisé'
    position_display.boolean = True

    def ip_display(self, obj):
        """Return the antenna's active IPs as a comma-separated string."""
        if connection.vendor == 'postgresql':
            # Provided by the annotation added in get_queryset().
            return obj.ip
        else:
            # Inefficient: issues one query per row.
            allocations = obj.allocations.filter(active=True)
            return ', '.join(allocations.values_list('resource__ip', flat=True)) or '-'
    ip_display.short_description = 'IP'

    def ssid_display(self, obj, request):
        """Render the SSID as a link filtering the changelist on that SSID.

        The current query string is kept so other filters stay in place.
        Returns ``None`` when the antenna has no SSID.
        """
        if obj.ssid:
            qs = request.GET.copy()
            qs.update({'ssid': obj.ssid})
            ssid_url = reverse('admin:services_antenna_changelist') + '?' + urlencode(qs)
            return format_html('<a href="{}">{}</a>', ssid_url, obj.ssid)
        else:
            return None
    ssid_display.short_description = 'SSID'

    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        if 'delete_selected' in actions:
            del actions['delete_selected']
        return actions

    def has_delete_permission(self, request, obj=None):
        """Antennas can never be deleted from the admin."""
        return False

    def get_urls(self):
        """Prepend the map page and its GeoJSON data URL to the default admin URLs.

        Routes use ``path()``, like ``IPResourceAdmin.get_urls`` in this file,
        instead of the regex-based ``django.conf.urls.url``; the URLs and their
        names are unchanged.
        """
        my_urls = [
            path('map/', self.admin_site.admin_view(self.map_view, cacheable=True), name='antenna-map'),
            path('map/data.json', self.admin_site.admin_view(ActiveAntennaLayer.as_view(
                model=Antenna,
                geometry_field='position',
                properties=('label', 'mode', 'ssid', 'orientation', 'absolute_url',),
            )), name='antenna-map-data'),
        ]
        return my_urls + super().get_urls()

    def map_view(self, request):
        """Render the antenna map template with the URL of the GeoJSON data."""
        return TemplateResponse(request, 'services/antenna_map.html', dict(
            self.admin_site.each_context(request),
            opts=self.model._meta,
            json_url=reverse('admin:antenna-map-data'),
        ))

    def map_data_view(self, request):
        """Return every antenna serialized as GeoJSON.

        NOTE(review): get_urls() in this class does not route to this method,
        and geometry_field is 'point' here while the layer view configured in
        get_urls() uses 'position' -- confirm whether this is still needed.
        """
        geojson = serialize('geojson', Antenna.objects.all(), geometry_field='point', fields=('position',))
        return HttpResponse(geojson, content_type='application/json')
class SwitchAdmin(admin.ModelAdmin):
    """Admin for switches, showing per-switch counts of up/down/unknown ports."""

    list_display = ('name', 'ports_count', 'active_ports_count', 'inactive_ports_count', 'unknown_ports_count',)
    fields = ('name', 'first_port', 'last_port', 'notes',)
    search_fields = ('name', 'notes', 'ports__notes', 'ports__service__label',)

    def get_queryset(self, request):
        """Annotate each switch with the number of up, unknown and down ports."""

        def count_ports(**condition):
            # Count only the ports matching ``condition``.
            return models.Count(models.Case(models.When(then=models.Value('1'), **condition)))

        return super().get_queryset(request).annotate(
            active_ports_count=count_ports(ports__up=True),
            unknown_ports_count=count_ports(ports__up__isnull=True),
            inactive_ports_count=count_ports(ports__up=False),
        )

    def ports_count(self, switch):
        """Return the size of the inclusive range first_port..last_port."""
        return 1 + switch.last_port - switch.first_port
    ports_count.short_description = 'Nombre de ports'

    def active_ports_count(self, switch):
        """Return the annotated number of ports that are up."""
        return switch.active_ports_count
    active_ports_count.short_description = 'up'
    active_ports_count.admin_order_field = 'active_ports_count'

    def inactive_ports_count(self, switch):
        """Return the annotated number of ports that are down."""
        return switch.inactive_ports_count
    inactive_ports_count.short_description = 'down'
    inactive_ports_count.admin_order_field = 'inactive_ports_count'

    def unknown_ports_count(self, switch):
        """Return the annotated number of ports whose state is unknown."""
        return switch.unknown_ports_count
    unknown_ports_count.short_description = 'inconnus'
    unknown_ports_count.admin_order_field = 'unknown_ports_count'

    def get_inline_instances(self, request, obj=None):
        """Return the port inline for an existing switch, nothing on creation."""
        if not obj:
            return []
        return [SwitchPortInline(self.model, self.admin_site)]

    def get_readonly_fields(self, request, obj=None):
        """Make the port range read-only once the switch exists."""
        return ('first_port', 'last_port',) if obj else ()

    def get_actions(self, request):
        """Return the available actions without the bulk delete action."""
        actions = super().get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def has_delete_permission(self, request, obj=None):
        """Switches can never be deleted from the admin."""
        return False
# Register each model with its admin class on the default admin site.
admin.site.register(ServiceType, admin_class=ServiceTypeAdmin)
admin.site.register(Service, admin_class=ServiceAdmin)
admin.site.register(IPPrefix, admin_class=IPPrefixAdmin)
admin.site.register(IPResource, admin_class=IPResourceAdmin)
admin.site.register(Route, admin_class=RouteAdmin)
admin.site.register(Tunnel, admin_class=TunnelAdmin)
admin.site.register(Antenna, admin_class=AntennaAdmin)
admin.site.register(Switch, admin_class=SwitchAdmin)
|