from django.db import models from django.contrib.gis.db import models as geo_models from django.db.models import Q from django.core.validators import MaxValueValidator from django.utils import timezone from django.contrib.auth.models import Group from django.contrib.contenttypes.fields import GenericRelation from django.core.exceptions import ValidationError from django.urls import reverse from django.utils.html import format_html, mark_safe, escape from django.core.exceptions import PermissionDenied from django.core.validators import RegexValidator, MinValueValidator, MaxValueValidator from django.conf import settings from ipaddress import ip_network from urllib.parse import quote from hashlib import sha256 from base64 import urlsafe_b64encode from djadhere.utils import get_active_filter, is_overlapping from adhesions.models import Adhesion from banking.models import RecurringPayment def ipprefix_validator(value): try: ip_network(value) except ValueError: raise ValidationError('%s n’est pas un préfixe valide' % value) class IPPrefix(models.Model): prefix = models.CharField(max_length=128, verbose_name='Préfixe', validators=[ipprefix_validator], unique=True) class Meta: ordering = ['prefix'] verbose_name = 'Réseau' verbose_name_plural = 'Réseaux' def __str__(self): return self.prefix class IPResourceManager(models.Manager): def get_queryset(self): qs = super().get_queryset() # On rajoute une super annotation « in_use » pour savoir si l’IP est dispo ou non :-) qs = qs.annotate( in_use_by_service=models.Exists( ServiceAllocation.objects.filter(Q(resource=models.OuterRef('pk')) & get_active_filter()) ), in_use_by_antenna=models.Exists( AntennaAllocation.objects.filter(Q(resource=models.OuterRef('pk')) & get_active_filter()) ) ) qs = qs.annotate( in_use=models.Case( models.When(Q(in_use_by_service=True) | Q(in_use_by_antenna=True), then=True), default=False, output_field=models.BooleanField() ) ) # Ouf, pas de duplication car l’IP ne peut être utilisé que par un service / une antenne ! return qs class ActiveAllocationManager(models.Manager): def get_queryset(self): qs = super().get_queryset() qs = qs.annotate( active=models.Case( models.When(get_active_filter(), then=True), default=False, output_field=models.BooleanField() ) ) return qs class ActiveServiceManager(models.Manager): def get_queryset(self): qs = super().get_queryset() qs = qs.annotate( active=models.Exists( ServiceAllocation.objects.filter(Q(service=models.OuterRef('pk')) & get_active_filter()) ) ) return qs class IPResource(models.Model): CATEGORY_PUBLIC = 0 CATEGORY_ANTENNA = 1 CATEGORIES = ( (CATEGORY_PUBLIC, 'IP Public'), (CATEGORY_ANTENNA, 'IP Antenne'), ) ip = models.GenericIPAddressField(verbose_name='IP', primary_key=True) prefixes = models.ManyToManyField(IPPrefix, verbose_name='préfixes') reserved = models.BooleanField(default=False, verbose_name='réservée') category = models.IntegerField(choices=CATEGORIES, verbose_name='catégorie') notes = models.TextField(blank=True, default='') checkmk_label = models.CharField(max_length=128, blank=True, default='') last_state = models.ForeignKey("IPResourceState", on_delete=models.PROTECT, related_name='+', verbose_name='dernier état') last_time_up = models.DateTimeField(null=True, blank=True, verbose_name='Dernière réponse au ping') objects = IPResourceManager() @property def allocations(self): if self.category == self.CATEGORY_PUBLIC: return self.service_allocations if self.category == self.CATEGORY_ANTENNA: return self.antenna_allocations @property def checkmk_url(self): if self.checkmk_label: return mark_safe(settings.CHECK_MK_URL.format(host=quote(self.checkmk_label))) else: return None def password(self): data = sha256((settings.MASTER_PASSWORD + self.ip).encode('utf-8')).digest(); return urlsafe_b64encode(data).decode('utf-8')[:8] password.short_description = 'Mot de passe' class Meta: ordering = ['ip'] verbose_name = 'IP' verbose_name_plural = 'IP' def __str__(self): return str(self.ip) class IPResourceState(models.Model): STATE_DOWN = 0 STATE_UP = 1 STATE_UNKNOWN = 2 STATE_CHOICES = ( (STATE_DOWN, 'DOWN'), (STATE_UP, 'UP'), (STATE_UNKNOWN, 'Inconnu'), ) ip = models.ForeignKey(IPResource, on_delete=models.CASCADE, related_name='state_set') date = models.DateTimeField(default=timezone.now) state = models.IntegerField(choices=STATE_CHOICES) def __str__(self): return self.get_state_display() class ServiceType(models.Model): name = models.CharField(max_length=64, verbose_name='Nom', unique=True) contact = models.CharField(max_length=64, verbose_name='Contact en cas de problème', blank=True, default='') class Meta: ordering = ['name'] verbose_name = 'type de service' verbose_name_plural = 'types de service' def __str__(self): return self.name class Service(models.Model): """ En cas d’ajout de champs, penser à mettre à jour la méthode save_model de ServiceAdmin. """ adhesion = models.ForeignKey(Adhesion, verbose_name='Adhérent·e', related_name='services', on_delete=models.CASCADE) service_type = models.ForeignKey(ServiceType, related_name='services', verbose_name='Type de service', on_delete=models.PROTECT) label = models.CharField(blank=True, default='', max_length=128) notes = models.TextField(blank=True, default='') created = models.DateTimeField(auto_now_add=True) loan_equipment = models.BooleanField(default=False, verbose_name='Matériel en prêt') contribution = models.OneToOneField(RecurringPayment, on_delete=models.CASCADE) objects = ActiveServiceManager() def save(self, *args, **kwargs): if not hasattr(self, 'contribution'): self.contribution = RecurringPayment.objects.create() super().save(*args, **kwargs) def clean(self): super().clean() # Vérification de l’unicité par type de service du label if self.label != '' and Service.objects.exclude(pk=self.pk).filter(service_type=self.service_type, label=self.label): raise ValidationError("Un service du même type existe déjà avec ce label.") def is_active(self): return any(map(lambda allocation: allocation.is_active() , self.allocations.all())) is_active.boolean = True is_active.short_description = 'Actif' @property def active_allocations(self): return self.allocations.filter(get_active_filter()) @property def inactive_allocations(self): return self.allocations.exclude(get_active_filter()) def get_absolute_url(self): return reverse('admin:%s_%s_change' % (self._meta.app_label, self._meta.model_name), args=(self.pk,)) def __str__(self): s = '#%d %s' % (self.pk, self.service_type) if self.label: s += ' ' + self.label return s class Antenna(models.Model): MODE_UNKNOWN = 0 MODE_AP = 1 MODE_STA = 2 MODE_CHOICES = ( (MODE_UNKNOWN, 'Inconnu'), (MODE_AP, 'AP'), (MODE_STA, 'Station'), ) label = models.CharField(max_length=128, blank=True, default='') mode = models.IntegerField(choices=MODE_CHOICES, default=MODE_UNKNOWN) ssid = models.CharField(max_length=64, blank=True, default='', verbose_name='SSID') mac = models.CharField( blank=True, default='', max_length=17, validators=[ RegexValidator(r'^([0-9a-fA-F]{2}([:-]?|$)){6}$'), ], verbose_name='Adresse MAC') contact = models.ForeignKey(Adhesion, null=True, blank=True, on_delete=models.PROTECT) notes = models.TextField(blank=True) position = geo_models.PointField(null=True, blank=True) orientation = models.IntegerField(verbose_name='Orientation (°)', null=True, blank=True) def clean(self): super().clean() if self.orientation: self.orientation = self.orientation % 360 def get_absolute_url(self): return reverse('admin:%s_%s_change' % (self._meta.app_label, self._meta.model_name), args=(self.pk,)) def get_absolute_link(self): name = 'Antenne n°%d' % self.pk if self.label: name += ' : %s' % self.label link = format_html('{}', self.get_absolute_url(), name) if self.allocations.filter(active=True).exists(): link += ' (' link += ', '.join(map( lambda alloc: format_html('{}', alloc.resource, alloc.resource), self.allocations.filter(active=True).all() )) link += ')' return mark_safe(link) @property def absolute_url(self): return self.get_absolute_url() class Meta: verbose_name = 'antenne' def __str__(self): name = 'Antenne %d' % self.pk if self.label: name += ' (%s)' % self.label return name class Route(models.Model): name = models.CharField(max_length=64, unique=True) class Meta: ordering = ['name'] def get_ip(self): allocations = self.allocations.filter(get_active_filter()) return allocations.values_list('resource', flat=True) def get_adh(self): allocations = self.allocations.filter(get_active_filter()) return Adhesion.objects.filter(pk__in=allocations.values_list('service__adhesion', flat=True)) def get_tel(self): adhesions = self.get_adh() user_tel = filter(lambda x: x, adhesions.values_list('user__profile__phone_number', flat=True)) corp_tel = filter(lambda x: x, adhesions.values_list('corporation__phone_number', flat=True)) return set(user_tel) | set(corp_tel) def get_email(self): adhesions = self.get_adh() user_email = filter(lambda x: x, adhesions.values_list('user__email', flat=True)) corp_email = filter(lambda x: x, adhesions.values_list('corporation__email', flat=True)) return set(user_email) | set(corp_email) def __str__(self): return self.name class Tunnel(Route): description = models.CharField(max_length=128, blank=True) created = models.DateTimeField(default=timezone.now, verbose_name='Date de création') ended = models.DateTimeField(null=True, blank=True, verbose_name='Date de désactivation') port = models.IntegerField(null=True, blank=True) local_ip = models.GenericIPAddressField(null=True, blank=True, verbose_name='IP locale') remote_ip = models.GenericIPAddressField(null=True, blank=True, verbose_name='IP distante') networks = models.ManyToManyField(IPPrefix, blank=True, verbose_name='Réseaux') notes = models.TextField(blank=True, default='') def clean(self): super().clean() if self.ended: # Vérification de la cohérence des champs created et ended if self.created > self.ended: raise ValidationError({'ended': "La date de désactivation doit être postérieur " "à la date de création du tunnel."}) elif self.port: # Vérification de l’unicité d’un tunnel actif avec un port donné if Tunnel.objects.exclude(pk=self.pk).filter(port=self.port, ended__isnull=True).exists(): raise ValidationError({'port': "Ce numéro de port est déjà utilisé par un autre tunnel."}) class Allocation(models.Model): start = models.DateTimeField(verbose_name='Début de la période d’allocation', default=timezone.now) end = models.DateTimeField(null=True, blank=True, verbose_name='Fin de la période d’allocation') notes = models.TextField(blank=True, default='') objects = ActiveAllocationManager() def clean(self): super().clean() # Vérification de la cohérence des champs start et end if self.end and self.start > self.end: raise ValidationError("La date de début de l’allocation doit être antérieur " "à la date de fin de l’allocation.") if self.resource_id: if self.resource.reserved and (not self.end or self.end > timezone.now()): raise ValidationError("L’IP sélectionnée est réservée") # Vérification de l’abscence de chevauchement de la période d’allocation allocations = type(self).objects.filter(resource__pk=self.resource.pk) if is_overlapping(self, allocations): raise ValidationError("La période d’allocation de cette ressource chevauche " "avec une période d’allocation précédente.") def is_active(self): now = timezone.now() return self.start < now and (self.end is None or self.end > now) class Meta: abstract = True ordering = ['-start'] def __str__(self): return str(self.resource) class ServiceAllocation(Allocation): resource = models.ForeignKey(IPResource, verbose_name='Ressource', related_name='service_allocations', related_query_name='service_allocation', limit_choices_to={'category': 0}, on_delete=models.CASCADE) service = models.ForeignKey(Service, related_name='allocations', related_query_name='allocation', on_delete=models.CASCADE) route = models.ForeignKey(Route, verbose_name='Route', related_name='allocations', related_query_name='allocation', on_delete=models.PROTECT) class Meta: verbose_name = 'allocation' verbose_name_plural = 'allocations' class AntennaAllocation(Allocation): resource = models.ForeignKey(IPResource, verbose_name='Ressource', related_name='antenna_allocations', related_query_name='antenna_allocation', limit_choices_to={'category': 1}, on_delete=models.CASCADE) antenna = models.ForeignKey(Antenna, related_name='allocations', related_query_name='allocation', on_delete=models.CASCADE) class Meta: verbose_name = 'allocation' verbose_name_plural = 'allocations' class Switch(models.Model): name = models.CharField(max_length=64, verbose_name='Nom', unique=True) first_port = models.IntegerField(validators=[MinValueValidator(0)], verbose_name='Premier port') last_port = models.IntegerField(validators=[MaxValueValidator(64)], verbose_name='Dernier port') notes = models.TextField(blank=True, default='') class Meta: ordering = ('name',) def __str__(self): return self.name class Port(models.Model): switch = models.ForeignKey(Switch, related_name='ports', on_delete=models.CASCADE) service = models.ForeignKey(Service, null=True, blank=True, related_name='ports', on_delete=models.SET_NULL) port = models.IntegerField(verbose_name='N° de port') reserved = models.BooleanField(default=False, verbose_name='réservé') notes = models.CharField(max_length=256, blank=True, default='') up = models.NullBooleanField() def clean(self): if self.reserved and self.service: raise ValidationError('Un port réservé ne peut avoir de service.') class Meta: unique_together = ('switch', 'port',) ordering = ('switch', 'port',) def __str__(self): return '%s #%d' % (self.switch, self.port)