Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/requirements-diode-netbox-plugin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ coverage==7.6.0
grpcio==1.62.1
protobuf==5.28.1
pytest==8.0.2
netboxlabs-netbox-branching
netboxlabs-netbox-branching==0.5.7
2 changes: 1 addition & 1 deletion netbox_diode_plugin/api/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def _pre_apply(model_class: models.Model, change: Change, created: dict):
# resolve foreign key references to new objects
for ref_field in change.new_refs:
v = _get_path(data, ref_field)
if isinstance(v, (list, tuple)):
if isinstance(v, list | tuple):
ref_list = []
for ref in v:
if isinstance(ref, str):
Expand Down
6 changes: 3 additions & 3 deletions netbox_diode_plugin/api/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from zoneinfo import ZoneInfo

import netaddr
from django.apps import apps
Expand All @@ -20,7 +21,6 @@
from extras.models import CustomField
from netaddr.eui import EUI
from rest_framework import status
from zoneinfo import ZoneInfo

logger = logging.getLogger("netbox.diode_data")

Expand Down Expand Up @@ -166,7 +166,7 @@ def _validate_relations(self, change_data: dict, model: models.Model) -> tuple[l
excluded_relation_fields = []
rel_errors = defaultdict(list)
for f in model._meta.get_fields():
if isinstance(f, (GenericRelation, GenericForeignKey)):
if isinstance(f, GenericRelation | GenericForeignKey):
excluded_relation_fields.append(f.name)
continue
if not f.is_relation:
Expand Down Expand Up @@ -251,7 +251,7 @@ def error_from_validation_error(e, object_name):
if e.detail:
if isinstance(e.detail, dict):
errors[object_name] = e.detail
elif isinstance(e.detail, (list, tuple)):
elif isinstance(e.detail, list | tuple):
errors[object_name] = {
NON_FIELD_ERRORS: e.detail
}
Expand Down
86 changes: 86 additions & 0 deletions netbox_diode_plugin/api/compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/env python
# Copyright 2025 NetBox Labs Inc
"""Diode NetBox Plugin - API - Compatibility Transformations."""

import logging
import re
from collections import defaultdict
from functools import cache

from django.conf import settings
from packaging import version
from utilities.release import load_release_data

logger = logging.getLogger(__name__)

_MIGRATIONS_BY_OBJECT_TYPE = defaultdict(list)

def apply_entity_migrations(data: dict, object_type: str):
"""
Applies migrations to diode entity data prior to diffing to improve compatibility with current NetBox version.

These represent cases like deprecated fields that have been replaced with new fields, but
are supported for backwards compatibility.
"""
for migration in _MIGRATIONS_BY_OBJECT_TYPE.get(object_type, []):
logger.debug(f"Applying migration {migration.__name__} for {object_type}")
migration(data)

def _register_migration(func, min_version, max_version, object_type):
"""Registers a migration function."""
if in_version_range(min_version, max_version):
logger.debug(f"Registering migration {func.__name__} for {object_type}.")
_MIGRATIONS_BY_OBJECT_TYPE[object_type].append(func)
else:
logger.debug(f"Skipping migration {func.__name__} for {object_type}: {min_version} to {max_version}.")

@cache
def _current_netbox_version():
"""Returns the current version of NetBox."""
try:
return version.parse(settings.RELEASE.version)
except Exception:
logger.exception("Failed to determine current version of NetBox.")
return (0, 0, 0)

def in_version_range(min_version: str | None, max_version: str | None):
"""Returns True if the current version of NetBox is within the given version range."""
min_version = version.parse(min_version) if min_version else None
max_version = version.parse(max_version) if max_version else None
current_version = _current_netbox_version()
if min_version and current_version < min_version:
return False
if max_version and current_version > max_version:
return False
return True

def diode_migration(min_version: str, max_version: str | None, object_type: str):
"""Decorator to mark a function as a diode migration."""
def decorator(func):
_register_migration(func, min_version, max_version, object_type)
return func
return decorator

@diode_migration(min_version="4.3.0", max_version=None, object_type="ipam.service")
def _migrate_service_parent_object(data: dict):
"""Transforms ipam.service device and virtual_machine references to parent_object."""
device = data.pop("device", None)
if device:
if data.get("parent_object_device") is None:
data["parent_object_device"] = device
# else ignored.

virtual_machine = data.pop("virtual_machine", None)
if virtual_machine:
if data.get("parent_object_virtual_machine") is None:
data["parent_object_virtual_machine"] = virtual_machine
# else ignored.

@diode_migration(min_version="4.3.0", max_version=None, object_type="tenancy.contact")
def _migrate_contact_group(data: dict):
"""Transforms tenancy.contact group references to groups."""
group = data.pop("group", None)
if group:
if data.get("groups") is None:
data["groups"] = [group]
# else ignored.
2 changes: 1 addition & 1 deletion netbox_diode_plugin/api/differ.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def prechange_data_from_instance(instance) -> dict: # noqa: C901
custom_field_values = instance.get_custom_fields()
cfmap = {}
for cf, value in custom_field_values.items():
if isinstance(value, (datetime.datetime, datetime.date)):
if isinstance(value, datetime.datetime | datetime.date):
cfmap[cf.name] = value
else:
cfmap[cf.name] = cf.serialize(value)
Expand Down
72 changes: 63 additions & 9 deletions netbox_diode_plugin/api/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
from dataclasses import dataclass
from functools import cache, lru_cache
from typing import Type

import netaddr
from django.contrib.contenttypes.fields import ContentType
Expand All @@ -18,6 +17,7 @@
from extras.models.customfields import CustomField

from .common import UnresolvedReference
from .compat import in_version_range
from .plugin_utils import content_type_id, get_object_type, get_object_type_model

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -163,18 +163,28 @@
name="logical_service_name_no_device_or_vm",
model_class=get_object_type_model("ipam.service"),
condition=Q(device__isnull=True, virtual_machine__isnull=True),
max_version="4.2.99",
),
ObjectMatchCriteria(
fields=("name", "device"),
name="logical_service_name_on_device",
model_class=get_object_type_model("ipam.service"),
condition=Q(device__isnull=False),
max_version="4.2.99",
),
ObjectMatchCriteria(
fields=("name", "virtual_machine"),
name="logical_service_name_on_vm",
model_class=get_object_type_model("ipam.service"),
condition=Q(virtual_machine__isnull=False),
max_version="4.2.99",
),
ObjectMatchCriteria(
fields=("name", "parent_object_type", "parent_object_id"),
name="logical_service_name_on_parent",
model_class=get_object_type_model("ipam.service"),
condition=Q(parent_object_type__isnull=False),
min_version="4.3.0"
),
],
"dcim.modulebay": lambda: [
Expand Down Expand Up @@ -202,6 +212,32 @@
model_class=get_object_type_model("ipam.fhrpgroup"),
)
],
"tenancy.contact": lambda: [
ObjectMatchCriteria(
# contacts are unconstrained in 4.3.0
# in 4.2 they are constrained by unique name per group
fields=("name", ),
name="logical_contact_name",
model_class=get_object_type_model("tenancy.contact"),
min_version="4.3.0",
)
],
"dcim.devicerole": lambda: [
ObjectMatchCriteria(
fields=("name",),
name="logical_device_role_name_no_parent",
model_class=get_object_type_model("dcim.devicerole"),
condition=Q(parent__isnull=True),
min_version="4.3.0",
),
ObjectMatchCriteria(
fields=("slug",),
name="logical_device_role_slug_no_parent",
model_class=get_object_type_model("dcim.devicerole"),
condition=Q(parent__isnull=True),
min_version="4.3.0",
)
],
}

@dataclass
Expand All @@ -221,9 +257,12 @@ class ObjectMatchCriteria:
fields: tuple[str] | None = None
expressions: tuple | None = None
condition: Q | None = None
model_class: Type[models.Model] | None = None
model_class: type[models.Model] | None = None
name: str | None = None

min_version: str | None = None
max_version: str | None = None

def __hash__(self):
"""Hash the object match criteria."""
return hash((self.fields, self.expressions, self.condition, self.model_class.__name__, self.name))
Expand Down Expand Up @@ -365,7 +404,7 @@ def _build_expressions_queryset(self, data) -> models.QuerySet:
"""Builds a queryset for the constraint with the given data."""
data = self._prepare_data(data)
replacements = {
F(field): Value(value) if isinstance(value, (str, int, float, bool)) else value
F(field): Value(value) if isinstance(value, str | int | float | bool) else value
for field, value in data.items()
}

Expand Down Expand Up @@ -413,7 +452,10 @@ class CustomFieldMatcher:

name: str
custom_field: str
model_class: Type[models.Model]
model_class: type[models.Model]

min_version: str | None = None
max_version: str | None = None

def fingerprint(self, data: dict) -> str|None:
"""Fingerprint the custom field value."""
Expand Down Expand Up @@ -448,9 +490,12 @@ class GlobalIPNetworkIPMatcher:

ip_fields: tuple[str]
vrf_field: str
model_class: Type[models.Model]
model_class: type[models.Model]
name: str

min_version: str | None = None
max_version: str | None = None

def _check_condition(self, data: dict) -> bool:
"""Check the condition for the custom field."""
return data.get(self.vrf_field, None) is None
Expand Down Expand Up @@ -508,9 +553,12 @@ class VRFIPNetworkIPMatcher:

ip_fields: tuple[str]
vrf_field: str
model_class: Type[models.Model]
model_class: type[models.Model]
name: str

min_version: str | None = None
max_version: str | None = None

def _check_condition(self, data: dict) -> bool:
"""Check the condition for the custom field."""
return data.get(self.vrf_field, None) is not None
Expand Down Expand Up @@ -583,7 +631,10 @@ class AutoSlugMatcher:

name: str
slug_field: str
model_class: Type[models.Model]
model_class: type[models.Model]

min_version: str | None = None
max_version: str | None = None

def fingerprint(self, data: dict) -> str|None:
"""Fingerprint the custom field value."""
Expand Down Expand Up @@ -650,7 +701,10 @@ def _get_autoslug_matchers(model_class) -> list:
@lru_cache(maxsize=256)
def _get_model_matchers(model_class) -> list[ObjectMatchCriteria]:
object_type = get_object_type(model_class)
matchers = _LOGICAL_MATCHERS.get(object_type, lambda: [])()
matchers = [
x for x in _LOGICAL_MATCHERS.get(object_type, lambda: [])()
if in_version_range(x.min_version, x.max_version)
]

# collect single fields that are unique
for field in model_class._meta.fields:
Expand Down Expand Up @@ -750,7 +804,7 @@ def _fingerprint_all(data: dict, object_type: str|None = None) -> str:
if k.startswith("_"):
continue
values.append(k)
if isinstance(v, (list, tuple)):
if isinstance(v, list | tuple):
values.extend(sorted(v))
elif isinstance(v, dict):
values.append(_fingerprint_all(v))
Expand Down
7 changes: 5 additions & 2 deletions netbox_diode_plugin/api/plugin_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ class RefInfo:
'ipam.service': {
'device': RefInfo(object_type='dcim.device', field_name='device'),
'ipaddresses': RefInfo(object_type='ipam.ipaddress', field_name='ipaddresses', is_many=True),
'parent_object_device': RefInfo(object_type='dcim.device', field_name='parent_object', is_generic=True),
'parent_object_virtual_machine': RefInfo(object_type='virtualization.virtualmachine', field_name='parent_object', is_generic=True),
'tags': RefInfo(object_type='extras.tag', field_name='tags', is_many=True),
'virtual_machine': RefInfo(object_type='virtualization.virtualmachine', field_name='virtual_machine'),
},
Expand Down Expand Up @@ -576,6 +578,7 @@ class RefInfo:
},
'tenancy.contact': {
'group': RefInfo(object_type='tenancy.contactgroup', field_name='group'),
'groups': RefInfo(object_type='tenancy.contactgroup', field_name='groups', is_many=True),
'tags': RefInfo(object_type='extras.tag', field_name='tags', is_many=True),
},
'tenancy.contactassignment': {
Expand Down Expand Up @@ -948,13 +951,13 @@ def get_json_ref_info(object_type: str|Type[models.Model], json_field_name: str)
'ipam.rir': frozenset(['custom_fields', 'description', 'is_private', 'name', 'slug', 'tags']),
'ipam.role': frozenset(['custom_fields', 'description', 'name', 'slug', 'tags', 'weight']),
'ipam.routetarget': frozenset(['comments', 'custom_fields', 'description', 'name', 'tags', 'tenant']),
'ipam.service': frozenset(['comments', 'custom_fields', 'description', 'device', 'ipaddresses', 'name', 'ports', 'protocol', 'tags', 'virtual_machine']),
'ipam.service': frozenset(['comments', 'custom_fields', 'description', 'device', 'ipaddresses', 'name', 'parent_object_id', 'parent_object_type', 'ports', 'protocol', 'tags', 'virtual_machine']),
'ipam.vlan': frozenset(['comments', 'custom_fields', 'description', 'group', 'name', 'qinq_role', 'qinq_svlan', 'role', 'site', 'status', 'tags', 'tenant', 'vid']),
'ipam.vlangroup': frozenset(['custom_fields', 'description', 'name', 'scope_id', 'scope_type', 'slug', 'tags', 'vid_ranges']),
'ipam.vlantranslationpolicy': frozenset(['description', 'name']),
'ipam.vlantranslationrule': frozenset(['description', 'local_vid', 'policy', 'remote_vid']),
'ipam.vrf': frozenset(['comments', 'custom_fields', 'description', 'enforce_unique', 'export_targets', 'import_targets', 'name', 'rd', 'tags', 'tenant']),
'tenancy.contact': frozenset(['address', 'comments', 'custom_fields', 'description', 'email', 'group', 'link', 'name', 'phone', 'tags', 'title']),
'tenancy.contact': frozenset(['address', 'comments', 'custom_fields', 'description', 'email', 'group', 'groups', 'link', 'name', 'phone', 'tags', 'title']),
'tenancy.contactassignment': frozenset(['contact', 'custom_fields', 'object_id', 'object_type', 'priority', 'role', 'tags']),
'tenancy.contactgroup': frozenset(['custom_fields', 'description', 'name', 'parent', 'slug', 'tags']),
'tenancy.contactrole': frozenset(['custom_fields', 'description', 'name', 'slug', 'tags']),
Expand Down
7 changes: 3 additions & 4 deletions netbox_diode_plugin/api/supported_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import logging
import time
from functools import lru_cache
from typing import List, Type

from django.apps import apps
from django.db import models
Expand Down Expand Up @@ -82,9 +81,9 @@ def extract_supported_models() -> dict[str, dict]:
return extracted_models


def get_prerequisites(model_class, fields) -> List[dict[str, str]]:
def get_prerequisites(model_class, fields) -> list[dict[str, str]]:
"""Get the prerequisite models for the model."""
prerequisites: List[dict[str, str]] = []
prerequisites: list[dict[str, str]] = []
prerequisite_models = getattr(model_class, "prerequisite_models", [])

for prereq in prerequisite_models:
Expand Down Expand Up @@ -252,7 +251,7 @@ def get_serializer_for_model(model, prefix=""):
return netbox_get_serializer_for_model(model, prefix)


def discover_models(root_packages: List[str]) -> list[Type[models.Model]]:
def discover_models(root_packages: list[str]) -> list[type[models.Model]]:
"""Discovers all model classes in specified root packages."""
discovered_models = []

Expand Down
Loading