123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- # -*- coding: utf-8 -*-
- from django.db import models
- from django.core.exceptions import ValidationError
- from netfields import InetAddressField, NetManager
- from netaddr import IPAddress
- import ldapdb.models
- from ldapdb.models.fields import CharField, IntegerField, ListField
- from coin.models import CoinLdapSyncModel
def validate_v4(address):
    """Field validator accepting only IPv4 addresses.

    ``address`` must expose a ``version`` attribute.  Returns ``None`` when
    that attribute equals 4 and raises ``ValidationError`` otherwise.
    """
    if address.version == 4:
        return
    message = '{} is not an IPv4 address'.format(address)
    raise ValidationError(message)
def validate_v6(address):
    """Field validator accepting only IPv6 addresses.

    ``address`` must expose a ``version`` attribute.  Returns ``None`` when
    that attribute equals 6 and raises ``ValidationError`` otherwise.
    """
    if address.version == 6:
        return
    message = '{} is not an IPv6 address'.format(address)
    raise ValidationError(message)
class VPNSubscription(CoinLdapSyncModel):
    """VPN access tied one-to-one to an ``offers.OfferSubscription``.

    Holds the VPN login, password, activation flag and the IPv4/IPv6
    tunnel endpoints, and copies them into an ``LdapVPNConfig`` entry in
    ``sync_to_ldap``.
    """

    administrative_subscription = models.OneToOneField('offers.OfferSubscription')
    # TODO: do some access control to prevent the user from changing this field
    activated = models.BooleanField(default=False)
    login = models.CharField(max_length=50)
    # TODO: define which hash to use
    password = models.CharField(max_length=256)
    ipv4_endpoint = InetAddressField(validators=[validate_v4], blank=True)
    ipv6_endpoint = InetAddressField(validators=[validate_v6], blank=True)

    objects = NetManager()

    def get_subnets(self, version):
        """Return the subnets of the administrative subscription whose
        ``inet.version`` equals ``version`` (4 or 6), as a list.
        """
        subnets = self.administrative_subscription.ip_subnet.all()
        return [subnet for subnet in subnets if subnet.inet.version == version]

    def sync_to_ldap(self, creation):
        """Write this subscription's settings to its ``LdapVPNConfig`` entry.

        When ``creation`` is true a new entry is built; otherwise the
        existing entry is fetched using ``self.login`` as primary key.
        The entry is then filled in and saved.
        """
        if creation:
            config = LdapVPNConfig()
        else:
            config = LdapVPNConfig.objects.get(pk=self.login)
        config.login = config.sn = self.login
        # TODO: salt + hash the password
        config.password = self.password
        config.active = 'yes' if self.activated else 'no'
        # NOTE(review): str() of an unset endpoint would store the literal
        # text 'None' -- confirm endpoints are always set before syncing.
        config.ipv4_endpoint = str(self.ipv4_endpoint)
        config.ipv6_endpoint = str(self.ipv6_endpoint)
        config.ranges_v4 = [str(s) for s in self.get_subnets(4)]
        config.ranges_v6 = [str(s) for s in self.get_subnets(6)]
        config.save()

    def delete_from_ldap(self):
        """Placeholder: currently does nothing."""
        # TODO: simple delete?
        pass

    def clean(self):
        """Fill in missing endpoints on first save, then validate them.

        For an unsaved instance (``pk is None``), an endpoint that is
        ``None`` is picked from the first subnet of the matching IP
        version.  Afterwards every non-empty endpoint must belong to one
        of the subscription's subnets.

        Raises ``ValidationError`` when no subnet of the needed version
        exists, when the first IPv6 subnet does not yield two hosts, or
        when an endpoint lies outside every subnet.
        """
        # TODO: this should be factored for other technologies (DSL, etc)
        subnets = self.administrative_subscription.ip_subnet.all()
        # If saving for the first time and IP endpoints are not specified,
        # generate them automatically.
        if self.pk is None:
            subnets_v4 = [s for s in subnets if s.inet.version == 4]
            subnets_v6 = [s for s in subnets if s.inet.version == 6]
            if self.ipv4_endpoint is None:
                if not subnets_v4:
                    # TODO: should we fail silently instead?
                    raise ValidationError('No IPv4 subnet defined, needed to choose an endpoint from')
                self.ipv4_endpoint = subnets_v4[0].inet.ip
            if self.ipv6_endpoint is None:
                if not subnets_v6:
                    # TODO: should we fail silently instead?
                    raise ValidationError('No IPv6 subnet defined, needed to choose an endpoint from')
                # With v6, we choose the second host of the subnet
                # (the original author's example: cafe::1).
                hosts = subnets_v6[0].inet.iter_hosts()
                # The builtin next() works on both Python 2.6+ and 3; the
                # default avoids leaking StopIteration on a tiny subnet.
                next(hosts, None)
                second_host = next(hosts, None)
                # Compare against None explicitly: an address object may
                # itself be falsy.
                if second_host is None:
                    raise ValidationError(
                        'IPv6 subnet {} is too small to choose an endpoint from'.format(
                            subnets_v6[0].inet))
                self.ipv6_endpoint = second_host
        # Check that the endpoints are included in one of the routed subnets
        for endpoint in [self.ipv4_endpoint, self.ipv6_endpoint]:
            if endpoint:
                if not any(endpoint in subnet.inet for subnet in subnets):
                    raise ValidationError("Endpoint {} is not in an attributed range".format(endpoint))

    def __unicode__(self):
        return self.login
class LdapVPNConfig(ldapdb.models.Model):
    """LDAP-backed model describing one VPN account.

    Each attribute maps to the LDAP attribute named in its ``db_column``;
    ``login`` (LDAP ``cn``) is the primary key.
    """

    # TODO: move the following line to settings.py
    base_dn = "ou=vpn,o=ILLYSE,l=Villeurbanne,st=RHA,c=FR"
    object_classes = [
        'person',
        'organizationalPerson',
        'inetOrgPerson',
        'top',
        'radiusprofile',
    ]

    login = CharField(max_length=255, db_column='cn', primary_key=True)
    sn = CharField(max_length=255, db_column='sn')
    password = CharField(max_length=255, db_column='userPassword')
    active = CharField(max_length=3, db_column='dialupAccess')
    ipv4_endpoint = CharField(max_length=16, db_column='radiusFramedIPAddress')
    ipv6_endpoint = CharField(max_length=40, db_column='postalAddress')
    ranges_v4 = ListField(db_column='radiusFramedRoute')
    ranges_v6 = ListField(db_column='registeredAddress')

    class Meta:
        # Tells South not to manage this model.
        managed = False

    def __unicode__(self):
        return self.login
|