123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- from netaddr import IPNetwork, cidr_merge
- from django.conf import settings
- from django.contrib.contenttypes.fields import GenericRelation
- from django.core.exceptions import ValidationError
- from django.core.urlresolvers import reverse
- from django.core.validators import MaxValueValidator, MinValueValidator
- from django.db import models
- from django.db.models.expressions import RawSQL
- from django.utils.encoding import python_2_unicode_compatible
- from dcim.models import Interface
- from extras.models import CustomFieldModel, CustomFieldValue
- from tenancy.models import Tenant
- from utilities.models import CreatedUpdatedModel
- from utilities.sql import NullsFirstQuerySet
- from utilities.utils import csv_format
- from .fields import IPNetworkField, IPAddressField
# Address families selectable on models that carry a `family` field.
AF_CHOICES = ((4, 'IPv4'), (6, 'IPv6'))

# Operational statuses for Prefix objects.
PREFIX_STATUS_CONTAINER = 0
PREFIX_STATUS_ACTIVE = 1
PREFIX_STATUS_RESERVED = 2
PREFIX_STATUS_DEPRECATED = 3
PREFIX_STATUS_CHOICES = (
    (PREFIX_STATUS_CONTAINER, 'Container'),
    (PREFIX_STATUS_ACTIVE, 'Active'),
    (PREFIX_STATUS_RESERVED, 'Reserved'),
    (PREFIX_STATUS_DEPRECATED, 'Deprecated'),
)

# Operational statuses for IPAddress objects (note: the value 4 is not used here).
IPADDRESS_STATUS_ACTIVE = 1
IPADDRESS_STATUS_RESERVED = 2
IPADDRESS_STATUS_DEPRECATED = 3
IPADDRESS_STATUS_DHCP = 5
IPADDRESS_STATUS_CHOICES = (
    (IPADDRESS_STATUS_ACTIVE, 'Active'),
    (IPADDRESS_STATUS_RESERVED, 'Reserved'),
    (IPADDRESS_STATUS_DEPRECATED, 'Deprecated'),
    (IPADDRESS_STATUS_DHCP, 'DHCP'),
)

# Operational statuses for VLAN objects.
VLAN_STATUS_ACTIVE = 1
VLAN_STATUS_RESERVED = 2
VLAN_STATUS_DEPRECATED = 3
VLAN_STATUS_CHOICES = (
    (VLAN_STATUS_ACTIVE, 'Active'),
    (VLAN_STATUS_RESERVED, 'Reserved'),
    (VLAN_STATUS_DEPRECATED, 'Deprecated'),
)

# Lookup table keyed by numeric status; the get_status_class() methods in this module index into it.
# NOTE(review): the values look like CSS/label class suffixes used by templates - confirm.
STATUS_CHOICE_CLASSES = {
    0: 'default',
    1: 'primary',
    2: 'info',
    3: 'danger',
    4: 'warning',
    5: 'success',
}

# Layer-four protocols selectable on a Service (values presumably follow IANA protocol numbers).
IP_PROTOCOL_TCP = 6
IP_PROTOCOL_UDP = 17
IP_PROTOCOL_CHOICES = ((IP_PROTOCOL_TCP, 'TCP'), (IP_PROTOCOL_UDP, 'UDP'))
@python_2_unicode_compatible
class VRF(CreatedUpdatedModel, CustomFieldModel):
    """
    A virtual routing and forwarding (VRF) table, i.e. a discrete layer three forwarding domain such as a routing
    table. Prefixes and IPAddresses may optionally reference a VRF; those that do not are said to live in the
    "global" table.
    """
    name = models.CharField(max_length=50)
    rd = models.CharField(
        max_length=21,
        unique=True,
        verbose_name='Route distinguisher',
    )
    tenant = models.ForeignKey(
        Tenant,
        related_name='vrfs',
        blank=True,
        null=True,
        on_delete=models.PROTECT,
    )
    enforce_unique = models.BooleanField(
        default=True,
        verbose_name='Enforce unique space',
        help_text="Prevent duplicate prefixes/IP addresses within this VRF",
    )
    description = models.CharField(max_length=100, blank=True)
    custom_field_values = GenericRelation(
        CustomFieldValue,
        content_type_field='obj_type',
        object_id_field='obj_id',
    )

    class Meta:
        ordering = ['name']
        verbose_name = 'VRF'
        verbose_name_plural = 'VRFs'

    def __str__(self):
        return self.name

    def get_absolute_url(self):
        """Return the URL of this VRF's detail view."""
        return reverse('ipam:vrf', args=[self.pk])

    def to_csv(self):
        """Render this VRF as one CSV row: name, RD, tenant name, uniqueness flag, description."""
        tenant_name = self.tenant.name if self.tenant else None
        row = [
            self.name,
            self.rd,
            tenant_name,
            self.enforce_unique,
            self.description,
        ]
        return csv_format(row)
@python_2_unicode_compatible
class RIR(models.Model):
    """
    A Regional Internet Registry (RIR): the authority responsible for allocating a large portion of the global IP
    address space. It may be an organization (e.g. ARIN or RIPE) or a governing standard (e.g. RFC 1918).
    """
    name = models.CharField(max_length=50, unique=True)
    slug = models.SlugField(unique=True)
    is_private = models.BooleanField(
        default=False,
        verbose_name='Private',
        help_text='IP space managed by this RIR is considered private',
    )

    class Meta:
        ordering = ['name']
        verbose_name = 'RIR'
        verbose_name_plural = 'RIRs'

    def __str__(self):
        return self.name

    def get_absolute_url(self):
        """Return the aggregate list URL with a query string selecting this RIR by slug."""
        list_url = reverse('ipam:aggregate_list')
        return "{}?rir={}".format(list_url, self.slug)
@python_2_unicode_compatible
class Aggregate(CreatedUpdatedModel, CustomFieldModel):
    """
    An aggregate sits at the root of the IP address space hierarchy. Aggregates organize the hierarchy and are used
    to track overall utilization of the available address space. Every Aggregate belongs to a RIR.
    """
    family = models.PositiveSmallIntegerField(choices=AF_CHOICES)
    prefix = IPNetworkField()
    rir = models.ForeignKey(
        'RIR',
        related_name='aggregates',
        on_delete=models.PROTECT,
        verbose_name='RIR',
    )
    date_added = models.DateField(blank=True, null=True)
    description = models.CharField(max_length=100, blank=True)
    custom_field_values = GenericRelation(
        CustomFieldValue,
        content_type_field='obj_type',
        object_id_field='obj_id',
    )

    class Meta:
        ordering = ['family', 'prefix']

    def __str__(self):
        return str(self.prefix)

    def get_absolute_url(self):
        """Return the URL of this aggregate's detail view."""
        return reverse('ipam:aggregate', args=[self.pk])

    def clean(self):
        """
        Normalise the prefix (host bits cleared) and reject it if it overlaps another aggregate.

        Raises ValidationError keyed on 'prefix' when an existing aggregate covers this one, or when this one
        covers an existing aggregate.
        """
        if not self.prefix:
            return

        # Drop any host bits so the comparisons below operate on the network itself
        self.prefix = self.prefix.cidr
        network = str(self.prefix)

        # Checked in this order: first "am I covered by someone?", then "do I cover someone?"
        overlap_checks = (
            (
                'prefix__net_contains_or_equals',
                "Aggregates cannot overlap. {} is already covered by an existing aggregate ({}).",
            ),
            (
                'prefix__net_contained',
                "Aggregates cannot overlap. {} covers an existing aggregate ({}).",
            ),
        )
        for lookup, message in overlap_checks:
            conflicts = Aggregate.objects.filter(**{lookup: network})
            if self.pk:
                # An already-saved aggregate must not conflict with itself
                conflicts = conflicts.exclude(pk=self.pk)
            if conflicts:
                raise ValidationError({
                    'prefix': message.format(self.prefix, conflicts[0])
                })

    def save(self, *args, **kwargs):
        """Set `family` from the prefix's IP version (when a prefix is present), then save."""
        if self.prefix:
            self.family = self.prefix.version
        super(Aggregate, self).save(*args, **kwargs)

    def to_csv(self):
        """Render this aggregate as one CSV row: prefix, RIR name, ISO date added, description."""
        added = self.date_added.isoformat() if self.date_added else None
        row = [
            self.prefix,
            self.rir.name,
            added,
            self.description,
        ]
        return csv_format(row)

    def get_utilization(self):
        """
        Return the share of this aggregate's address space occupied by Prefixes, as an integer percentage
        (truncated). Every Prefix contained in or equal to the aggregate is considered; no VRF filter is applied.
        """
        contained = Prefix.objects.filter(prefix__net_contained_or_equal=str(self.prefix))
        # Merging collapses nested/overlapping prefixes so no address is counted twice
        merged = cidr_merge([child.prefix for child in contained])
        # Accumulate as float (starting from 0.0) so the division below is a true division
        occupied = sum((network.size for network in merged), float(0))
        return int(occupied / self.prefix.size * 100)
@python_2_unicode_compatible
class Role(models.Model):
    """
    The functional role of a Prefix or VLAN, for example "Customer," "Infrastructure," or "Management."
    Roles are ordered by weight first and by name second.
    """
    name = models.CharField(max_length=50, unique=True)
    slug = models.SlugField(unique=True)
    weight = models.PositiveSmallIntegerField(default=1000)

    class Meta:
        ordering = ['weight', 'name']

    def __str__(self):
        return self.name

    @property
    def count_prefixes(self):
        """Number of Prefixes that reference this role."""
        prefixes = self.prefixes
        return prefixes.count()

    @property
    def count_vlans(self):
        """Number of VLANs that reference this role."""
        vlans = self.vlans
        return vlans.count()
class PrefixQuerySet(NullsFirstQuerySet):

    def annotate_depth(self, limit=None):
        """
        Walk the Prefixes of this QuerySet in order and tag each one with its hierarchical level.

        Each object receives a `depth` attribute (0 for a prefix with no enclosing prefix on the stack), and every
        prefix found to enclose a later one gets `has_children = True`. Counting parents in SQL via .extra() would
        be preferable in principle, but that approach introduces performance issues at scale.

        These are plain, non-field attributes set on the model instances, so this must be called *after* any other
        QuerySet modifications.

        Returns the QuerySet itself when `limit` is None, otherwise a list holding only the prefixes whose depth
        does not exceed `limit`.
        """
        # Chain of currently "open" ancestors; the innermost one is last
        ancestors = []

        for current in self:
            # Unwind ancestors that do not strictly contain the current prefix
            while ancestors and (
                current.prefix not in ancestors[-1].prefix
                or current.prefix == ancestors[-1].prefix
            ):
                ancestors.pop()

            # Whatever remains on top is the immediate parent
            if ancestors:
                ancestors[-1].has_children = True

            ancestors.append(current)
            current.depth = len(ancestors) - 1

        if limit is None:
            return self
        return [current for current in self if current.depth <= limit]
@python_2_unicode_compatible
class Prefix(CreatedUpdatedModel, CustomFieldModel):
    """
    A Prefix represents an IPv4 or IPv6 network, including mask length. Prefixes can optionally be assigned to Sites
    and VRFs. A Prefix must be assigned a status and may optionally be assigned a user-defined Role. A Prefix can
    also be assigned to a VLAN where appropriate.
    """
    family = models.PositiveSmallIntegerField(choices=AF_CHOICES, editable=False)
    prefix = IPNetworkField(help_text="IPv4 or IPv6 network with mask")
    site = models.ForeignKey('dcim.Site', related_name='prefixes', on_delete=models.PROTECT, blank=True, null=True)
    vrf = models.ForeignKey('VRF', related_name='prefixes', on_delete=models.PROTECT, blank=True, null=True,
                            verbose_name='VRF')
    tenant = models.ForeignKey(Tenant, related_name='prefixes', blank=True, null=True, on_delete=models.PROTECT)
    vlan = models.ForeignKey('VLAN', related_name='prefixes', on_delete=models.PROTECT, blank=True, null=True,
                             verbose_name='VLAN')
    status = models.PositiveSmallIntegerField('Status', choices=PREFIX_STATUS_CHOICES, default=PREFIX_STATUS_ACTIVE,
                                              help_text="Operational status of this prefix")
    role = models.ForeignKey('Role', related_name='prefixes', on_delete=models.SET_NULL, blank=True, null=True,
                             help_text="The primary function of this prefix")
    is_pool = models.BooleanField(verbose_name='Is a pool', default=False,
                                  help_text="All IP addresses within this prefix are considered usable")
    description = models.CharField(max_length=100, blank=True)
    custom_field_values = GenericRelation(CustomFieldValue, content_type_field='obj_type', object_id_field='obj_id')

    objects = PrefixQuerySet.as_manager()

    class Meta:
        ordering = ['vrf', 'family', 'prefix']
        verbose_name_plural = 'prefixes'

    def __str__(self):
        return str(self.prefix)

    def get_absolute_url(self):
        """Return the URL of this prefix's detail view."""
        return reverse('ipam:prefix', args=[self.pk])

    def get_duplicates(self):
        """
        Return a QuerySet of the other Prefixes in the same VRF (or in the global table when `vrf` is None) that
        describe the same network as this one.
        """
        # Compare on the normalised network (host bits cleared): that is the value save() stores, so checking the
        # raw input (e.g. 10.0.0.5/24) would miss an existing 10.0.0.0/24.
        network = str(self.prefix.cidr)
        return Prefix.objects.filter(vrf=self.vrf, prefix=network).exclude(pk=self.pk)

    def clean(self):
        """
        Validate the prefix.

        Raises ValidationError keyed on 'prefix' when the prefix is a host mask (/32 for IPv4, /128 for IPv6), or
        when unique IP space is enforced (globally via settings.ENFORCE_GLOBAL_UNIQUE, or by the assigned VRF) and
        a duplicate exists.
        """
        if not self.prefix:
            return

        # Disallow host masks
        if self.prefix.version == 4 and self.prefix.prefixlen == 32:
            raise ValidationError({
                'prefix': "Cannot create host addresses (/32) as prefixes. Create an IPv4 address instead."
            })
        elif self.prefix.version == 6 and self.prefix.prefixlen == 128:
            raise ValidationError({
                'prefix': "Cannot create host addresses (/128) as prefixes. Create an IPv6 address instead."
            })

        # Enforce unique IP space (if applicable)
        if (self.vrf is None and settings.ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
            duplicate_prefixes = self.get_duplicates()
            if duplicate_prefixes:
                raise ValidationError({
                    'prefix': "Duplicate prefix found in {}: {}".format(
                        "VRF {}".format(self.vrf) if self.vrf else "global table",
                        duplicate_prefixes.first(),
                    )
                })

    def save(self, *args, **kwargs):
        """Clear host bits from the prefix and derive `family` from it (when a prefix is present), then save."""
        if self.prefix:
            # Clear host bits from prefix
            self.prefix = self.prefix.cidr
            # Infer address family from IPNetwork object
            self.family = self.prefix.version
        super(Prefix, self).save(*args, **kwargs)

    def to_csv(self):
        """
        Render this prefix as one CSV row: prefix, VRF RD, tenant, site, VLAN group, VLAN ID, status, role, pool
        flag, description. Unset relations are emitted as None.
        """
        return csv_format([
            self.prefix,
            self.vrf.rd if self.vrf else None,
            self.tenant.name if self.tenant else None,
            self.site.name if self.site else None,
            self.vlan.group.name if self.vlan and self.vlan.group else None,
            self.vlan.vid if self.vlan else None,
            self.get_status_display(),
            self.role.name if self.role else None,
            self.is_pool,
            self.description,
        ])

    @property
    def new_subnet(self):
        """
        Return the IPNetwork that starts at this prefix's network address with a mask one bit longer, or None when
        the prefix is too long to hold such a child (longer than /30 for IPv4, /126 for IPv6) or the family is
        neither 4 nor 6.
        """
        # Longest parent mask whose child is still not a host mask (which clean() rejects)
        max_parent_len = {4: 30, 6: 126}.get(self.family)
        if max_parent_len is None or self.prefix.prefixlen > max_parent_len:
            return None
        return IPNetwork('{}/{}'.format(self.prefix.network, self.prefix.prefixlen + 1))

    def get_status_class(self):
        """Return the STATUS_CHOICE_CLASSES entry for this prefix's status."""
        return STATUS_CHOICE_CLASSES[self.status]
class IPAddressManager(models.Manager):

    def get_queryset(self):
        """
        Return the default queryset annotated with a `host` column and ordered by ('family', 'host').

        PostgreSQL orders INET values with shorter (larger) prefix lengths ahead of those with longer (smaller)
        masks, which makes no sense for IP addresses: they should sort purely by family and host address. HOST()
        extracts the host part (dropping the mask), and wrapping it in INET() again makes the value sort properly.
        In effect every address is compared as if it were a /32 or /128.
        """
        host_only = RawSQL('INET(HOST(ipam_ipaddress.address))', [])
        base_queryset = super(IPAddressManager, self).get_queryset()
        annotated = base_queryset.annotate(host=host_only)
        return annotated.order_by('family', 'host')
@python_2_unicode_compatible
class IPAddress(CreatedUpdatedModel, CustomFieldModel):
    """
    An IPAddress represents an individual IPv4 or IPv6 address and its mask. The mask length should match what is
    configured in the real world. (Typically, only loopback interfaces are configured with /32 or /128 masks.) Like
    Prefixes, IPAddresses can optionally be assigned to a VRF. An IPAddress can optionally be assigned to an
    Interface. Interfaces can have zero or more IPAddresses assigned to them.

    An IPAddress can also optionally point to a NAT inside IP, designating itself as a NAT outside IP. This is
    useful, for example, when mapping public addresses to private addresses. When an Interface has been assigned an
    IPAddress which has a NAT outside IP, that Interface's Device can use either the inside or outside IP as its
    primary IP.
    """
    family = models.PositiveSmallIntegerField(choices=AF_CHOICES, editable=False)
    address = IPAddressField(help_text="IPv4 or IPv6 address (with mask)")
    vrf = models.ForeignKey('VRF', related_name='ip_addresses', on_delete=models.PROTECT, blank=True, null=True,
                            verbose_name='VRF')
    tenant = models.ForeignKey(Tenant, related_name='ip_addresses', blank=True, null=True, on_delete=models.PROTECT)
    # Named constant instead of a bare 1, matching how Prefix declares its default status
    status = models.PositiveSmallIntegerField('Status', choices=IPADDRESS_STATUS_CHOICES,
                                              default=IPADDRESS_STATUS_ACTIVE)
    interface = models.ForeignKey(Interface, related_name='ip_addresses', on_delete=models.CASCADE, blank=True,
                                  null=True)
    nat_inside = models.OneToOneField('self', related_name='nat_outside', on_delete=models.SET_NULL, blank=True,
                                      null=True, verbose_name='NAT (Inside)',
                                      help_text="The IP for which this address is the \"outside\" IP")
    description = models.CharField(max_length=100, blank=True)
    custom_field_values = GenericRelation(CustomFieldValue, content_type_field='obj_type', object_id_field='obj_id')

    objects = IPAddressManager()

    class Meta:
        ordering = ['family', 'address']
        verbose_name = 'IP address'
        verbose_name_plural = 'IP addresses'

    def __str__(self):
        return str(self.address)

    def get_absolute_url(self):
        """Return the URL of this IP address's detail view."""
        return reverse('ipam:ipaddress', args=[self.pk])

    def get_duplicates(self):
        """
        Return a QuerySet of the other IPAddresses in the same VRF (or in the global table when `vrf` is None)
        whose host address equals this one's, regardless of mask.
        """
        return IPAddress.objects.filter(vrf=self.vrf, address__net_host=str(self.address.ip)).exclude(pk=self.pk)

    def clean(self):
        """
        Enforce unique IP space where applicable.

        Raises ValidationError keyed on 'address' when uniqueness is enforced (globally via
        settings.ENFORCE_GLOBAL_UNIQUE, or by the assigned VRF) and a duplicate address exists.
        """
        if not self.address:
            return

        # Enforce unique IP space (if applicable)
        if (self.vrf is None and settings.ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
            duplicate_ips = self.get_duplicates()
            if duplicate_ips:
                raise ValidationError({
                    'address': "Duplicate IP address found in {}: {}".format(
                        "VRF {}".format(self.vrf) if self.vrf else "global table",
                        duplicate_ips.first(),
                    )
                })

    def save(self, *args, **kwargs):
        """Derive `family` from the address's IP version (when an address is present), then save."""
        if self.address:
            # Infer address family from IPAddress object
            self.family = self.address.version
        super(IPAddress, self).save(*args, **kwargs)

    def to_csv(self):
        """
        Render this address as one CSV row: address, VRF RD, tenant, status, device identifier, interface name,
        primary flag, description. Unset relations are emitted as None.
        """
        # Determine if this IP is primary for a Device. The attribute matching the address family is looked up
        # with a False fallback for when it is absent.
        # NOTE(review): primary_ip4_for / primary_ip6_for are presumably reverse accessors declared on the Device
        # model - confirm.
        is_primary = bool(
            (self.family == 4 and getattr(self, 'primary_ip4_for', False)) or
            (self.family == 6 and getattr(self, 'primary_ip6_for', False))
        )

        # Resolve the device once instead of evaluating the property twice
        device = self.device

        return csv_format([
            self.address,
            self.vrf.rd if self.vrf else None,
            self.tenant.name if self.tenant else None,
            self.get_status_display(),
            device.identifier if device else None,
            self.interface.name if self.interface else None,
            is_primary,
            self.description,
        ])

    @property
    def device(self):
        """The Device owning the assigned Interface, or None when no Interface is assigned."""
        if self.interface:
            return self.interface.device
        return None

    def get_status_class(self):
        """Return the STATUS_CHOICE_CLASSES entry for this address's status."""
        return STATUS_CHOICE_CLASSES[self.status]
@python_2_unicode_compatible
class VLANGroup(models.Model):
    """
    An arbitrary collection of VLANs. Within a group, VLAN IDs and names must be unique. A group may optionally
    belong to a Site; its name and slug are unique per site.
    """
    name = models.CharField(max_length=50)
    slug = models.SlugField()
    site = models.ForeignKey(
        'dcim.Site',
        related_name='vlan_groups',
        on_delete=models.PROTECT,
        blank=True,
        null=True,
    )

    class Meta:
        ordering = ['site', 'name']
        unique_together = [
            ['site', 'name'],
            ['site', 'slug'],
        ]
        verbose_name = 'VLAN group'
        verbose_name_plural = 'VLAN groups'

    def __str__(self):
        # Prefix the group name with its site's name when a site is assigned
        if self.site is not None:
            return u'{} - {}'.format(self.site.name, self.name)
        return self.name

    def get_absolute_url(self):
        """Return the VLAN list URL with a query string selecting this group by primary key."""
        list_url = reverse('ipam:vlan_list')
        return "{}?group_id={}".format(list_url, self.pk)
@python_2_unicode_compatible
class VLAN(CreatedUpdatedModel, CustomFieldModel):
    """
    A VLAN is a distinct layer two forwarding domain identified by a 12-bit integer (1-4094). A VLAN may be assigned
    to a Site (the field is optional), and VLAN IDs need not be unique within a Site. A VLAN may optionally be
    assigned to a VLANGroup, within which all VLAN IDs and names must be unique.

    Like Prefixes, each VLAN is assigned an operational status and optionally a user-defined Role. A VLAN can have
    zero or more Prefixes assigned to it.
    """
    site = models.ForeignKey('dcim.Site', related_name='vlans', on_delete=models.PROTECT, blank=True, null=True)
    group = models.ForeignKey('VLANGroup', related_name='vlans', blank=True, null=True, on_delete=models.PROTECT)
    vid = models.PositiveSmallIntegerField(verbose_name='ID', validators=[
        MinValueValidator(1),
        MaxValueValidator(4094)
    ])
    name = models.CharField(max_length=64)
    tenant = models.ForeignKey(Tenant, related_name='vlans', blank=True, null=True, on_delete=models.PROTECT)
    # Named constant instead of a bare 1, matching how Prefix declares its default status
    status = models.PositiveSmallIntegerField('Status', choices=VLAN_STATUS_CHOICES, default=VLAN_STATUS_ACTIVE)
    role = models.ForeignKey('Role', related_name='vlans', on_delete=models.SET_NULL, blank=True, null=True)
    description = models.CharField(max_length=100, blank=True)
    custom_field_values = GenericRelation(CustomFieldValue, content_type_field='obj_type', object_id_field='obj_id')

    class Meta:
        ordering = ['site', 'group', 'vid']
        unique_together = [
            ['group', 'vid'],
            ['group', 'name'],
        ]
        verbose_name = 'VLAN'
        verbose_name_plural = 'VLANs'

    def __str__(self):
        return self.display_name

    def get_absolute_url(self):
        """Return the URL of this VLAN's detail view."""
        return reverse('ipam:vlan', args=[self.pk])

    def clean(self):
        """
        Validate the VLAN group assignment.

        Raises ValidationError keyed on 'group' when a group is set and its site differs from this VLAN's site.
        """
        # Validate VLAN group
        if self.group and self.group.site != self.site:
            raise ValidationError({
                'group': "VLAN group must belong to the assigned site ({}).".format(self.site)
            })

    def to_csv(self):
        """
        Render this VLAN as one CSV row: site, group, VID, name, tenant, status, role, description. Unset
        relations are emitted as None.
        """
        return csv_format([
            self.site.name if self.site else None,
            self.group.name if self.group else None,
            self.vid,
            self.name,
            self.tenant.name if self.tenant else None,
            self.get_status_display(),
            self.role.name if self.role else None,
            self.description,
        ])

    @property
    def display_name(self):
        """Human-readable label in the form "<vid> (<name>)"."""
        return u'{} ({})'.format(self.vid, self.name)

    def get_status_class(self):
        """Return the STATUS_CHOICE_CLASSES entry for this VLAN's status."""
        return STATUS_CHOICE_CLASSES[self.status]
@python_2_unicode_compatible
class Service(CreatedUpdatedModel):
    """
    A layer-four service (e.g. HTTP or SSH) running on a Device. A Service may optionally be bound to one or more
    specific IPAddresses. The (device, protocol, port) combination is unique.
    """
    device = models.ForeignKey(
        'dcim.Device',
        related_name='services',
        on_delete=models.CASCADE,
        verbose_name='device',
    )
    name = models.CharField(max_length=30)
    protocol = models.PositiveSmallIntegerField(choices=IP_PROTOCOL_CHOICES)
    port = models.PositiveIntegerField(
        validators=[MinValueValidator(1), MaxValueValidator(65535)],
        verbose_name='Port number',
    )
    ipaddresses = models.ManyToManyField(
        'ipam.IPAddress',
        related_name='services',
        blank=True,
        verbose_name='IP addresses',
    )
    description = models.CharField(max_length=100, blank=True)

    class Meta:
        ordering = ['device', 'protocol', 'port']
        unique_together = ['device', 'protocol', 'port']

    def __str__(self):
        # Rendered as "<name> (<port>/<protocol label>)"
        protocol_label = self.get_protocol_display()
        return u'{} ({}/{})'.format(self.name, self.port, protocol_label)
|