1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- # -*- coding: utf-8 -*-
- from django.db import models
- from django.core.exceptions import ValidationError
- from django.core.validators import MaxValueValidator
- from django.db.models import Q
- from netfields import CidrAddressField, NetManager
- from netaddr import IPNetwork, IPSet
- def validate_subnet(cidr):
- """Checks that a CIDR object is indeed a subnet, i.e. the host bits are
- all set to zero."""
- if not isinstance(cidr, IPNetwork):
- raise ValidationError("Internal error, expected IPNetwork object")
- if cidr.ip != cidr.network:
- raise ValidationError("{} is not a proper subnet, you probably mean {}".format(cidr, cidr.cidr))
- class IPPool(models.Model):
- """Pool of IP addresses (either v4 or v6)."""
- name = models.CharField(max_length=255, blank=False, null=False,
- verbose_name='Name of the IP pool')
- default_subnetsize = models.PositiveSmallIntegerField(blank=False,
- verbose_name='Default subnet size to allocate to subscribers in this pool',
- validators=[MaxValueValidator(64)])
- inet = CidrAddressField(validators=[validate_subnet])
- objects = NetManager()
- def clean(self):
- if self.inet:
- max_subnetsize = 64 if self.inet.version == 6 else 32
- if not self.inet.prefixlen <= self.default_subnetsize <= max_subnetsize:
- raise ValidationError('Invalid default subnet size')
- # Check that related subnet are in the pool (useful when
- # modifying an existing pool that already has subnets
- # allocated in it)
- incorrect = [str(subnet) for subnet in self.ipsubnet_set.all()
- if not subnet.inet in self.inet]
- if incorrect:
- err = 'Some subnets allocated in this pool are outside the pool: {}'.format(incorrect)
- raise ValidationError(err)
- def __unicode__(self):
- return self.name
- class IPSubnet(models.Model):
- inet = CidrAddressField(blank=True, validators=[validate_subnet],
- verbose_name="Leave empty for automatic allocation")
- objects = NetManager()
- ip_pool = models.ForeignKey(IPPool)
- offer_subscription = models.ForeignKey('offers.OfferSubscription',
- related_name='ip_subnet')
- def clean(self):
- if not self.inet:
- # Automatically allocate a free subnet
- pool = IPSet([self.ip_pool.inet])
- used = IPSet((s.inet for s in self.ip_pool.ipsubnet_set.all()))
- free = pool.difference(used)
- # Generator for efficiency (we don't build the whole list)
- available = (p for p in free.iter_cidrs() if p.prefixlen <= self.ip_pool.default_subnetsize)
- # TODO: for IPv4, get rid of the network and broadcast
- # addresses? Not really needed nowadays, and we usually don't
- # have a real subnet in practice (i.e. Ethernet segment), but
- # many /32.
- try:
- first_free = available.next()
- except StopIteration:
- raise ValidationError('Unable to allocate an IP subnet in the specified pool: not enough space left.')
- self.inet = first_free.subnet(self.ip_pool.default_subnetsize, 1).next()
- else:
- # Check that we are included in the IP pool.
- if not self.inet in self.ip_pool.inet:
- raise ValidationError('Subnet must be included in the IP pool.')
- # Check that we don't conflict with existing subnets.
- conflicting = self.ip_pool.ipsubnet_set.filter(Q(inet__net_contained_or_equal=self.inet) |
- Q(inet__net_contains_or_equals=self.inet)).exclude(id=self.id)
- if conflicting:
- raise ValidationError('Subnet must not intersect with existing subnets.\nIntersected subnets: {}.'.format(conflicting))
- def __unicode__(self):
- return str(self.inet)
|