from django.contrib import admin
from django.db import models
from django.db.models import Q
from django.forms import ModelForm, BaseInlineFormSet
from django.utils import timezone
from django.urls import reverse, path
from django.utils.html import format_html
from django.conf.urls import url
from django.template.response import TemplateResponse
from django.core.serializers import serialize
from django.http import HttpResponse
from django.db.models.functions import Cast
from django.contrib.postgres.aggregates import StringAgg
from django.db import connection, transaction
from django.core.cache import cache
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponseRedirect
from django.core.exceptions import ValidationError
from djgeojson.views import GeoJSONLayerView
from urllib.parse import urlencode
from functools import partial, update_wrapper
from datetime import timedelta
from ipaddress import IPv4Address
from djadhere.utils import get_active_filter
from adhesions.models import Adhesion
from banking.models import PaymentUpdate
from .models import Service, ServiceType, IPPrefix, IPResource, IPResourceState, \
ServiceAllocation, Antenna, AntennaAllocation, Allocation, \
Route, Tunnel, Switch, Port
from .utils.notifications import notify_allocation
from .utils.ip_conversion import ipv4_to_ipv6
from .forms import AntennaForm, StopAllocationForm
### Filters
class ResourceInUseFilter(admin.SimpleListFilter):
title = 'disponibilité'
parameter_name = 'available'
def lookups(self, request, model_admin):
return (
(1, 'Disponible'),
(0, 'Non disponible'),
)
def queryset(self, request, queryset):
available_filter = Q(reserved=False, in_use=False)
if self.value() == '0': # non disponible
return queryset.exclude(available_filter)
if self.value() == '1': # disponible
return queryset.filter(available_filter)
class ResourcePingFilter(admin.SimpleListFilter):
title = 'ping'
parameter_name = 'ping'
def lookups(self, request, model_admin):
return (
('up', 'UP'),
('down', 'DOWN'),
('down-since', 'DOWN depuis…'),
('never-up', 'Jamais vu UP'),
)
def queryset(self, request, queryset):
if self.value() == 'up':
return queryset.filter(last_state__state=IPResourceState.STATE_UP)
if self.value() == 'down':
return queryset.exclude(last_state__state=IPResourceState.STATE_UP) # DOWN + UNKNOWN
if self.value() == 'down-since':
queryset = queryset.exclude(last_state__state=IPResourceState.STATE_UP)
return queryset.filter(last_time_up__isnull=False)
if self.value() == 'never-up':
queryset = queryset.exclude(last_state__state=IPResourceState.STATE_UP) # DOWN + UNKWON
return queryset.filter(last_time_up__isnull=True)
class ActiveServiceFilter(admin.SimpleListFilter):
title = 'actif'
parameter_name = 'active'
def lookups(self, request, model_admin):
return (
(1, 'Actif'),
(0, 'Inactif'),
)
def queryset(self, request, queryset):
if self.value() == '0': # inactif
return queryset.exclude(get_active_filter('allocation'))
if self.value() == '1': # actif
return queryset.filter(get_active_filter('allocation'))
class RouteFilter(admin.SimpleListFilter):
title = 'route'
parameter_name = 'route'
def lookups(self, request, model_admin):
return ServiceAllocation.objects.filter(active=True).values_list('route__pk', 'route__name').distinct()
def queryset(self, request, queryset):
try:
route = int(self.value())
except (TypeError, ValueError):
pass
else:
allocations = ServiceAllocation.objects.filter(active=True, route__pk=route).values_list('pk', flat=True)
queryset = queryset.filter(service_allocation__in=allocations)
return queryset
class AntennaPrefixFilter(admin.SimpleListFilter):
title = 'préfix'
parameter_name = 'prefix'
def lookups(self, request, model_admin):
resources = AntennaAllocation.objects.filter(active=True).values_list('resource__pk', flat=True)
prefixes = IPPrefix.objects.filter(ipresource__in=resources).values_list('pk', 'prefix').distinct()
return prefixes
def queryset(self, request, queryset):
try:
prefix = int(self.value())
except (TypeError, ValueError):
pass
else:
allocations = AntennaAllocation.objects.filter(active=True, resource__prefixes__pk=prefix).values_list('pk', flat=True)
queryset = queryset.filter(allocation__in=allocations)
return queryset
class AntennaPositionFilter(admin.SimpleListFilter):
title = 'géolocalisation'
parameter_name = 'position'
def lookups(self, request, model_admin):
return (
('1', 'Connue'),
('0', 'Inconnue'),
)
def queryset(self, request, queryset):
query = Q(position__isnull=True)
if self.value() == '0':
return queryset.filter(query)
if self.value() == '1':
return queryset.exclude(query)
return queryset
class ActiveTunnelFilter(admin.SimpleListFilter):
title = 'status'
parameter_name = 'active'
def lookups(self, request, model_admin):
return (
('1', 'Actif'),
('0', 'Désactivé'),
)
def queryset(self, request, queryset):
query = Q(ended__isnull=True)
if self.value() == '0':
return queryset.exclude(query)
if self.value() == '1':
return queryset.filter(query)
return queryset
### Inlines
class AllocationInlineFormSet(BaseInlineFormSet):
def save_new(self, form, commit=True):
obj = super().save_new(form, commit)
if type(obj) == ServiceAllocation:
notify_allocation(self.request, obj)
return obj
def save_existing(self, form, instance, commit=True):
old = type(instance).objects.get(pk=instance.pk)
if type(instance) == ServiceAllocation:
notify_allocation(self.request, instance, old)
return super().save_existing(form, instance, commit)
class AllocationInline(admin.TabularInline):
formset = AllocationInlineFormSet
extra = 0
show_change_link = True
ordering = ('-start',)
def resource_link(self, obj):
url = reverse('admin:services_ipresource_change', args=[obj.resource.pk])
return format_html('{}', url, str(obj.resource))
resource_link.short_description = 'IP'
def get_queryset(self, request):
qs = super().get_queryset(request)
qs = qs.select_related('resource')
return qs
def get_formset(self, request, obj=None, **kwargs):
formset = super().get_formset(request, obj, **kwargs)
formset.request = request
return formset
def has_delete_permission(self, request, obj=None):
return False
class NewAllocationMixin:
verbose_name_plural = 'Nouvelle allocation'
max_num = 1
def get_queryset(self, request):
return super().get_queryset(request).model.objects.none()
class ActiveAllocationMixin:
verbose_name_plural = 'Allocations actives'
max_num = 0
def get_queryset(self, request):
return super().get_queryset(request).filter(get_active_filter())
class InactiveAllocationMixin:
verbose_name_plural = 'Anciennes allocations'
max_num = 0
def get_queryset(self, request):
return super().get_queryset(request).exclude(get_active_filter())
class ServiceAllocationMixin:
model = ServiceAllocation
fields = ('id', 'service', 'resource', 'route', 'start', 'end')
raw_id_fields = ('resource',)
autocomplete_fields = ('service',)
def service_link(self, obj):
url = reverse('admin:services_service_change', args=[obj.service.pk])
return format_html('{}', url, str(obj.service))
service_link.short_description = 'Service'
def get_queryset(self, request):
qs = super().get_queryset(request)
qs = qs.select_related('route')
return qs
#class AntennaAllocationMixin:
# model = AntennaAllocation
# fields = ('id', 'antenna', 'resource', 'start', 'end')
# raw_id_fields = ('resource',)
# autocomplete_fields = ('antenna',)
#
# def get_queryset(self, request):
# qs = super().get_queryset(request)
# qs = qs.select_related('antenna')
# return qs
class NewServiceAllocationInline(ServiceAllocationMixin, NewAllocationMixin, AllocationInline):
fields = ('id', 'service', 'resource', 'route',)
class ActiveServiceAllocationInline(ServiceAllocationMixin, ActiveAllocationMixin, AllocationInline):
fields = ('id', 'service_link', 'resource_link', 'route', 'start', 'stop',)
readonly_fields = ('service_link', 'start', 'resource_link', 'stop',)
def stop(self, obj):
return format_html('Terminer', reverse('admin:stop-allocation', kwargs={'resource': obj.resource.ip}))
stop.short_description = 'Terminer l’allocation'
class InactiveServiceAllocationInline(ServiceAllocationMixin, InactiveAllocationMixin, AllocationInline):
fields = ('id', 'service_link', 'resource_link', 'route', 'start', 'end')
readonly_fields = ('service_link', 'resource_link', 'route', 'start', 'end')
#class ActiveAntennaAllocationInline(AntennaAllocationMixin, ActiveAllocationMixin, AllocationInline):
# pass
#class InactiveAntennaAllocationInline(AntennaAllocationMixin, InactiveAllocationMixin, AllocationInline):
# pass
class IPResourceStateInline(admin.TabularInline):
model = IPResourceState
verbose_name_plural = 'Historique des derniers changements d’état'
fields = ['date']
readonly_fields = ['date']
ordering = ['-date']
def has_add_permission(self, request):
return False
def has_delete_permission(self, request, obj=None):
return False
class PortInline(admin.TabularInline):
model = Port
max_num = 0
def has_add_permission(self, request):
return False
def has_delete_permission(self, request, obj=None):
return False
class SwitchPortInline(PortInline):
fields = ('port', 'up', 'reserved', 'service', 'notes',)
readonly_fields = ('port', 'up',)
autocomplete_fields = ('service',)
def get_queryset(self, request):
qs = super().get_queryset(request)
qs = qs.select_related('switch', 'service', 'service__service_type')
return qs
class ServicePortInline(PortInline):
fields = ('switch', 'port', 'up', 'notes',)
readonly_fields = ('switch', 'port', 'up',)
### Forms
class ServiceForm(ModelForm):
def clean_adhesion(self):
if hasattr(self.instance, 'adhesion') \
and self.instance.adhesion.pk != self.cleaned_data['adhesion'].pk \
and not self.instance.is_active():
raise ValidationError('Il n’est pas possible de ré-affecter à un autre adhérent un service inactif (i.e. sans allocations actives).')
return self.cleaned_data['adhesion']
### ModelAdmin
class ServiceAdmin(admin.ModelAdmin):
list_display = ('id', 'get_adhesion_link', 'get_adherent_link', 'service_type', 'label', 'is_active',)
list_select_related = ('adhesion', 'adhesion__user', 'adhesion__user__profile', 'adhesion__corporation', 'service_type',)
list_filter = (
ActiveServiceFilter,
'loan_equipment',
('service_type', admin.RelatedOnlyFieldListFilter),
)
search_fields = ('=id', 'service_type__name', 'label', 'notes',)
fields = ('adhesion', 'service_type', 'label', 'notes', 'loan_equipment', 'get_contribution_link', 'is_active',)
readonly_fields = ('get_contribution_link', 'is_active',)
raw_id_fields = ('adhesion',)
form = ServiceForm
def save_model(self, request, srv, form, change):
if srv.pk and 'adhesion' in form.changed_data:
with transaction.atomic():
old_srv = Service.objects.get(pk=srv.pk)
adhesion = srv.adhesion
srv.adhesion = old_srv.adhesion
label = srv.label
srv.label = '%s (transféré à ADT%d le %s)' % (srv.label, adhesion.pk, timezone.now().strftime('%d/%m/%Y'))
srv.save()
new_srv = Service.objects.create(adhesion=adhesion, service_type=srv.service_type, label=label,
notes=srv.notes, loan_equipment=srv.loan_equipment)
for allocation in srv.active_allocations:
allocation.end = timezone.now()
allocation.save()
ServiceAllocation.objects.create(resource=allocation.resource, service=new_srv, route=allocation.route)
else:
super().save_model(request, srv, form, change)
def get_queryset(self, request):
qs = super().get_queryset(request)
qs = qs.prefetch_related('allocations',)
return qs
get_adhesion_link = lambda self, service: service.adhesion.get_adhesion_link()
get_adhesion_link.short_description = Adhesion.get_adhesion_link.short_description
get_adherent_link = lambda self, service: service.adhesion.get_adherent_link()
get_adherent_link.short_description = Adhesion.get_adherent_link.short_description
def get_contribution_link(self, obj):
return format_html(u'{}', obj.contribution.get_absolute_url(), obj.contribution.get_current_payment_display())
get_contribution_link.short_description = 'Contribution financière'
def get_inline_instances(self, request, obj=None):
inlines = []
if obj and obj.ports.exists():
inlines += [ServicePortInline]
inlines += [NewServiceAllocationInline]
if obj and obj.active_allocations.exists():
inlines += [ActiveServiceAllocationInline]
if obj and obj.inactive_allocations.exists():
inlines += [InactiveServiceAllocationInline]
return [inline(self.model, self.admin_site) for inline in inlines]
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def has_delete_permission(self, request, obj=None):
if not obj:
return False
one_year_ago = timezone.now() - timedelta(days=365)
contribution = obj.contribution.updates.filter(validated=True).first()
# s’il y avait un paiement actif il y a moins d’un an
if not contribution or contribution.payment_method != PaymentUpdate.STOP or contribution.start > one_year_ago:
return False
# s’il y avait une allocation active il y a moins d’un an
if any(map(lambda a: a.end is None or a.end > one_year_ago, obj.allocations.all())):
return False
return True
class IPPrefixAdmin(admin.ModelAdmin):
def has_delete_permission(self, request, obj=None):
# Interdiction de supprimer le préfix s’il est assigné à un tunnel
return obj and obj.tunnel_set.exists()
def has_change_permission(self, request, obj=None):
if obj:
return False
else:
return True
# pour embêcher de by-passer le check has_delete_permission, on désactive l’action delete
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
class IPResourceAdmin(admin.ModelAdmin):
list_display = ('__str__', 'available_display', 'route', 'last_use', 'ping',)
list_filter = (
'category',
ResourceInUseFilter,
ResourcePingFilter,
'reserved',
('prefixes', admin.RelatedOnlyFieldListFilter),
RouteFilter,
)
search_fields = ('=ip', 'notes',)
actions = ['contact_ip_owners']
ordering = ['ip']
inlines = [ IPResourceStateInline ]
def get_fields(self, request, obj=None):
return self.get_readonly_fields(request, obj)
def get_readonly_fields(self, request, obj=None):
fields = ['ip', 'ip6']
if obj:
if obj.reserved:
fields += ['reserved']
if not obj.in_use:
fields += ['last_use']
fields += ['last_state']
if obj.last_state.state != IPResourceState.STATE_UP:
fields += ['last_time_up']
if obj.category == IPResource.CATEGORY_PUBLIC:
fields += ['password']
if obj.checkmk_label:
fields += ['checkmk']
if obj.notes:
fields += ['notes']
return fields
def get_inline_instances(self, request, obj=None):
super_inlines = super().get_inline_instances(request, obj)
inlines = []
if obj and obj.category == IPResource.CATEGORY_PUBLIC:
if obj.allocations.filter(get_active_filter()).exists():
inlines += [ActiveServiceAllocationInline]
else:
inlines += [NewServiceAllocationInline]
if obj.allocations.exclude(get_active_filter()).exists():
inlines += [InactiveServiceAllocationInline]
return [inline(self.model, self.admin_site) for inline in inlines] + super_inlines
def get_queryset(self, request):
qs = super().get_queryset(request)
now = timezone.now()
qs = qs.annotate(
last_use=models.Case(
models.When(in_use=True, then=now),
models.When(category=0, then=models.Max('service_allocation__end')),
models.When(category=1, then=models.Max('antenna_allocation__end')),
default=None,
))
qs = qs.annotate(
downtime=models.Case(
models.When(last_state__state=IPResourceState.STATE_UP, then=models.F('last_state__date')-models.Value(now)),
models.When(last_state__state=IPResourceState.STATE_DOWN, then=models.Value(now)-models.F('last_time_up')),
default=None,
output_field=models.DurationField(),
))
qs = qs.annotate(
route=models.Case(
models.When(
in_use_by_service=True,
then=models.Subquery(
ServiceAllocation.objects.filter(
Q(resource=models.OuterRef('pk')) & get_active_filter()
).values('route__name')[:1]
),
),
output_field=models.CharField(),
))
return qs
def available_display(self, obj):
return not obj.reserved and not obj.in_use
available_display.short_description = 'Disponible'
available_display.boolean = True
def last_use(self, obj):
if obj.last_use:
return naturaltime(obj.last_use)
else:
return '-'
last_use.short_description = 'Dernière utilisation'
last_use.admin_order_field = 'last_use'
def ping(self, obj):
if obj.last_state.state == IPResourceState.STATE_UP:
label = 'UP'
else:
if obj.last_time_up:
label = 'dernier ping : ' + naturaltime(obj.last_time_up)
else:
label = 'DOWN'
if obj.checkmk_url:
return format_html('{}', obj.checkmk_url, label)
else:
return label
ping.short_description = 'ping'
#ping.admin_order_field = 'last_state__date'
ping.admin_order_field = 'downtime'
def route(self, obj):
return obj.route
route.short_description = 'route'
route.admin_order_field = 'route'
def ip6(self, obj):
return ipv4_to_ipv6(IPv4Address(obj.ip))
ip6.short_description = 'Préfixe IPv6'
ip6.admin_order_field = 'ip'
def checkmk(self, obj):
return format_html('{}', obj.checkmk_url, 'voir')
checkmk.short_description = 'CheckMK'
def contact_ip_owners(self, request, queryset):
selected = request.POST.getlist(admin.ACTION_CHECKBOX_NAME)
services = ServiceAllocation.objects.filter(resource__ip__in=selected) \
.filter(get_active_filter()) \
.values_list('service__adhesion', flat=True)
antennas = AntennaAllocation.objects.filter(resource__ip__in=selected) \
.filter(get_active_filter()) \
.values_list('antenna__contact', flat=True)
pk = ",".join(map(str, set(services) | set(antennas)))
return HttpResponseRedirect(reverse('admin:contact-adherents') + "?pk=%s" % pk)
contact_ip_owners.short_description = 'Contacter les adhérents'
def stop_allocation(self, request, resource):
resource = self.get_object(request, resource)
allocation = resource.allocations.filter(get_active_filter()).first()
if not allocation: # L’IP n’est pas allouée
return HttpResponseRedirect(reverse('admin:services_ipresource_change', args=[resource.pk]))
form = StopAllocationForm(request.POST or None)
if request.method == 'POST' and form.is_valid():
self.message_user(request, 'Allocation stoppée.')
allocation.end = timezone.now()
allocation.save()
notify_allocation(request, allocation)
# Il faudrait rajouter un redirect dans l’URL pour rediriger vers l’IP ou le Service
return HttpResponseRedirect(reverse('admin:services_ipresource_change', args=[resource.pk]))
context = self.admin_site.each_context(request)
context.update({
'opts': self.model._meta,
'title': 'Stopper une allocation',
'object': resource,
'media': self.media,
'form': form,
})
return TemplateResponse(request, "admin/services/ipresource/stop_allocation.html", context)
def get_urls(self):
my_urls = [
path('/stop/', self.admin_site.admin_view(self.stop_allocation), name='stop-allocation'),
]
return my_urls + super().get_urls()
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def has_add_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
class IPResourceStateAdmin(admin.ModelAdmin):
list_display = ('ip', 'date', 'state',)
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def has_add_permission(self, request, obj=None):
return False
def has_change_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
class RouteAdmin(admin.ModelAdmin):
list_display = ('name',)
search_fields = ('name',)
def get_fieldsets(self, request, obj=None):
if obj:
return (
(None, {'fields': ['name']}),
('Adhérent·e·s', {'fields': ['get_adh'], 'classes': ['collapse']}),
('E-mails', {'fields': ['get_email'], 'classes': ['collapse']}),
('SMS', {'fields': ['get_sms'], 'classes': ['collapse']}),
('IP', {'fields': ['get_ip'], 'classes': ['collapse']}),
)
else:
return (
(None, {'fields': ['name']}),
)
def get_readonly_fields(self, request, obj=None):
if obj:
return ('get_email', 'get_sms', 'get_ip', 'get_adh',)
else:
return ()
def get_email(self, route):
return '\n'.join(route.get_email())
get_email.short_description = 'E-mails'
def get_sms(self, route):
sms_filter = lambda x: x[:2] == '06' or x[:2] == '07' or x[:3] == '+336' or x[:3] == '+337'
return '\n'.join(filter(sms_filter, route.get_tel()))
get_sms.short_description = 'SMS'
def get_ip(self, route):
return '\n'.join(route.get_ip())
get_ip.short_description = 'IP'
def get_adh(self, route):
return '\n'.join(map(lambda adh: '%s %s' % (adh, adh.adherent), route.get_adh()))
get_adh.short_description = 'Adhérent·e·s'
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def has_delete_permission(self, request, obj=None):
if obj:
if obj.allocations.exists():
return False
return True
return False
class TunnelAdmin(admin.ModelAdmin):
list_display = ('name', 'description', 'created', 'active')
list_filter = (
ActiveTunnelFilter,
)
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def has_delete_permission(self, request, obj=None):
if obj:
if obj.allocations.exists():
return False
return True
return False
def active(self, obj):
return not obj.ended
active.short_description = 'Actif'
active.boolean = True
class ServiceTypeAdmin(admin.ModelAdmin):
fields = ('name', 'contact')
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def has_delete_permission(self, request, obj=None):
return False
class ActiveAntennaLayer(GeoJSONLayerView):
def get_queryset(self):
return Antenna.objects.filter(get_active_filter('allocation'))
class AntennaAdmin(admin.ModelAdmin):
#inlines = (ActiveAntennaAllocationInline, InactiveAntennaAllocationInline,)
list_filter = (
AntennaPrefixFilter,
AntennaPositionFilter,
'mode',
'ssid',
)
list_display_links = ('id', 'label')
search_fields = ('=id', 'label', 'notes', 'ssid')
raw_id_fields = ('contact',)
form = AntennaForm
def get_queryset(self, request):
qs = super().get_queryset(request)
if connection.vendor == 'postgresql':
qs = qs.annotate(
ip=StringAgg( # concaténation des IP avec des virgules directement par postgresql
Cast( # casting en TextField car StringApp oppère sur des string mais les ip sont des inet
models.Case( # seulement les IP des allocations actives
models.When(
get_active_filter('allocation'),
then='allocation__resource__ip'
),
),
models.TextField()
),
delimiter=', '
)
)
return qs
def get_list_display(self, request):
# ssid_display needs request to access query string and preserve filters
ssid_display = partial(self.ssid_display, request=request)
update_wrapper(ssid_display, self.ssid_display)
return ('id', 'label', 'mode', ssid_display, 'position_display', 'ip_display')
def position_display(self, obj):
return obj.position is not None
position_display.short_description = 'Géolocalisé'
position_display.boolean = True
def ip_display(self, obj):
if connection.vendor == 'postgresql':
return obj.ip
else:
# peu efficace car génère une requête par ligne
allocations = obj.allocations.filter(active=True)
return ', '.join(allocations.values_list('resource__ip', flat=True)) or '-'
ip_display.short_description = 'IP'
def ssid_display(self, obj, request):
if obj.ssid:
qs = request.GET.copy()
qs.update({'ssid': obj.ssid})
ssid_url = reverse('admin:services_antenna_changelist') + '?' + urlencode(qs)
return format_html(u'{}', ssid_url, obj.ssid)
else:
return None
ssid_display.short_description = 'SSID'
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def has_delete_permission(self, request, obj=None):
return False
def get_urls(self):
my_urls = [
url(r'^map/$', self.admin_site.admin_view(self.map_view, cacheable=True), name='antenna-map'),
url(r'^map/data.json$', self.admin_site.admin_view(ActiveAntennaLayer.as_view(
model=Antenna,
geometry_field='position',
properties=('label', 'mode', 'ssid', 'orientation', 'absolute_url',),
)), name='antenna-map-data'),
]
return my_urls + super().get_urls()
def map_view(self, request):
return TemplateResponse(request, 'services/antenna_map.html', dict(
self.admin_site.each_context(request),
opts=self.model._meta,
json_url=reverse('admin:antenna-map-data'),
))
def map_data_view(self, request):
geojson = serialize('geojson', Antenna.objects.all(), geometry_field='point', fields=('position',))
return HttpResponse(geojson, content_type='application/json')
class SwitchAdmin(admin.ModelAdmin):
list_display = ('name', 'ports_count', 'active_ports_count', 'inactive_ports_count', 'unknown_ports_count',)
fields = ('name', 'first_port', 'last_port', 'notes',)
search_fields = ('name', 'notes', 'ports__notes', 'ports__service__label',)
def get_queryset(self, request):
qs = super().get_queryset(request)
qs = qs.annotate(
active_ports_count=models.Count(models.Case(models.When(ports__up=True, then=models.Value('1')))),
unknown_ports_count=models.Count(models.Case(models.When(ports__up__isnull=True, then=models.Value('1')))),
inactive_ports_count=models.Count(models.Case(models.When(ports__up=False, then=models.Value('1')))),
)
return qs
def ports_count(self, switch):
return switch.last_port - switch.first_port + 1
ports_count.short_description = 'Nombre de ports'
def active_ports_count(self, switch):
return switch.active_ports_count
active_ports_count.short_description = 'up'
active_ports_count.admin_order_field = 'active_ports_count'
def inactive_ports_count(self, switch):
return switch.inactive_ports_count
inactive_ports_count.short_description = 'down'
inactive_ports_count.admin_order_field = 'inactive_ports_count'
def unknown_ports_count(self, switch):
return switch.unknown_ports_count
unknown_ports_count.short_description = 'inconnus'
unknown_ports_count.admin_order_field = 'unknown_ports_count'
def get_inline_instances(self, request, obj=None):
if obj:
return [ SwitchPortInline(self.model, self.admin_site) ]
else:
return []
def get_readonly_fields(self, request, obj=None):
if obj:
return ('first_port', 'last_port',)
else:
return ()
def get_actions(self, request):
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def has_delete_permission(self, request, obj=None):
return False
admin.site.register(ServiceType, ServiceTypeAdmin)
admin.site.register(Service, ServiceAdmin)
admin.site.register(IPPrefix, IPPrefixAdmin)
admin.site.register(IPResource, IPResourceAdmin)
admin.site.register(Route, RouteAdmin)
admin.site.register(Tunnel, TunnelAdmin)
admin.site.register(Antenna, AntennaAdmin)
admin.site.register(Switch, SwitchAdmin)