|
| 1 | +""" |
| 2 | +Runtime plugin registration support for Basilisk. |
| 3 | +
|
| 4 | +Only Python-based registration is implemented today. C++ factories are stubbed |
| 5 | +out to support future extensions without breaking the public API. |
| 6 | +""" |
| 7 | + |
| 8 | +from __future__ import annotations |
| 9 | + |
| 10 | +from importlib import metadata |
| 11 | +from typing import Any, Callable, Iterable, Optional |
| 12 | + |
| 13 | +from Basilisk.architecture import sysModel |
| 14 | + |
| 15 | +ENTRY_POINT_GROUP = "basilisk.plugins" |
| 16 | +"""Python entry point group used to discover Basilisk plugins.""" |
| 17 | + |
| 18 | + |
| 19 | +class PluginRegistry: |
| 20 | + """Container for Basilisk plugin registrations.""" |
| 21 | + |
| 22 | + def __init__(self) -> None: |
| 23 | + self.py_modules: dict[str, type[sysModel.SysModel]] = {} |
| 24 | + self.factories: dict[str, Any] = {} |
| 25 | + |
| 26 | + def register_python_module(self, name: str, cls: type[sysModel.SysModel]) -> None: |
| 27 | + """Register a Python :class:`~Basilisk.architecture.sysModel.SysModel` subclass.""" |
| 28 | + if not isinstance(name, str) or not name: |
| 29 | + raise TypeError("Module name must be a non-empty string") |
| 30 | + if not isinstance(cls, type): |
| 31 | + raise TypeError("Only classes can be registered as Python modules") |
| 32 | + |
| 33 | + try: |
| 34 | + is_sysmodel = issubclass(cls, sysModel.SysModel) |
| 35 | + except TypeError as exc: # cls is not a class or similar edge cases |
| 36 | + raise TypeError("Only SysModel subclasses can be registered") from exc |
| 37 | + |
| 38 | + if not is_sysmodel: |
| 39 | + raise TypeError( |
| 40 | + f"Cannot register {cls!r} as '{name}': not a SysModel subclass" |
| 41 | + ) |
| 42 | + |
| 43 | + existing = self.py_modules.get(name) |
| 44 | + if existing is not None and existing is not cls: |
| 45 | + raise ValueError( |
| 46 | + f"Module name '{name}' already registered with {existing!r}" |
| 47 | + ) |
| 48 | + |
| 49 | + self.py_modules[name] = cls |
| 50 | + |
| 51 | + def register_factory(self, name: str, factory: Any) -> None: |
| 52 | + """ |
| 53 | + Register a future C++ factory. |
| 54 | +
|
| 55 | + No validation is performed yet; factories act as opaque callables or |
| 56 | + objects until C++ support is implemented. |
| 57 | + """ |
| 58 | + if not isinstance(name, str) or not name: |
| 59 | + raise TypeError("Factory name must be a non-empty string") |
| 60 | + |
| 61 | + existing = self.factories.get(name) |
| 62 | + if existing is not None and existing is not factory: |
| 63 | + raise ValueError(f"Factory name '{name}' already registered") |
| 64 | + |
| 65 | + self.factories[name] = factory |
| 66 | + |
| 67 | + |
| 68 | +GLOBAL_REGISTRY = PluginRegistry() |
| 69 | +"""Shared registry instance used across the Basilisk runtime.""" |
| 70 | + |
| 71 | +_PLUGINS_LOADED = False |
| 72 | + |
| 73 | + |
| 74 | +def _iter_plugin_entry_points() -> Iterable[metadata.EntryPoint]: |
| 75 | + """Return an iterable over all registered plugin entry points.""" |
| 76 | + entry_points = metadata.entry_points() |
| 77 | + if hasattr(entry_points, "select"): |
| 78 | + return entry_points.select(group=ENTRY_POINT_GROUP) |
| 79 | + return entry_points.get(ENTRY_POINT_GROUP, []) |
| 80 | + |
| 81 | + |
| 82 | +def _resolve_register_callable(obj: Any) -> Callable[[PluginRegistry], None]: |
| 83 | + """Normalize the value advertised by an entry point into a register callable.""" |
| 84 | + if callable(obj): |
| 85 | + return obj # The entry point points directly to register() |
| 86 | + |
| 87 | + register = getattr(obj, "register", None) |
| 88 | + if callable(register): |
| 89 | + return register |
| 90 | + |
| 91 | + raise TypeError( |
| 92 | + "Basilisk plugin entry points must reference a callable or an object with " |
| 93 | + "a callable 'register' attribute" |
| 94 | + ) |
| 95 | + |
| 96 | + |
| 97 | +def load_all_plugins(registry: Optional[PluginRegistry] = None) -> PluginRegistry: |
| 98 | + """ |
| 99 | + Discover and register all Basilisk plugins using ``importlib.metadata``. |
| 100 | +
|
| 101 | + The discovery process is idempotent; repeated calls do not re-register |
| 102 | + plugins. |
| 103 | + """ |
| 104 | + global _PLUGINS_LOADED |
| 105 | + |
| 106 | + if registry is None: |
| 107 | + registry = GLOBAL_REGISTRY |
| 108 | + |
| 109 | + if _PLUGINS_LOADED: |
| 110 | + return registry |
| 111 | + |
| 112 | + for entry_point in _iter_plugin_entry_points(): |
| 113 | + register = _resolve_register_callable(entry_point.load()) |
| 114 | + register(registry) |
| 115 | + |
| 116 | + _PLUGINS_LOADED = True |
| 117 | + return registry |
| 118 | + |
| 119 | + |
| 120 | +__all__ = ["GLOBAL_REGISTRY", "PluginRegistry", "load_all_plugins"] |
0 commit comments