123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- from netaddr import IPNetwork
- from django import forms
- from django.db.models import Count
- from dcim.models import Site, Device, Interface
- from utilities.forms import BootstrapMixin, APISelect, Livesearch, CSVDataField, BulkImportForm, SlugField
- from .models import (
- Aggregate, IPAddress, Prefix, PREFIX_STATUS_CHOICES, RIR, Role, VLAN, VLANGroup, VLAN_STATUS_CHOICES, VRF,
- )
- FORM_PREFIX_STATUS_CHOICES = (('', '---------'),) + PREFIX_STATUS_CHOICES
- FORM_VLAN_STATUS_CHOICES = (('', '---------'),) + VLAN_STATUS_CHOICES
- #
- # VRFs
- #
- class VRFForm(forms.ModelForm, BootstrapMixin):
- class Meta:
- model = VRF
- fields = ['name', 'rd', 'enforce_unique', 'description']
- labels = {
- 'rd': "RD",
- }
- help_texts = {
- 'rd': "Route distinguisher in any format",
- }
- class VRFFromCSVForm(forms.ModelForm):
- class Meta:
- model = VRF
- fields = ['name', 'rd', 'enforce_unique', 'description']
- class VRFImportForm(BulkImportForm, BootstrapMixin):
- csv = CSVDataField(csv_form=VRFFromCSVForm)
- class VRFBulkEditForm(forms.Form, BootstrapMixin):
- pk = forms.ModelMultipleChoiceField(queryset=VRF.objects.all(), widget=forms.MultipleHiddenInput)
- description = forms.CharField(max_length=100, required=False)
- #
- # RIRs
- #
- class RIRForm(forms.ModelForm, BootstrapMixin):
- slug = SlugField()
- class Meta:
- model = RIR
- fields = ['name', 'slug']
- #
- # Aggregates
- #
- class AggregateForm(forms.ModelForm, BootstrapMixin):
- class Meta:
- model = Aggregate
- fields = ['prefix', 'rir', 'date_added', 'description']
- help_texts = {
- 'prefix': "IPv4 or IPv6 network",
- 'rir': "Regional Internet Registry responsible for this prefix",
- 'date_added': "Format: YYYY-MM-DD",
- }
- class AggregateFromCSVForm(forms.ModelForm):
- rir = forms.ModelChoiceField(queryset=RIR.objects.all(), to_field_name='name',
- error_messages={'invalid_choice': 'RIR not found.'})
- class Meta:
- model = Aggregate
- fields = ['prefix', 'rir', 'date_added', 'description']
- class AggregateImportForm(BulkImportForm, BootstrapMixin):
- csv = CSVDataField(csv_form=AggregateFromCSVForm)
- class AggregateBulkEditForm(forms.Form, BootstrapMixin):
- pk = forms.ModelMultipleChoiceField(queryset=Aggregate.objects.all(), widget=forms.MultipleHiddenInput)
- rir = forms.ModelChoiceField(queryset=RIR.objects.all(), required=False, label='RIR')
- date_added = forms.DateField(required=False)
- description = forms.CharField(max_length=50, required=False)
- def aggregate_rir_choices():
- rir_choices = RIR.objects.annotate(aggregate_count=Count('aggregates'))
- return [(r.slug, u'{} ({})'.format(r.name, r.aggregate_count)) for r in rir_choices]
- class AggregateFilterForm(forms.Form, BootstrapMixin):
- rir = forms.MultipleChoiceField(required=False, choices=aggregate_rir_choices, label='RIR',
- widget=forms.SelectMultiple(attrs={'size': 8}))
- #
- # Roles
- #
- class RoleForm(forms.ModelForm, BootstrapMixin):
- slug = SlugField()
- class Meta:
- model = Role
- fields = ['name', 'slug']
- #
- # Prefixes
- #
- class PrefixForm(forms.ModelForm, BootstrapMixin):
- site = forms.ModelChoiceField(queryset=Site.objects.all(), required=False, label='Site',
- widget=forms.Select(attrs={'filter-for': 'vlan'}))
- vlan = forms.ModelChoiceField(queryset=VLAN.objects.all(), required=False, label='VLAN',
- widget=APISelect(api_url='/api/ipam/vlans/?site_id={{site}}',
- display_field='display_name'))
- class Meta:
- model = Prefix
- fields = ['prefix', 'vrf', 'site', 'vlan', 'status', 'role', 'description']
- help_texts = {
- 'prefix': "IPv4 or IPv6 network",
- 'vrf': "VRF (if applicable)",
- 'site': "The site to which this prefix is assigned (if applicable)",
- 'vlan': "The VLAN to which this prefix is assigned (if applicable)",
- 'status': "Operational status of this prefix",
- 'role': "The primary function of this prefix",
- }
- def __init__(self, *args, **kwargs):
- super(PrefixForm, self).__init__(*args, **kwargs)
- self.fields['vrf'].empty_label = 'Global'
- # Initialize field without choices to avoid pulling all VLANs from the database
- if self.is_bound and self.data.get('site'):
- self.fields['vlan'].queryset = VLAN.objects.filter(site__pk=self.data['site'])
- elif self.initial.get('site'):
- self.fields['vlan'].queryset = VLAN.objects.filter(site=self.initial['site'])
- else:
- self.fields['vlan'].choices = []
- def clean_prefix(self):
- data = self.cleaned_data['prefix']
- try:
- prefix = IPNetwork(data)
- except:
- raise
- if prefix.version == 4 and prefix.prefixlen == 32:
- raise forms.ValidationError("Cannot create host addresses (/32) as prefixes. These should be IPv4 "
- "addresses instead.")
- elif prefix.version == 6 and prefix.prefixlen == 128:
- raise forms.ValidationError("Cannot create host addresses (/128) as prefixes. These should be IPv6 "
- "addresses instead.")
- return data
- class PrefixFromCSVForm(forms.ModelForm):
- vrf = forms.ModelChoiceField(queryset=VRF.objects.all(), required=False, to_field_name='rd',
- error_messages={'invalid_choice': 'VRF not found.'})
- site = forms.ModelChoiceField(queryset=Site.objects.all(), required=False, to_field_name='name',
- error_messages={'invalid_choice': 'Site not found.'})
- vlan_group_name = forms.CharField(required=False)
- vlan_vid = forms.IntegerField(required=False)
- status_name = forms.ChoiceField(choices=[(s[1], s[0]) for s in PREFIX_STATUS_CHOICES])
- role = forms.ModelChoiceField(queryset=Role.objects.all(), required=False, to_field_name='name',
- error_messages={'invalid_choice': 'Invalid role.'})
- class Meta:
- model = Prefix
- fields = ['prefix', 'vrf', 'site', 'vlan_group_name', 'vlan_vid', 'status_name', 'role', 'description']
- def clean(self):
- super(PrefixFromCSVForm, self).clean()
- site = self.cleaned_data.get('site')
- vlan_group_name = self.cleaned_data.get('vlan_group_name')
- vlan_vid = self.cleaned_data.get('vlan_vid')
- # Validate VLAN
- vlan_group = None
- if vlan_group_name:
- try:
- vlan_group = VLANGroup.objects.get(site=site, name=vlan_group_name)
- except VLANGroup.DoesNotExist:
- self.add_error('vlan_group_name', "Invalid VLAN group ({} - {}).".format(site, vlan_group_name))
- if vlan_vid and vlan_group:
- try:
- self.instance.vlan = VLAN.objects.get(group=vlan_group, vid=vlan_vid)
- except VLAN.DoesNotExist:
- self.add_error('vlan_vid', "Invalid VLAN ID ({} - {}).".format(vlan_group, vlan_vid))
- elif vlan_vid and site:
- try:
- self.instance.vlan = VLAN.objects.get(site=site, vid=vlan_vid)
- except VLAN.MultipleObjectsReturned:
- self.add_error('vlan_vid', "Multiple VLANs found ({} - VID {})".format(site, vlan_vid))
- elif vlan_vid:
- self.add_error('vlan_vid', "Must specify site and/or VLAN group when assigning a VLAN.")
- def save(self, *args, **kwargs):
- m = super(PrefixFromCSVForm, self).save(commit=False)
- # Assign Prefix status by name
- m.status = dict(self.fields['status_name'].choices)[self.cleaned_data['status_name']]
- if kwargs.get('commit'):
- m.save()
- return m
- class PrefixImportForm(BulkImportForm, BootstrapMixin):
- csv = CSVDataField(csv_form=PrefixFromCSVForm)
- class PrefixBulkEditForm(forms.Form, BootstrapMixin):
- pk = forms.ModelMultipleChoiceField(queryset=Prefix.objects.all(), widget=forms.MultipleHiddenInput)
- site = forms.ModelChoiceField(queryset=Site.objects.all(), required=False)
- vrf = forms.ModelChoiceField(queryset=VRF.objects.all(), required=False, label='VRF',
- help_text="Select the VRF to assign, or check below to remove VRF assignment")
- vrf_global = forms.BooleanField(required=False, label='Set VRF to global')
- status = forms.ChoiceField(choices=FORM_PREFIX_STATUS_CHOICES, required=False)
- role = forms.ModelChoiceField(queryset=Role.objects.all(), required=False)
- description = forms.CharField(max_length=50, required=False)
- def prefix_vrf_choices():
- vrf_choices = [('', 'All'), (0, 'Global')]
- vrf_choices += [(v.pk, v.name) for v in VRF.objects.all()]
- return vrf_choices
- def prefix_site_choices():
- site_choices = Site.objects.annotate(prefix_count=Count('prefixes'))
- return [(s.slug, u'{} ({})'.format(s.name, s.prefix_count)) for s in site_choices]
- def prefix_status_choices():
- status_counts = {}
- for status in Prefix.objects.values('status').annotate(count=Count('status')).order_by('status'):
- status_counts[status['status']] = status['count']
- return [(s[0], u'{} ({})'.format(s[1], status_counts.get(s[0], 0))) for s in PREFIX_STATUS_CHOICES]
- def prefix_role_choices():
- role_choices = Role.objects.annotate(prefix_count=Count('prefixes'))
- return [(r.slug, u'{} ({})'.format(r.name, r.prefix_count)) for r in role_choices]
- class PrefixFilterForm(forms.Form, BootstrapMixin):
- parent = forms.CharField(required=False, label='Search Within')
- vrf = forms.ChoiceField(required=False, choices=prefix_vrf_choices, label='VRF')
- status = forms.MultipleChoiceField(required=False, choices=prefix_status_choices)
- site = forms.MultipleChoiceField(required=False, choices=prefix_site_choices,
- widget=forms.SelectMultiple(attrs={'size': 8}))
- role = forms.MultipleChoiceField(required=False, choices=prefix_role_choices,
- widget=forms.SelectMultiple(attrs={'size': 8}))
- expand = forms.BooleanField(required=False, label='Expand prefix hierarchy')
- #
- # IP addresses
- #
- class IPAddressForm(forms.ModelForm, BootstrapMixin):
- nat_site = forms.ModelChoiceField(queryset=Site.objects.all(), required=False, label='Site',
- widget=forms.Select(attrs={'filter-for': 'nat_device'}))
- nat_device = forms.ModelChoiceField(queryset=Device.objects.all(), required=False, label='Device',
- widget=APISelect(api_url='/api/dcim/devices/?site_id={{nat_site}}',
- attrs={'filter-for': 'nat_inside'}))
- livesearch = forms.CharField(required=False, label='IP Address', widget=Livesearch(
- query_key='q', query_url='ipam-api:ipaddress_list', field_to_update='nat_inside', obj_label='address')
- )
- nat_inside = forms.ModelChoiceField(queryset=IPAddress.objects.all(), required=False, label='NAT (Inside)',
- widget=APISelect(api_url='/api/ipam/ip-addresses/?device_id={{nat_device}}',
- display_field='address'))
- class Meta:
- model = IPAddress
- fields = ['address', 'vrf', 'nat_device', 'nat_inside', 'description']
- help_texts = {
- 'address': "IPv4 or IPv6 address and mask",
- 'vrf': "VRF (if applicable)",
- }
- def __init__(self, *args, **kwargs):
- super(IPAddressForm, self).__init__(*args, **kwargs)
- self.fields['vrf'].empty_label = 'Global'
- if self.instance.nat_inside:
- nat_inside = self.instance.nat_inside
- # If the IP is assigned to an interface, populate site/device fields accordingly
- if self.instance.nat_inside.interface:
- self.initial['nat_site'] = self.instance.nat_inside.interface.device.rack.site.pk
- self.initial['nat_device'] = self.instance.nat_inside.interface.device.pk
- self.fields['nat_device'].queryset = Device.objects.filter(
- rack__site=nat_inside.interface.device.rack.site)
- self.fields['nat_inside'].queryset = IPAddress.objects.filter(
- interface__device=nat_inside.interface.device)
- else:
- self.fields['nat_inside'].queryset = IPAddress.objects.filter(pk=nat_inside.pk)
- else:
- # Initialize nat_device choices if nat_site is set
- if self.is_bound and self.data.get('nat_site'):
- self.fields['nat_device'].queryset = Device.objects.filter(rack__site__pk=self.data['nat_site'])
- elif self.initial.get('nat_site'):
- self.fields['nat_device'].queryset = Device.objects.filter(rack__site=self.initial['nat_site'])
- else:
- self.fields['nat_device'].choices = []
- # Initialize nat_inside choices if nat_device is set
- if self.is_bound and self.data.get('nat_device'):
- self.fields['nat_inside'].queryset = IPAddress.objects.filter(
- interface__device__pk=self.data['nat_device'])
- elif self.initial.get('nat_device'):
- self.fields['nat_inside'].queryset = IPAddress.objects.filter(
- interface__device__pk=self.initial['nat_device'])
- else:
- self.fields['nat_inside'].choices = []
- class IPAddressFromCSVForm(forms.ModelForm):
- vrf = forms.ModelChoiceField(queryset=VRF.objects.all(), required=False, to_field_name='rd',
- error_messages={'invalid_choice': 'VRF not found.'})
- device = forms.ModelChoiceField(queryset=Device.objects.all(), required=False, to_field_name='name',
- error_messages={'invalid_choice': 'Device not found.'})
- interface_name = forms.CharField(required=False)
- is_primary = forms.BooleanField(required=False)
- class Meta:
- model = IPAddress
- fields = ['address', 'vrf', 'device', 'interface_name', 'is_primary', 'description']
- def clean(self):
- device = self.cleaned_data.get('device')
- interface_name = self.cleaned_data.get('interface_name')
- is_primary = self.cleaned_data.get('is_primary')
- # Validate interface
- if device and interface_name:
- try:
- Interface.objects.get(device=device, name=interface_name)
- except Interface.DoesNotExist:
- self.add_error('interface_name', "Invalid interface ({}) for {}".format(interface_name, device))
- elif device and not interface_name:
- self.add_error('interface_name', "Device set ({}) but interface missing".format(device))
- elif interface_name and not device:
- self.add_error('device', "Interface set ({}) but device missing or invalid".format(interface_name))
- # Validate is_primary
- if is_primary and not device:
- self.add_error('is_primary', "No device specified; cannot set as primary IP")
- def save(self, commit=True):
- # Set interface
- if self.cleaned_data['device'] and self.cleaned_data['interface_name']:
- self.instance.interface = Interface.objects.get(device=self.cleaned_data['device'],
- name=self.cleaned_data['interface_name'])
- # Set as primary for device
- if self.cleaned_data['is_primary']:
- if self.instance.address.version == 4:
- self.instance.primary_ip4_for = self.cleaned_data['device']
- elif self.instance.address.version == 6:
- self.instance.primary_ip6_for = self.cleaned_data['device']
- return super(IPAddressFromCSVForm, self).save(commit=commit)
- class IPAddressImportForm(BulkImportForm, BootstrapMixin):
- csv = CSVDataField(csv_form=IPAddressFromCSVForm)
- class IPAddressBulkEditForm(forms.Form, BootstrapMixin):
- pk = forms.ModelMultipleChoiceField(queryset=IPAddress.objects.all(), widget=forms.MultipleHiddenInput)
- vrf = forms.ModelChoiceField(queryset=VRF.objects.all(), required=False, label='VRF',
- help_text="Select the VRF to assign, or check below to remove VRF assignment")
- vrf_global = forms.BooleanField(required=False, label='Set VRF to global')
- description = forms.CharField(max_length=50, required=False)
- def ipaddress_family_choices():
- return [('', 'All'), (4, 'IPv4'), (6, 'IPv6')]
- def ipaddress_vrf_choices():
- vrf_choices = [('', 'All'), (0, 'Global')]
- vrf_choices += [(v.pk, v.name) for v in VRF.objects.all()]
- return vrf_choices
- class IPAddressFilterForm(forms.Form, BootstrapMixin):
- family = forms.ChoiceField(required=False, choices=ipaddress_family_choices, label='Address Family')
- vrf = forms.ChoiceField(required=False, choices=ipaddress_vrf_choices, label='VRF')
- #
- # VLAN groups
- #
- class VLANGroupForm(forms.ModelForm, BootstrapMixin):
- slug = SlugField()
- class Meta:
- model = VLANGroup
- fields = ['site', 'name', 'slug']
- def vlangroup_site_choices():
- site_choices = Site.objects.annotate(vlangroup_count=Count('vlan_groups'))
- return [(s.slug, u'{} ({})'.format(s.name, s.vlangroup_count)) for s in site_choices]
- class VLANGroupFilterForm(forms.Form, BootstrapMixin):
- site = forms.MultipleChoiceField(required=False, choices=vlangroup_site_choices,
- widget=forms.SelectMultiple(attrs={'size': 8}))
- #
- # VLANs
- #
- class VLANForm(forms.ModelForm, BootstrapMixin):
- group = forms.ModelChoiceField(queryset=VLANGroup.objects.all(), required=False, label='Group', widget=APISelect(
- api_url='/api/ipam/vlan-groups/?site_id={{site}}',
- ))
- class Meta:
- model = VLAN
- fields = ['site', 'group', 'vid', 'name', 'description', 'status', 'role']
- help_texts = {
- 'site': "The site at which this VLAN exists",
- 'group': "VLAN group (optional)",
- 'vid': "Configured VLAN ID",
- 'name': "Configured VLAN name",
- 'status': "Operational status of this VLAN",
- 'role': "The primary function of this VLAN",
- }
- widgets = {
- 'site': forms.Select(attrs={'filter-for': 'group'}),
- }
- def __init__(self, *args, **kwargs):
- super(VLANForm, self).__init__(*args, **kwargs)
- # Limit VLAN group choices
- if self.is_bound and self.data.get('site'):
- self.fields['group'].queryset = VLANGroup.objects.filter(site__pk=self.data['site'])
- elif self.initial.get('site'):
- self.fields['group'].queryset = VLANGroup.objects.filter(site=self.initial['site'])
- else:
- self.fields['group'].choices = []
- class VLANFromCSVForm(forms.ModelForm):
- site = forms.ModelChoiceField(queryset=Site.objects.all(), to_field_name='name',
- error_messages={'invalid_choice': 'Device not found.'})
- group = forms.ModelChoiceField(queryset=VLANGroup.objects.all(), required=False, to_field_name='name',
- error_messages={'invalid_choice': 'VLAN group not found.'})
- status_name = forms.ChoiceField(choices=[(s[1], s[0]) for s in VLAN_STATUS_CHOICES])
- role = forms.ModelChoiceField(queryset=Role.objects.all(), required=False, to_field_name='name',
- error_messages={'invalid_choice': 'Invalid role.'})
- class Meta:
- model = VLAN
- fields = ['site', 'group', 'vid', 'name', 'status_name', 'role', 'description']
- def save(self, *args, **kwargs):
- m = super(VLANFromCSVForm, self).save(commit=False)
- # Assign VLAN status by name
- m.status = dict(self.fields['status_name'].choices)[self.cleaned_data['status_name']]
- if kwargs.get('commit'):
- m.save()
- return m
- class VLANImportForm(BulkImportForm, BootstrapMixin):
- csv = CSVDataField(csv_form=VLANFromCSVForm)
- class VLANBulkEditForm(forms.Form, BootstrapMixin):
- pk = forms.ModelMultipleChoiceField(queryset=VLAN.objects.all(), widget=forms.MultipleHiddenInput)
- site = forms.ModelChoiceField(queryset=Site.objects.all(), required=False)
- group = forms.ModelChoiceField(queryset=VLANGroup.objects.all(), required=False)
- status = forms.ChoiceField(choices=FORM_VLAN_STATUS_CHOICES, required=False)
- role = forms.ModelChoiceField(queryset=Role.objects.all(), required=False)
- description = forms.CharField(max_length=100, required=False)
- def vlan_site_choices():
- site_choices = Site.objects.annotate(vlan_count=Count('vlans'))
- return [(s.slug, u'{} ({})'.format(s.name, s.vlan_count)) for s in site_choices]
- def vlan_group_choices():
- group_choices = VLANGroup.objects.select_related('site').annotate(vlan_count=Count('vlans'))
- return [(g.pk, u'{} ({})'.format(g, g.vlan_count)) for g in group_choices]
- def vlan_status_choices():
- status_counts = {}
- for status in VLAN.objects.values('status').annotate(count=Count('status')).order_by('status'):
- status_counts[status['status']] = status['count']
- return [(s[0], u'{} ({})'.format(s[1], status_counts.get(s[0], 0))) for s in VLAN_STATUS_CHOICES]
- def vlan_role_choices():
- role_choices = Role.objects.annotate(vlan_count=Count('vlans'))
- return [(r.slug, u'{} ({})'.format(r.name, r.vlan_count)) for r in role_choices]
- class VLANFilterForm(forms.Form, BootstrapMixin):
- site = forms.MultipleChoiceField(required=False, choices=vlan_site_choices,
- widget=forms.SelectMultiple(attrs={'size': 8}))
- group_id = forms.MultipleChoiceField(required=False, choices=vlan_group_choices, label='VLAN Group',
- widget=forms.SelectMultiple(attrs={'size': 8}))
- status = forms.MultipleChoiceField(required=False, choices=vlan_status_choices)
- role = forms.MultipleChoiceField(required=False, choices=vlan_role_choices,
- widget=forms.SelectMultiple(attrs={'size': 8}))
|