models.py 16 KB


  1. from django.db import models
  2. from django.contrib.gis.db import models as geo_models
  3. from django.db.models import Q
  4. from django.core.validators import MaxValueValidator
  5. from django.utils import timezone
  6. from django.contrib.auth.models import Group
  7. from django.contrib.contenttypes.fields import GenericRelation
  8. from django.core.exceptions import ValidationError
  9. from django.urls import reverse
  10. from django.utils.html import format_html, mark_safe, escape
  11. from django.core.exceptions import PermissionDenied
  12. from django.core.validators import RegexValidator, MinValueValidator, MaxValueValidator
  13. from django.conf import settings
  14. from ipaddress import ip_network
  15. from urllib.parse import quote
  16. from hashlib import sha256
  17. from base64 import urlsafe_b64encode
  18. from djadhere.utils import get_active_filter, is_overlapping
  19. from adhesions.models import Adhesion
  20. from banking.models import RecurringPayment
  21. def ipprefix_validator(value):
  22. try:
  23. ip_network(value)
  24. except ValueError:
  25. raise ValidationError('%s n’est pas un préfixe valide' % value)
  26. class IPPrefix(models.Model):
  27. prefix = models.CharField(max_length=128, verbose_name='Préfixe', validators=[ipprefix_validator], unique=True)
  28. class Meta:
  29. ordering = ['prefix']
  30. verbose_name = 'Réseau'
  31. verbose_name_plural = 'Réseaux'
  32. def __str__(self):
  33. return self.prefix
  34. class IPResourceManager(models.Manager):
  35. def get_queryset(self):
  36. qs = super().get_queryset()
  37. # On rajoute une super annotation « in_use » pour savoir si l’IP est dispo ou non :-)
  38. qs = qs.annotate(
  39. in_use_by_service=models.Exists(
  40. ServiceAllocation.objects.filter(Q(resource=models.OuterRef('pk')) & get_active_filter())
  41. ),
  42. in_use_by_antenna=models.Exists(
  43. AntennaAllocation.objects.filter(Q(resource=models.OuterRef('pk')) & get_active_filter())
  44. )
  45. )
  46. qs = qs.annotate(
  47. in_use=models.Case(
  48. models.When(Q(in_use_by_service=True) | Q(in_use_by_antenna=True), then=True),
  49. default=False,
  50. output_field=models.BooleanField()
  51. )
  52. )
  53. # Ouf, pas de duplication car l’IP ne peut être utilisé que par un service / une antenne !
  54. return qs
  55. class ActiveAllocationManager(models.Manager):
  56. def get_queryset(self):
  57. qs = super().get_queryset()
  58. qs = qs.annotate(
  59. active=models.Case(
  60. models.When(get_active_filter(), then=True),
  61. default=False,
  62. output_field=models.BooleanField()
  63. )
  64. )
  65. return qs
  66. class ActiveServiceManager(models.Manager):
  67. def get_queryset(self):
  68. qs = super().get_queryset()
  69. qs = qs.annotate(
  70. active=models.Exists(
  71. ServiceAllocation.objects.filter(Q(service=models.OuterRef('pk')) & get_active_filter())
  72. )
  73. )
  74. return qs
  75. class IPResource(models.Model):
  76. CATEGORY_PUBLIC = 0
  77. CATEGORY_ANTENNA = 1
  78. CATEGORIES = (
  79. (CATEGORY_PUBLIC, 'IP Public'),
  80. (CATEGORY_ANTENNA, 'IP Antenne'),
  81. )
  82. ip = models.GenericIPAddressField(verbose_name='IP', primary_key=True)
  83. prefixes = models.ManyToManyField(IPPrefix, verbose_name='préfixes')
  84. reserved = models.BooleanField(default=False, verbose_name='réservée')
  85. category = models.IntegerField(choices=CATEGORIES, verbose_name='catégorie')
  86. notes = models.TextField(blank=True, default='')
  87. checkmk_label = models.CharField(max_length=128, blank=True, default='')
  88. last_state = models.ForeignKey("IPResourceState", on_delete=models.PROTECT, related_name='+', verbose_name='dernier état')
  89. last_time_up = models.DateTimeField(null=True, blank=True, verbose_name='Dernière réponse au ping')
  90. objects = IPResourceManager()
  91. @property
  92. def allocations(self):
  93. if self.category == self.CATEGORY_PUBLIC:
  94. return self.service_allocations
  95. if self.category == self.CATEGORY_ANTENNA:
  96. return self.antenna_allocations
  97. @property
  98. def checkmk_url(self):
  99. if self.checkmk_label:
  100. return mark_safe(settings.CHECK_MK_URL.format(host=quote(self.checkmk_label)))
  101. else:
  102. return None
  103. def password(self):
  104. data = sha256((settings.MASTER_PASSWORD + self.ip).encode('utf-8')).digest();
  105. return urlsafe_b64encode(data).decode('utf-8')[:8]
  106. password.short_description = 'Mot de passe'
  107. class Meta:
  108. ordering = ['ip']
  109. verbose_name = 'IP'
  110. verbose_name_plural = 'IP'
  111. def __str__(self):
  112. return str(self.ip)
  113. class IPResourceState(models.Model):
  114. STATE_DOWN = 0
  115. STATE_UP = 1
  116. STATE_UNKNOWN = 2
  117. STATE_CHOICES = (
  118. (STATE_DOWN, 'DOWN'),
  119. (STATE_UP, 'UP'),
  120. (STATE_UNKNOWN, 'Inconnu'),
  121. )
  122. ip = models.ForeignKey(IPResource, on_delete=models.CASCADE, related_name='state_set')
  123. date = models.DateTimeField(default=timezone.now)
  124. state = models.IntegerField(choices=STATE_CHOICES)
  125. def __str__(self):
  126. return self.get_state_display()
  127. class ServiceType(models.Model):
  128. name = models.CharField(max_length=64, verbose_name='Nom', unique=True)
  129. contact = models.CharField(max_length=64, verbose_name='Contact en cas de problème', blank=True, default='')
  130. class Meta:
  131. ordering = ['name']
  132. verbose_name = 'type de service'
  133. verbose_name_plural = 'types de service'
  134. def __str__(self):
  135. return self.name
  136. class Service(models.Model):
  137. """
  138. En cas d’ajout de champs, penser à mettre à jour la méthode save_model de ServiceAdmin.
  139. """
  140. adhesion = models.ForeignKey(Adhesion, verbose_name='Adhérent·e', related_name='services', on_delete=models.CASCADE)
  141. service_type = models.ForeignKey(ServiceType, related_name='services',
  142. verbose_name='Type de service', on_delete=models.PROTECT)
  143. label = models.CharField(blank=True, default='', max_length=128)
  144. notes = models.TextField(blank=True, default='')
  145. created = models.DateTimeField(auto_now_add=True)
  146. loan_equipment = models.BooleanField(default=False, verbose_name='Matériel en prêt')
  147. contribution = models.OneToOneField(RecurringPayment, on_delete=models.CASCADE)
  148. objects = ActiveServiceManager()
  149. def save(self, *args, **kwargs):
  150. if not hasattr(self, 'contribution'):
  151. self.contribution = RecurringPayment.objects.create()
  152. super().save(*args, **kwargs)
  153. def clean(self):
  154. super().clean()
  155. # Vérification de l’unicité par type de service du label
  156. if self.label != '' and Service.objects.exclude(pk=self.pk).filter(service_type=self.service_type, label=self.label):
  157. raise ValidationError("Un service du même type existe déjà avec ce label.")
  158. def is_active(self):
  159. return any(map(lambda allocation: allocation.is_active() , self.allocations.all()))
  160. is_active.boolean = True
  161. is_active.short_description = 'Actif'
  162. @property
  163. def active_allocations(self):
  164. return self.allocations.filter(get_active_filter())
  165. @property
  166. def inactive_allocations(self):
  167. return self.allocations.exclude(get_active_filter())
  168. def get_absolute_url(self):
  169. return reverse('admin:%s_%s_change' % (self._meta.app_label, self._meta.model_name), args=(self.pk,))
  170. def __str__(self):
  171. s = '#%d %s' % (self.pk, self.service_type)
  172. if self.label:
  173. s += ' ' + self.label
  174. return s
  175. class Antenna(models.Model):
  176. MODE_UNKNOWN = 0
  177. MODE_AP = 1
  178. MODE_STA = 2
  179. MODE_CHOICES = (
  180. (MODE_UNKNOWN, 'Inconnu'),
  181. (MODE_AP, 'AP'),
  182. (MODE_STA, 'Station'),
  183. )
  184. label = models.CharField(max_length=128, blank=True, default='')
  185. mode = models.IntegerField(choices=MODE_CHOICES, default=MODE_UNKNOWN)
  186. ssid = models.CharField(max_length=64, blank=True, default='', verbose_name='SSID')
  187. mac = models.CharField(
  188. blank=True,
  189. default='',
  190. max_length=17,
  191. validators=[
  192. RegexValidator(r'^([0-9a-fA-F]{2}([:-]?|$)){6}$'),
  193. ],
  194. verbose_name='Adresse MAC')
  195. contact = models.ForeignKey(Adhesion, null=True, blank=True, on_delete=models.PROTECT)
  196. notes = models.TextField(blank=True)
  197. position = geo_models.PointField(null=True, blank=True)
  198. orientation = models.IntegerField(verbose_name='Orientation (°)', null=True, blank=True)
  199. def clean(self):
  200. super().clean()
  201. if self.orientation:
  202. self.orientation = self.orientation % 360
  203. def get_absolute_url(self):
  204. return reverse('admin:%s_%s_change' % (self._meta.app_label, self._meta.model_name), args=(self.pk,))
  205. def get_absolute_link(self):
  206. name = 'Antenne n°%d' % self.pk
  207. if self.label:
  208. name += ' : %s' % self.label
  209. link = format_html('<a href="{}">{}</a>', self.get_absolute_url(), name)
  210. if self.allocations.filter(active=True).exists():
  211. link += ' ('
  212. link += ', '.join(map(
  213. lambda alloc: format_html('<a href="http://{}">{}</a>', alloc.resource, alloc.resource),
  214. self.allocations.filter(active=True).all()
  215. ))
  216. link += ')'
  217. return mark_safe(link)
  218. @property
  219. def absolute_url(self):
  220. return self.get_absolute_url()
  221. class Meta:
  222. verbose_name = 'antenne'
  223. def __str__(self):
  224. name = 'Antenne %d' % self.pk
  225. if self.label:
  226. name += ' (%s)' % self.label
  227. return name
  228. class Route(models.Model):
  229. name = models.CharField(max_length=64, unique=True)
  230. class Meta:
  231. ordering = ['name']
  232. def get_ip(self):
  233. allocations = self.allocations.filter(get_active_filter())
  234. return allocations.values_list('resource', flat=True)
  235. def get_adh(self):
  236. allocations = self.allocations.filter(get_active_filter())
  237. return Adhesion.objects.filter(pk__in=allocations.values_list('service__adhesion', flat=True))
  238. def get_tel(self):
  239. adhesions = self.get_adh()
  240. user_tel = filter(lambda x: x, adhesions.values_list('user__profile__phone_number', flat=True))
  241. corp_tel = filter(lambda x: x, adhesions.values_list('corporation__phone_number', flat=True))
  242. return set(user_tel) | set(corp_tel)
  243. def get_email(self):
  244. adhesions = self.get_adh()
  245. user_email = filter(lambda x: x, adhesions.values_list('user__email', flat=True))
  246. corp_email = filter(lambda x: x, adhesions.values_list('corporation__email', flat=True))
  247. return set(user_email) | set(corp_email)
  248. def __str__(self):
  249. return self.name
  250. class Tunnel(Route):
  251. description = models.CharField(max_length=128, blank=True)
  252. created = models.DateTimeField(default=timezone.now, verbose_name='Date de création')
  253. ended = models.DateTimeField(null=True, blank=True, verbose_name='Date de désactivation')
  254. port = models.IntegerField(null=True, blank=True)
  255. local_ip = models.GenericIPAddressField(null=True, blank=True, verbose_name='IP locale')
  256. remote_ip = models.GenericIPAddressField(null=True, blank=True, verbose_name='IP distante')
  257. networks = models.ManyToManyField(IPPrefix, blank=True, verbose_name='Réseaux')
  258. notes = models.TextField(blank=True, default='')
  259. def clean(self):
  260. super().clean()
  261. if self.ended:
  262. # Vérification de la cohérence des champs created et ended
  263. if self.created > self.ended:
  264. raise ValidationError({'ended': "La date de désactivation doit être postérieur "
  265. "à la date de création du tunnel."})
  266. elif self.port:
  267. # Vérification de l’unicité d’un tunnel actif avec un port donné
  268. if Tunnel.objects.exclude(pk=self.pk).filter(port=self.port, ended__isnull=True).exists():
  269. raise ValidationError({'port': "Ce numéro de port est déjà utilisé par un autre tunnel."})
  270. class Allocation(models.Model):
  271. start = models.DateTimeField(verbose_name='Début de la période d’allocation', default=timezone.now)
  272. end = models.DateTimeField(null=True, blank=True, verbose_name='Fin de la période d’allocation')
  273. notes = models.TextField(blank=True, default='')
  274. objects = ActiveAllocationManager()
  275. def clean(self):
  276. super().clean()
  277. # Vérification de la cohérence des champs start et end
  278. if self.end and self.start > self.end:
  279. raise ValidationError("La date de début de l’allocation doit être antérieur "
  280. "à la date de fin de l’allocation.")
  281. if self.resource_id:
  282. if self.resource.reserved and (not self.end or self.end > timezone.now()):
  283. raise ValidationError("L’IP sélectionnée est réservée")
  284. # Vérification de l’abscence de chevauchement de la période d’allocation
  285. allocations = type(self).objects.filter(resource__pk=self.resource.pk)
  286. if is_overlapping(self, allocations):
  287. raise ValidationError("La période d’allocation de cette ressource chevauche "
  288. "avec une période d’allocation précédente.")
  289. def is_active(self):
  290. now = timezone.now()
  291. return self.start < now and (self.end is None or self.end > now)
  292. class Meta:
  293. abstract = True
  294. ordering = ['-start']
  295. def __str__(self):
  296. return str(self.resource)
  297. class ServiceAllocation(Allocation):
  298. resource = models.ForeignKey(IPResource, verbose_name='Ressource', related_name='service_allocations',
  299. related_query_name='service_allocation', limit_choices_to={'category': 0}, on_delete=models.CASCADE)
  300. service = models.ForeignKey(Service, related_name='allocations', related_query_name='allocation', on_delete=models.CASCADE)
  301. route = models.ForeignKey(Route, verbose_name='Route', related_name='allocations', related_query_name='allocation', on_delete=models.PROTECT)
  302. class Meta:
  303. verbose_name = 'allocation'
  304. verbose_name_plural = 'allocations'
  305. class AntennaAllocation(Allocation):
  306. resource = models.ForeignKey(IPResource, verbose_name='Ressource', related_name='antenna_allocations',
  307. related_query_name='antenna_allocation', limit_choices_to={'category': 1}, on_delete=models.CASCADE)
  308. antenna = models.ForeignKey(Antenna, related_name='allocations', related_query_name='allocation', on_delete=models.CASCADE)
  309. class Meta:
  310. verbose_name = 'allocation'
  311. verbose_name_plural = 'allocations'
  312. class Switch(models.Model):
  313. name = models.CharField(max_length=64, verbose_name='Nom', unique=True)
  314. first_port = models.IntegerField(validators=[MinValueValidator(0)], verbose_name='Premier port')
  315. last_port = models.IntegerField(validators=[MaxValueValidator(64)], verbose_name='Dernier port')
  316. notes = models.TextField(blank=True, default='')
  317. class Meta:
  318. ordering = ('name',)
  319. def __str__(self):
  320. return self.name
  321. class Port(models.Model):
  322. switch = models.ForeignKey(Switch, related_name='ports', on_delete=models.CASCADE)
  323. service = models.ForeignKey(Service, null=True, blank=True, related_name='ports', on_delete=models.SET_NULL)
  324. port = models.IntegerField(verbose_name='N° de port')
  325. reserved = models.BooleanField(default=False, verbose_name='réservé')
  326. notes = models.CharField(max_length=256, blank=True, default='')
  327. up = models.NullBooleanField()
  328. def clean(self):
  329. if self.reserved and self.service:
  330. raise ValidationError('Un port réservé ne peut avoir de service.')
  331. class Meta:
  332. unique_together = ('switch', 'port',)
  333. ordering = ('switch', 'port',)
  334. def __str__(self):
  335. return '%s #%d' % (self.switch, self.port)