from django.db import models
from django.contrib.gis.db import models as geo_models
from django.db.models import Q
from django.core.validators import MaxValueValidator
from django.utils import timezone
from django.contrib.auth.models import Group
from django.contrib.contenttypes.fields import GenericRelation
from django.core.exceptions import ValidationError
from django.urls import reverse
from django.utils.html import format_html, mark_safe, escape
from django.core.exceptions import PermissionDenied
from django.core.validators import RegexValidator, MinValueValidator, MaxValueValidator
from django.conf import settings
from ipaddress import ip_network
from urllib.parse import quote
from hashlib import sha256
from base64 import urlsafe_b64encode
from djadhere.utils import get_active_filter, is_overlapping
from adhesions.models import Adhesion
from banking.models import RecurringPayment
def ipprefix_validator(value):
try:
ip_network(value)
except ValueError:
raise ValidationError('%s n’est pas un préfixe valide' % value)
class IPPrefix(models.Model):
prefix = models.CharField(max_length=128, verbose_name='Préfixe', validators=[ipprefix_validator], unique=True)
class Meta:
ordering = ['prefix']
verbose_name = 'Réseau'
verbose_name_plural = 'Réseaux'
def __str__(self):
return self.prefix
class IPResourceManager(models.Manager):
def get_queryset(self):
qs = super().get_queryset()
# On rajoute une super annotation « in_use » pour savoir si l’IP est dispo ou non :-)
qs = qs.annotate(
in_use_by_service=models.Exists(
ServiceAllocation.objects.filter(Q(resource=models.OuterRef('pk')) & get_active_filter())
),
in_use_by_antenna=models.Exists(
AntennaAllocation.objects.filter(Q(resource=models.OuterRef('pk')) & get_active_filter())
)
)
qs = qs.annotate(
in_use=models.Case(
models.When(Q(in_use_by_service=True) | Q(in_use_by_antenna=True), then=True),
default=False,
output_field=models.BooleanField()
)
)
# Ouf, pas de duplication car l’IP ne peut être utilisé que par un service / une antenne !
return qs
class ActiveAllocationManager(models.Manager):
def get_queryset(self):
qs = super().get_queryset()
qs = qs.annotate(
active=models.Case(
models.When(get_active_filter(), then=True),
default=False,
output_field=models.BooleanField()
)
)
return qs
class ActiveServiceManager(models.Manager):
def get_queryset(self):
qs = super().get_queryset()
qs = qs.annotate(
active=models.Exists(
ServiceAllocation.objects.filter(Q(service=models.OuterRef('pk')) & get_active_filter())
)
)
return qs
class IPResource(models.Model):
CATEGORY_PUBLIC = 0
CATEGORY_ANTENNA = 1
CATEGORIES = (
(CATEGORY_PUBLIC, 'IP Public'),
(CATEGORY_ANTENNA, 'IP Antenne'),
)
ip = models.GenericIPAddressField(verbose_name='IP', primary_key=True)
prefixes = models.ManyToManyField(IPPrefix, verbose_name='préfixes')
reserved = models.BooleanField(default=False, verbose_name='réservée')
category = models.IntegerField(choices=CATEGORIES, verbose_name='catégorie')
notes = models.TextField(blank=True, default='')
checkmk_label = models.CharField(max_length=128, blank=True, default='')
last_state = models.ForeignKey("IPResourceState", on_delete=models.PROTECT, related_name='+', verbose_name='dernier état')
last_time_up = models.DateTimeField(null=True, blank=True, verbose_name='Dernière réponse au ping')
objects = IPResourceManager()
@property
def allocations(self):
if self.category == self.CATEGORY_PUBLIC:
return self.service_allocations
if self.category == self.CATEGORY_ANTENNA:
return self.antenna_allocations
@property
def checkmk_url(self):
if self.checkmk_label:
return mark_safe(settings.CHECK_MK_URL.format(host=quote(self.checkmk_label)))
else:
return None
def password(self):
data = sha256((settings.MASTER_PASSWORD + self.ip).encode('utf-8')).digest();
return urlsafe_b64encode(data).decode('utf-8')[:8]
password.short_description = 'Mot de passe'
class Meta:
ordering = ['ip']
verbose_name = 'IP'
verbose_name_plural = 'IP'
def __str__(self):
return str(self.ip)
class IPResourceState(models.Model):
STATE_DOWN = 0
STATE_UP = 1
STATE_UNKNOWN = 2
STATE_CHOICES = (
(STATE_DOWN, 'DOWN'),
(STATE_UP, 'UP'),
(STATE_UNKNOWN, 'Inconnu'),
)
ip = models.ForeignKey(IPResource, on_delete=models.CASCADE, related_name='state_set')
date = models.DateTimeField(default=timezone.now)
state = models.IntegerField(choices=STATE_CHOICES)
def __str__(self):
return self.get_state_display()
class ServiceType(models.Model):
name = models.CharField(max_length=64, verbose_name='Nom', unique=True)
contact = models.CharField(max_length=64, verbose_name='Contact en cas de problème', blank=True, default='')
class Meta:
ordering = ['name']
verbose_name = 'type de service'
verbose_name_plural = 'types de service'
def __str__(self):
return self.name
class Service(models.Model):
"""
En cas d’ajout de champs, penser à mettre à jour la méthode save_model de ServiceAdmin.
"""
adhesion = models.ForeignKey(Adhesion, verbose_name='Adhérent·e', related_name='services', on_delete=models.CASCADE)
service_type = models.ForeignKey(ServiceType, related_name='services',
verbose_name='Type de service', on_delete=models.PROTECT)
label = models.CharField(blank=True, default='', max_length=128)
notes = models.TextField(blank=True, default='')
created = models.DateTimeField(auto_now_add=True)
loan_equipment = models.BooleanField(default=False, verbose_name='Matériel en prêt')
contribution = models.OneToOneField(RecurringPayment, on_delete=models.CASCADE)
objects = ActiveServiceManager()
def save(self, *args, **kwargs):
if not hasattr(self, 'contribution'):
self.contribution = RecurringPayment.objects.create()
super().save(*args, **kwargs)
def clean(self):
super().clean()
# Vérification de l’unicité par type de service du label
if self.label != '' and Service.objects.exclude(pk=self.pk).filter(service_type=self.service_type, label=self.label):
raise ValidationError("Un service du même type existe déjà avec ce label.")
def is_active(self):
return any(map(lambda allocation: allocation.is_active() , self.allocations.all()))
is_active.boolean = True
is_active.short_description = 'Actif'
@property
def active_allocations(self):
return self.allocations.filter(get_active_filter())
@property
def inactive_allocations(self):
return self.allocations.exclude(get_active_filter())
def get_absolute_url(self):
return reverse('admin:%s_%s_change' % (self._meta.app_label, self._meta.model_name), args=(self.pk,))
def __str__(self):
s = '#%d %s' % (self.pk, self.service_type)
if self.label:
s += ' ' + self.label
return s
class Antenna(models.Model):
MODE_UNKNOWN = 0
MODE_AP = 1
MODE_STA = 2
MODE_CHOICES = (
(MODE_UNKNOWN, 'Inconnu'),
(MODE_AP, 'AP'),
(MODE_STA, 'Station'),
)
label = models.CharField(max_length=128, blank=True, default='')
mode = models.IntegerField(choices=MODE_CHOICES, default=MODE_UNKNOWN)
ssid = models.CharField(max_length=64, blank=True, default='', verbose_name='SSID')
mac = models.CharField(
blank=True,
default='',
max_length=17,
validators=[
RegexValidator(r'^([0-9a-fA-F]{2}([:-]?|$)){6}$'),
],
verbose_name='Adresse MAC')
contact = models.ForeignKey(Adhesion, null=True, blank=True, on_delete=models.PROTECT)
notes = models.TextField(blank=True)
position = geo_models.PointField(null=True, blank=True)
orientation = models.IntegerField(verbose_name='Orientation (°)', null=True, blank=True)
def clean(self):
super().clean()
if self.orientation:
self.orientation = self.orientation % 360
def get_absolute_url(self):
return reverse('admin:%s_%s_change' % (self._meta.app_label, self._meta.model_name), args=(self.pk,))
def get_absolute_link(self):
name = 'Antenne n°%d' % self.pk
if self.label:
name += ' : %s' % self.label
link = format_html('{}', self.get_absolute_url(), name)
if self.allocations.filter(active=True).exists():
link += ' ('
link += ', '.join(map(
lambda alloc: format_html('{}', alloc.resource, alloc.resource),
self.allocations.filter(active=True).all()
))
link += ')'
return mark_safe(link)
@property
def absolute_url(self):
return self.get_absolute_url()
class Meta:
verbose_name = 'antenne'
def __str__(self):
name = 'Antenne %d' % self.pk
if self.label:
name += ' (%s)' % self.label
return name
class Route(models.Model):
name = models.CharField(max_length=64, unique=True)
class Meta:
ordering = ['name']
def get_ip(self):
allocations = self.allocations.filter(get_active_filter())
return allocations.values_list('resource', flat=True)
def get_adh(self):
allocations = self.allocations.filter(get_active_filter())
return Adhesion.objects.filter(pk__in=allocations.values_list('service__adhesion', flat=True))
def get_tel(self):
adhesions = self.get_adh()
user_tel = filter(lambda x: x, adhesions.values_list('user__profile__phone_number', flat=True))
corp_tel = filter(lambda x: x, adhesions.values_list('corporation__phone_number', flat=True))
return set(user_tel) | set(corp_tel)
def get_email(self):
adhesions = self.get_adh()
user_email = filter(lambda x: x, adhesions.values_list('user__email', flat=True))
corp_email = filter(lambda x: x, adhesions.values_list('corporation__email', flat=True))
return set(user_email) | set(corp_email)
def __str__(self):
return self.name
class Tunnel(Route):
description = models.CharField(max_length=128, blank=True)
created = models.DateTimeField(default=timezone.now, verbose_name='Date de création')
ended = models.DateTimeField(null=True, blank=True, verbose_name='Date de désactivation')
port = models.IntegerField(null=True, blank=True)
local_ip = models.GenericIPAddressField(null=True, blank=True, verbose_name='IP locale')
remote_ip = models.GenericIPAddressField(null=True, blank=True, verbose_name='IP distante')
networks = models.ManyToManyField(IPPrefix, blank=True, verbose_name='Réseaux')
notes = models.TextField(blank=True, default='')
def clean(self):
super().clean()
if self.ended:
# Vérification de la cohérence des champs created et ended
if self.created > self.ended:
raise ValidationError({'ended': "La date de désactivation doit être postérieur "
"à la date de création du tunnel."})
elif self.port:
# Vérification de l’unicité d’un tunnel actif avec un port donné
if Tunnel.objects.exclude(pk=self.pk).filter(port=self.port, ended__isnull=True).exists():
raise ValidationError({'port': "Ce numéro de port est déjà utilisé par un autre tunnel."})
class Allocation(models.Model):
start = models.DateTimeField(verbose_name='Début de la période d’allocation', default=timezone.now)
end = models.DateTimeField(null=True, blank=True, verbose_name='Fin de la période d’allocation')
notes = models.TextField(blank=True, default='')
objects = ActiveAllocationManager()
def clean(self):
super().clean()
# Vérification de la cohérence des champs start et end
if self.end and self.start > self.end:
raise ValidationError("La date de début de l’allocation doit être antérieur "
"à la date de fin de l’allocation.")
if self.resource_id:
if self.resource.reserved and (not self.end or self.end > timezone.now()):
raise ValidationError("L’IP sélectionnée est réservée")
# Vérification de l’abscence de chevauchement de la période d’allocation
allocations = type(self).objects.filter(resource__pk=self.resource.pk)
if is_overlapping(self, allocations):
raise ValidationError("La période d’allocation de cette ressource chevauche "
"avec une période d’allocation précédente.")
def is_active(self):
now = timezone.now()
return self.start < now and (self.end is None or self.end > now)
class Meta:
abstract = True
ordering = ['-start']
def __str__(self):
return str(self.resource)
class ServiceAllocation(Allocation):
resource = models.ForeignKey(IPResource, verbose_name='Ressource', related_name='service_allocations',
related_query_name='service_allocation', limit_choices_to={'category': 0}, on_delete=models.CASCADE)
service = models.ForeignKey(Service, related_name='allocations', related_query_name='allocation', on_delete=models.CASCADE)
route = models.ForeignKey(Route, verbose_name='Route', related_name='allocations', related_query_name='allocation', on_delete=models.PROTECT)
class Meta:
verbose_name = 'allocation'
verbose_name_plural = 'allocations'
class AntennaAllocation(Allocation):
resource = models.ForeignKey(IPResource, verbose_name='Ressource', related_name='antenna_allocations',
related_query_name='antenna_allocation', limit_choices_to={'category': 1}, on_delete=models.CASCADE)
antenna = models.ForeignKey(Antenna, related_name='allocations', related_query_name='allocation', on_delete=models.CASCADE)
class Meta:
verbose_name = 'allocation'
verbose_name_plural = 'allocations'
class Switch(models.Model):
name = models.CharField(max_length=64, verbose_name='Nom', unique=True)
first_port = models.IntegerField(validators=[MinValueValidator(0)], verbose_name='Premier port')
last_port = models.IntegerField(validators=[MaxValueValidator(64)], verbose_name='Dernier port')
notes = models.TextField(blank=True, default='')
class Meta:
ordering = ('name',)
def __str__(self):
return self.name
class Port(models.Model):
switch = models.ForeignKey(Switch, related_name='ports', on_delete=models.CASCADE)
service = models.ForeignKey(Service, null=True, blank=True, related_name='ports', on_delete=models.SET_NULL)
port = models.IntegerField(verbose_name='N° de port')
reserved = models.BooleanField(default=False, verbose_name='réservé')
notes = models.CharField(max_length=256, blank=True, default='')
up = models.NullBooleanField()
def clean(self):
if self.reserved and self.service:
raise ValidationError('Un port réservé ne peut avoir de service.')
class Meta:
unique_together = ('switch', 'port',)
ordering = ('switch', 'port',)
def __str__(self):
return '%s #%d' % (self.switch, self.port)