Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes: #15924 - Prevent API payload from allowing tagged_vlans while interface mode is set to tagged-all #17211

Merged
merged 20 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5c9a145
Fixes: #15924 - Prevent API payload from allowing tagged_vlans while …
DanSheps Aug 20, 2024
282836c
Prevent cleanup of tagged_vlans when no tagged_vlans set on interface
DanSheps Aug 20, 2024
4ea0047
Fix test errors
DanSheps Aug 20, 2024
d4c8e88
Remove accidental debug statements
DanSheps Aug 22, 2024
bab5a5b
Update validation to model clean method instead of serializer
DanSheps Aug 25, 2024
4f830ce
Remove clearing of tagged vlans from `save()`
DanSheps Aug 25, 2024
d159547
Make changes to validation to account for M2M not being available und…
DanSheps Aug 27, 2024
9a13caa
Optimize untagged vlan check
DanSheps Aug 27, 2024
07444e6
Re-ordering statements in validators
DanSheps Aug 27, 2024
5776db6
Forgot to call super().clean()
DanSheps Aug 27, 2024
140d552
Merge branch 'feature' of https://github.com/netbox-community/netbox …
DanSheps Sep 23, 2024
1789862
Adjust logic for form and serializer. Add tests
DanSheps Sep 23, 2024
8dd6119
Merge branch 'feature' of https://github.com/netbox-community/netbox …
DanSheps Oct 17, 2024
365e2fb
Fix test failure
DanSheps Oct 17, 2024
afb748c
Merge branch 'feature' of https://github.com/netbox-community/netbox …
DanSheps Jan 20, 2025
e858706
Fix ruff errors
DanSheps Jan 20, 2025
79262c6
Fix test by removing now invalid test
DanSheps Jan 21, 2025
8ee5d0a
Update serializer, form and tests
DanSheps Feb 24, 2025
2c08258
Optimize API test for vlan fields
DanSheps Feb 25, 2025
41e6a55
Optimize API serializer logic
DanSheps Feb 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion netbox/dcim/api/serializers_/device_components.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.utils.translation import gettext as _
from django.contrib.contenttypes.models import ContentType
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
Expand Down Expand Up @@ -232,8 +233,56 @@ class Meta:

def validate(self, data):

# Validate many-to-many VLAN assignments
if not self.nested:

# Validate 802.1q mode and vlan(s)
mode = None
tagged_vlans = []

# Gather Information
if self.instance:
mode = data.get('mode') if 'mode' in data.keys() else self.instance.mode
untagged_vlan = data.get('untagged_vlan') if 'untagged_vlan' in data.keys() else \
self.instance.untagged_vlan
qinq_svlan = data.get('qinq_svlan') if 'qinq_svlan' in data.keys() else \
self.instance.qinq_svlan
tagged_vlans = data.get('tagged_vlans') if 'tagged_vlans' in data.keys() else \
self.instance.tagged_vlans.all()
else:
mode = data.get('mode', None)
untagged_vlan = data.get('untagged_vlan') if 'untagged_vlan' in data.keys() else None
qinq_svlan = data.get('qinq_svlan') if 'qinq_svlan' in data.keys() else None
tagged_vlans = data.get('tagged_vlans') if 'tagged_vlans' in data.keys() else None

errors = {}

# Non Q-in-Q mode with service vlan set
if mode != InterfaceModeChoices.MODE_Q_IN_Q and qinq_svlan:
errors.update({
'qinq_svlan': _("Interface mode does not support q-in-q service vlan")
})
# Routed mode
if not mode:
# Untagged vlan
if untagged_vlan:
errors.update({
'untagged_vlan': _("Interface mode does not support untagged vlan")
})
# Tagged vlan
if tagged_vlans:
errors.update({
'tagged_vlans': _("Interface mode does not support tagged vlans")
})
# Non-tagged mode
elif mode in (InterfaceModeChoices.MODE_TAGGED_ALL, InterfaceModeChoices.MODE_ACCESS) and tagged_vlans:
errors.update({
'tagged_vlans': _("Interface mode does not support tagged vlans")
})

if errors:
raise serializers.ValidationError(errors)

# Validate many-to-many VLAN assignments
device = self.instance.device if self.instance else data.get('device')
for vlan in data.get('tagged_vlans', []):
if vlan.site not in [device.site, None]:
Expand Down
18 changes: 6 additions & 12 deletions netbox/dcim/forms/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,14 @@ def clean(self):
super().clean()

parent_field = 'device' if 'device' in self.cleaned_data else 'virtual_machine'
tagged_vlans = self.cleaned_data.get('tagged_vlans')

# Untagged interfaces cannot be assigned tagged VLANs
if self.cleaned_data['mode'] == InterfaceModeChoices.MODE_ACCESS and tagged_vlans:
raise forms.ValidationError({
'mode': _("An access interface cannot have tagged VLANs assigned.")
})

# Remove all tagged VLAN assignments from "tagged all" interfaces
elif self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED_ALL:
self.cleaned_data['tagged_vlans'] = []
if 'tagged_vlans' in self.fields.keys():
tagged_vlans = self.cleaned_data.get('tagged_vlans') if self.is_bound else \
self.get_initial_for_field(self.fields['tagged_vlans'], 'tagged_vlans')
else:
tagged_vlans = []

# Validate tagged VLANs; must be a global VLAN or in the same site
elif self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED and tagged_vlans:
if self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED and tagged_vlans:
valid_sites = [None, self.cleaned_data[parent_field].site]
invalid_vlans = [str(v) for v in tagged_vlans if v.site not in valid_sites]

Expand Down
2 changes: 2 additions & 0 deletions netbox/dcim/models/device_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,8 @@ def clean(self):
raise ValidationError({'rf_channel_width': _("Cannot specify custom width with channel selected.")})

# VLAN validation
if not self.mode and self.untagged_vlan:
raise ValidationError({'untagged_vlan': _("Interface mode does not support an untagged vlan.")})

# Validate untagged VLAN
if self.untagged_vlan and self.untagged_vlan.site not in [self.device.site, None]:
Expand Down
70 changes: 70 additions & 0 deletions netbox/dcim/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

from django.test import override_settings
from django.urls import reverse
from django.utils.translation import gettext as _
Expand Down Expand Up @@ -1748,6 +1750,23 @@ def setUpTestData(cls):
},
]

def _perform_interface_test_with_invalid_data(self, mode: str = None, invalid_data: dict = {}):
device = Device.objects.first()
data = {
'device': device.pk,
'name': 'Interface 1',
'type': InterfaceTypeChoices.TYPE_1GE_FIXED,
}
data.update({'mode': mode})
data.update(invalid_data)

response = self.client.post(self._get_list_url(), data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
content = json.loads(response.content)
for key in invalid_data.keys():
self.assertIn(key, content)
self.assertIsNone(content.get('data'))

def test_bulk_delete_child_interfaces(self):
interface1 = Interface.objects.get(name='Interface 1')
device = interface1.device
Expand Down Expand Up @@ -1775,6 +1794,57 @@ def test_bulk_delete_child_interfaces(self):
self.client.delete(self._get_list_url(), data, format='json', **self.header)
self.assertEqual(device.interfaces.count(), 2) # Child & parent were both deleted

def test_create_child_interfaces_mode_invalid_data(self):
"""
POST data to test interface mode check and invalid tagged/untagged VLANS.
"""
self.add_permissions('dcim.add_interface')

vlans = VLAN.objects.all()[0:3]

# Routed mode, untagged, tagged and qinq service vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
'qinq_svlan': vlans[2].pk
}
self._perform_interface_test_with_invalid_data(None, invalid_data)

# Routed mode, untagged and tagged vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
}
self._perform_interface_test_with_invalid_data(None, invalid_data)

# Routed mode, untagged vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
}
self._perform_interface_test_with_invalid_data(None, invalid_data)

invalid_data = {
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
}
# Routed mode, qinq service vlan
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Access mode, tagged vlans
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_ACCESS, invalid_data)
# All tagged mode, tagged vlans
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED_ALL, invalid_data)

invalid_data = {
'qinq_svlan': vlans[0].pk,
}
# Routed mode, qinq service vlan
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Access mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_ACCESS, invalid_data)
# Tagged mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED, invalid_data)
# Tagged-all mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED_ALL, invalid_data)


class FrontPortTest(APIViewTestCases.APIViewTestCase):
model = FrontPort
Expand Down
166 changes: 164 additions & 2 deletions netbox/dcim/tests/test_forms.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from django.test import TestCase

from dcim.choices import DeviceFaceChoices, DeviceStatusChoices, InterfaceTypeChoices
from dcim.choices import DeviceFaceChoices, DeviceStatusChoices, InterfaceTypeChoices, InterfaceModeChoices
from dcim.forms import *
from dcim.models import *
from ipam.models import VLAN
from utilities.testing import create_test_device
from virtualization.models import Cluster, ClusterGroup, ClusterType

Expand Down Expand Up @@ -117,11 +118,23 @@ def test_non_racked_device_with_position(self):
self.assertIn('position', form.errors)


class LabelTestCase(TestCase):
class InterfaceTestCase(TestCase):

@classmethod
def setUpTestData(cls):
cls.device = create_test_device('Device 1')
cls.vlans = (
VLAN(name='VLAN 1', vid=1),
VLAN(name='VLAN 2', vid=2),
VLAN(name='VLAN 3', vid=3),
)
VLAN.objects.bulk_create(cls.vlans)
cls.interface = Interface.objects.create(
device=cls.device,
name='Interface 1',
type=InterfaceTypeChoices.TYPE_1GE_GBIC,
mode=InterfaceModeChoices.MODE_TAGGED,
)

def test_interface_label_count_valid(self):
"""
Expand Down Expand Up @@ -151,3 +164,152 @@ def test_interface_label_count_mismatch(self):

self.assertFalse(form.is_valid())
self.assertIn('label', form.errors)

def test_create_interface_mode_valid_data(self):
"""
Test that saving valid interface mode and tagged/untagged vlans works properly
"""

# Validate access mode
data = {
'device': self.device.pk,
'name': 'ethernet1/1',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'untagged_vlan': self.vlans[0].pk
}
form = InterfaceCreateForm(data)

self.assertTrue(form.is_valid())

# Validate tagged vlans
data = {
'device': self.device.pk,
'name': 'ethernet1/2',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())

# Validate tagged vlans
data = {
'device': self.device.pk,
'name': 'ethernet1/3',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'untagged_vlan': self.vlans[0].pk,
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())

def test_create_interface_mode_access_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/4',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)

self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())

def test_edit_interface_mode_access_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/5',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data, instance=self.interface)

self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())

def test_create_interface_mode_tagged_all_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/6',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)

self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())

def test_edit_interface_mode_tagged_all_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/7',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())

def test_create_interface_mode_routed_invalid_data(self):
"""
Test that saving invalid interface mode (routed) and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/6',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': None,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)

self.assertTrue(form.is_valid())
self.assertNotIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())

def test_edit_interface_mode_routed_invalid_data(self):
"""
Test that saving invalid interface mode (routed) and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/7',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': None,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data)
self.assertTrue(form.is_valid())
self.assertNotIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())