import argparse
from decimal import Decimal

from django.core.exceptions import ValidationError
from django.core.management.base import BaseCommand, CommandParser, CommandError
from django.core.validators import validate_ipv4_address, validate_ipv6_address
from django.db.models import DecimalField, F, Sum, Func, Q

from services.models import Service, ServiceType, IPResource


class IPAction(argparse.Action):
    """Argparse action converting an ``IP[/MASK]`` string into an ``IPResource``.

    The address is validated as IPv4 first, then as IPv6.  When no mask is
    given, the mask is set to 0.  The matching ``IPResource`` row is fetched,
    or created if missing, and appended to the list stored on the namespace
    under this action's destination.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Validate ``values`` and append the resulting ``IPResource``.

        Raises:
            argparse.ArgumentError: if the string has more than one ``/``, if
                the address is neither valid IPv4 nor valid IPv6, or if the
                mask is not an integer within the protocol's range.
        """
        args = values.split('/')
        if len(args) == 1:
            ip, = args
            mask = 0
        elif len(args) == 2:
            ip, mask = args
        else:
            raise argparse.ArgumentError(self, '"%s" n’est pas une IP valide' % values)

        # Try each protocol's validator in turn; the first that accepts wins.
        protocol = None
        validators = (
            ('ipv4', validate_ipv4_address),
            ('ipv6', validate_ipv6_address),
        )
        for name, validator in validators:
            try:
                validator(ip)
            except ValidationError:
                continue
            protocol = name
            break
        if protocol is None:
            raise argparse.ArgumentError(self, '"%s" n’est pas une IP valide' % values)

        try:
            mask = int(mask)
        except ValueError:
            raise argparse.ArgumentError(self, 'masque invalide')
        max_mask = 32 if protocol == 'ipv4' else 128
        if not 0 <= mask <= max_mask:
            raise argparse.ArgumentError(self, 'masque invalide')

        ip, created = IPResource.objects.get_or_create(ip=ip, mask=mask)

        # Work on a copy: the list found on the namespace is the very object
        # passed as ``default=`` to add_argument(), and appending to it in
        # place would mutate that shared default.
        ip_set = list(getattr(namespace, self.dest, None) or [])
        ip_set.append(ip)
        setattr(namespace, self.dest, ip_set)


class Command(BaseCommand):
    """Management command exposing ``stats``, ``list``, ``show``, ``add`` and
    ``delete`` sub-commands for services."""

    help = 'Gestion des services'

    def add_arguments(self, parser):
        """Declare the sub-commands and their arguments on ``parser``."""
        cmd = self

        class SubParser(CommandParser):
            # Sub-parsers are instantiated by argparse with keyword arguments
            # only, so the command is injected here as the first positional
            # argument of CommandParser.
            def __init__(self, **kwargs):
                super().__init__(cmd, **kwargs)

        subparsers = parser.add_subparsers(dest='command', parser_class=SubParser)
        subparsers.required = True

        subparsers.add_parser('stats', help='Afficher les statistiques sur les services')

        parser_list = subparsers.add_parser('list', help='Lister les services')
        parser_list.add_argument('--type',
                                 choices=ServiceType.objects.all().values_list('name', flat=True),
                                 help='Afficher uniquement les services d’un certain type')
        group = parser_list.add_mutually_exclusive_group()
        group.add_argument('--ongoing', action='store_true',
                           help='Afficher uniquement les services actifs')
        group.add_argument('--forthcoming', action='store_true',
                           help='Afficher uniquement les services à venir (non configuré ou à date de début dans le futur)')
        group.add_argument('--finished', action='store_true',
                           help='Afficher uniquement les services terminés')

        parser_show = subparsers.add_parser('show', help='Afficher les informations concernant un service')
        parser_show.add_argument('id', type=int)

        parser_add = subparsers.add_parser('add', help='Ajouter un nouveau service')
        # The help text used to be a copy of the ``list --type`` one; it now
        # describes what the option does for ``add``.
        parser_add.add_argument('--type', required=True,
                                choices=ServiceType.objects.all().values_list('name', flat=True),
                                help='Type du service à créer')
        parser_add.add_argument('--ip', action=IPAction, default=[],
                                help='Assigner une IP au service')

        parser_delete = subparsers.add_parser('delete', help='Supprimer un service existant')
        parser_delete.add_argument('id', type=int)

    def handle(self, *args, **options):
        """Dispatch to the ``handle_<command>`` method of the chosen sub-command."""
        cmd = options.pop('command')
        getattr(self, 'handle_{cmd}'.format(cmd=cmd))(*args, **options)

    def _get_service(self, pk):
        """Return the service with primary key ``pk``.

        Raises:
            CommandError: if no such service exists.
        """
        try:
            return Service.objects.get(pk=pk)
        except Service.DoesNotExist:
            raise CommandError('Le service n°%d n’existe pas' % pk)

    def handle_stats(self, *args, **options):
        """Print, per service type, counts and income for ongoing services.

        For each service type the table shows:
          * npay: services with an adherent and a contribution whose amount
            and period are both non-zero;
          * ninf: ongoing services without an adherent;
          * ngra: services with an adherent but no such contribution;
          * eur: sum of ``amount / period`` over the npay services;
          * pourcent: share of that sum in the total over all types.
        """
        # TODO: move this code in utils.py or some file like that
        as_float_template = '%(function)s(%(expressions)s AS FLOAT)'
        total_income = Decimal(0)
        lines = []
        for service_type in ServiceType.objects.all():
            services = service_type.services.filter(Service.get_ongoing_filter())
            ntotal = services.count()
            services = services.exclude(adherent=None)
            nadh = services.count()
            ninf = ntotal - nadh
            services = services.exclude(
                Q(contribution__isnull=True)
                | Q(contribution__amount=0)
                | Q(contribution__period=0)
            )
            npay = services.count()
            ngra = nadh - npay
            # Cast the amount to FLOAT in SQL before dividing by the period.
            amount = Func(F('contribution__amount'), function='CAST', template=as_float_template)
            period = F('contribution__period')
            output_field = DecimalField(max_digits=9, decimal_places=2)
            aggregated = services.aggregate(income=Sum(amount / period, output_field=output_field))
            # The aggregate is None when no service matches.
            income = aggregated['income'] or Decimal(0)
            total_income += income
            lines.append((str(service_type), npay, ninf, ngra, income))

        self.stdout.write("%-16s%12s%12s%12s%12s%12s" % ('Service', 'npay', 'ninf', 'ngra', 'eur', 'pourcent'))
        for service_type, npay, ninf, ngra, income in lines:
            # Guard against a division by zero when there is no income at all.
            percent = income / total_income * 100 if total_income else 0
            self.stdout.write("%-16s%12d%12d%12d%12.2f%12.1f" % (service_type, npay, ninf, ngra, income, percent))

    def handle_list(self, *args, **options):
        """Print a table of services, optionally filtered by type and state."""
        services = Service.objects.all()
        if options['type']:
            st = ServiceType.objects.get(name=options['type'])
            services = services.filter(service_type=st)

        state_filters = (
            ('ongoing', Service.get_ongoing_filter),
            ('forthcoming', Service.get_forthcoming_filter),
            ('finished', Service.get_finished_filter),
        )
        for flag, get_filter in state_filters:
            if options[flag]:
                services = services.filter(get_filter())

        fmt = '{:<8}{:<20}{:<20}{:<10}{:<40}'
        # Written through self.stdout (not print) like the other handlers, so
        # the output can be captured when a stdout is passed to the command.
        self.stdout.write(fmt.format('ID', 'Type', 'Adhérent', 'En cours', 'IPs'))
        for service in services:
            adh = str(service.adherent) if service.adherent else '-'
            ips = ', '.join(map(str, service.ip_resources.all())) or '-'
            ongoing = '✔' if service.is_ongoing else '✘'
            self.stdout.write(fmt.format(service.pk, str(service.service_type), adh, ongoing, ips))

    def handle_show(self, *args, **options):
        """Print the details of the service whose primary key is ``options['id']``.

        Raises:
            CommandError: if the service does not exist.
        """
        service = self._get_service(options['id'])
        self.stdout.write('Service n°%d' % service.pk)
        self.stdout.write('\tType de service : %s' % service.service_type)
        self.stdout.write('\tAdhérent : %s' % (service.adherent or 'aucun'))
        self.stdout.write('\tDébut du service : %s' % (service.start or '-'))
        self.stdout.write('\tFin du service : %s' % (service.end or '-'))
        self.stdout.write('\tService en cours : %s' % ('oui' if service.is_ongoing else 'non'))

    def handle_add(self, *args, **options):
        """Create a service of the requested type with the given IP resources.

        For every IP already attached to a service that is not finished, the
        conflicting services are listed and a confirmation is asked.

        Raises:
            CommandError: if the user declines a confirmation.
        """
        for ip in options['ip']:
            services = Service.objects.filter(ip_resources=ip).exclude(Service.get_finished_filter())
            if services.exists():
                self.stdout.write(self.style.WARNING('Note : la ressource IP "%s" est également affecté aux '
                                                     'services suivants :' % ip))
                for service in services:
                    self.stdout.write(self.style.WARNING('\t%d (%s)' % (service.pk, service.service_type)))
                value = input('Continuer ? [y/N] ')
                if value.lower() != 'y':
                    raise CommandError('Opération annulée')

        st = ServiceType.objects.get(name=options['type'])
        service = Service.objects.create(service_type=st)
        # Use the related manager's set(): assigning a list directly to the
        # relation is deprecated and rejected by recent Django versions.
        service.ip_resources.set(options['ip'])
        # NOTE(review): create() already saved the row; this extra save() is
        # kept from the original in case something relies on it -- confirm.
        service.save()
        self.stdout.write(self.style.SUCCESS('Service n°%d créé avec succès' % service.pk))

    def handle_delete(self, *args, **options):
        """Delete the service whose primary key is ``options['id']`` after confirmation.

        Raises:
            CommandError: if the service does not exist.
        """
        service = self._get_service(options['id'])
        value = input('Êtes vous sûr de vouloir supprimer ce service ? [y/N] ')
        if value.lower() == 'y':
            service.delete()
            self.stdout.write(self.style.SUCCESS('Service supprimé'))
        else:
            self.stdout.write('Service non supprimé')