diff --git a/modules/core/__init__.py b/modules/core/__init__.py index be6b581..6b734c4 100644 --- a/modules/core/__init__.py +++ b/modules/core/__init__.py @@ -1,12 +1,20 @@ """Core infrastructure modules for the AI Secretary System.""" -from modules.core.events import BaseEvent, DatasetSynced, EventBus, SessionRevoked, UserRoleChanged +from modules.core.events import ( + BaseEvent, + ConfigChanged, + DatasetSynced, + EventBus, + SessionRevoked, + UserRoleChanged, +) from modules.core.health import HealthRegistry, HealthStatus from modules.core.tasks import TaskInfo, TaskRegistry __all__ = [ "BaseEvent", + "ConfigChanged", "DatasetSynced", "EventBus", "HealthRegistry", diff --git a/modules/core/events.py b/modules/core/events.py index a658428..a4a9e43 100644 --- a/modules/core/events.py +++ b/modules/core/events.py @@ -83,6 +83,21 @@ class DatasetSynced(BaseEvent): delete_collection: bool = False +@dataclass +class ConfigChanged(BaseEvent): + """Emitted when a global config key is updated via ConfigService. + + Subscribers should filter by *namespace* (derived from the key, + e.g. ``"widget"``, ``"telegram"``, ``"tts"``) and ignore irrelevant + changes. Handlers must be idempotent. + """ + + key: str = "" + value: Any = None + previous_value: Any = None + namespace: str = "" # first segment of dotted key, or key itself + + class EventBus: """Simple in-process pub/sub for async event handlers. diff --git a/modules/core/service.py b/modules/core/service.py index bd3cc77..a9f2453 100644 --- a/modules/core/service.py +++ b/modules/core/service.py @@ -20,6 +20,16 @@ logger = logging.getLogger(__name__) +def _get_event_bus(): + """Return the EventBus from the app container, or None during startup.""" + try: + from app.dependencies import get_container + + return get_container().event_bus + except Exception: + return None + + class DatabaseService: """Singleton manager for database operations. Provides async context managers for repository access. @@ -513,12 +523,35 @@ async def get(self, key: str, default: Any = None) -> Any: return await repo.get_config(key, default) async def set(self, key: str, value: Any) -> bool: - """Set config value.""" + """Set config value and publish ConfigChanged event.""" async with AsyncSessionLocal() as session: repo = ConfigRepository(session) + previous_value = await repo.get_config(key) result = await repo.set_config(key, value) await session.commit() - return result + + await self._publish_config_changed(key, value, previous_value) + return result + + async def _publish_config_changed(self, key: str, value: Any, previous_value: Any) -> None: + """Publish ConfigChanged via EventBus if available.""" + try: + from modules.core.events import ConfigChanged + + event_bus = _get_event_bus() + if event_bus is None: + return + namespace = key.split(".", maxsplit=1)[0] if key else "" + await event_bus.publish( + ConfigChanged( + key=key, + value=value, + previous_value=previous_value, + namespace=namespace, + ) + ) + except Exception: + logger.debug("ConfigChanged publish skipped", exc_info=True) async def get_telegram(self) -> dict: """Get Telegram config.""" @@ -530,9 +563,13 @@ async def set_telegram(self, config: dict) -> bool: """Set Telegram config.""" async with AsyncSessionLocal() as session: repo = ConfigRepository(session) + previous_value = await repo.get_telegram_config() result = await repo.set_telegram_config(config) await session.commit() - return result + + merged = {**previous_value, **config} + await self._publish_config_changed("telegram", merged, previous_value) + return result async def get_widget(self) -> dict: """Get widget config.""" @@ -544,9 +581,13 @@ async def set_widget(self, config: dict) -> bool: """Set widget config.""" async with AsyncSessionLocal() as session: repo = ConfigRepository(session) + previous_value = await repo.get_widget_config() result = await repo.set_widget_config(config) await session.commit() - return result + + merged = {**previous_value, **config} + await self._publish_config_changed("widget", merged, previous_value) + return result class UserIdentityService: diff --git a/modules/core/startup.py b/modules/core/startup.py index 8b196a5..70a51e6 100644 --- a/modules/core/startup.py +++ b/modules/core/startup.py @@ -191,6 +191,30 @@ async def on_session_revoked(event: SessionRevoked) -> None: event_bus.subscribe(UserRoleChanged, on_user_role_changed) event_bus.subscribe(SessionRevoked, on_session_revoked) + # Config change audit logging + from modules.core.events import ConfigChanged + from modules.monitoring.service import audit_service + + async def on_config_changed(event: ConfigChanged) -> None: + """Log config changes to audit trail.""" + await audit_service.log( + action="config_changed", + resource="config", + resource_id=event.key, + details={ + "namespace": event.namespace, + "previous_value": event.previous_value, + "new_value": event.value, + }, + ) + logger.info( + "ConfigChanged handled: key=%s namespace=%s", + event.key, + event.namespace, + ) + + event_bus.subscribe(ConfigChanged, on_config_changed) + # Domain-specific subscriptions from modules.channels.startup import setup_channel_event_subscriptions from modules.crm.startup import setup_crm_event_subscriptions diff --git a/tests/unit/test_config_changed.py b/tests/unit/test_config_changed.py new file mode 100644 index 0000000..3cd7cc2 --- /dev/null +++ b/tests/unit/test_config_changed.py @@ -0,0 +1,199 @@ +"""Tests for ConfigChanged event — publish from ConfigService + audit subscriber.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +from modules.core.events import ConfigChanged, EventBus +from modules.core.startup import setup_event_subscriptions + + +# --------------------------------------------------------------------------- +# Event dataclass +# --------------------------------------------------------------------------- + + +def test_config_changed_fields(): + """ConfigChanged carries key, value, previous_value, namespace.""" + event = ConfigChanged( + key="widget", + value={"title": "New"}, + previous_value={"title": "Old"}, + namespace="widget", + ) + assert event.key == "widget" + assert event.value == {"title": "New"} + assert event.previous_value == {"title": "Old"} + assert event.namespace == "widget" + assert event.timestamp > 0 + + +def test_config_changed_defaults(): + """Default field values are sensible.""" + event = ConfigChanged() + assert event.key == "" + assert event.value is None + assert event.previous_value is None + assert event.namespace == "" + + +# --------------------------------------------------------------------------- +# Namespace derivation +# --------------------------------------------------------------------------- + + +def test_namespace_from_dotted_key(): + """Namespace is derived as first segment of dotted key.""" + key = "widget.colors.primary" + namespace = key.split(".", maxsplit=1)[0] + assert namespace == "widget" + + +def test_namespace_from_simple_key(): + """Simple key uses itself as namespace.""" + key = "telegram" + namespace = key.split(".", maxsplit=1)[0] + assert namespace == "telegram" + + +# --------------------------------------------------------------------------- +# ConfigService.set() publishes event +# --------------------------------------------------------------------------- + + +async def test_set_publishes_config_changed(): + """ConfigService.set() publishes ConfigChanged after commit.""" + bus = EventBus() + received: list[ConfigChanged] = [] + + async def handler(event: ConfigChanged) -> None: + received.append(event) + + bus.subscribe(ConfigChanged, handler) + + mock_repo = MagicMock() + mock_repo.get_config = AsyncMock(return_value="old_val") + mock_repo.set_config = AsyncMock(return_value=True) + + mock_session = AsyncMock() + mock_session.commit = AsyncMock() + + mock_container = MagicMock() + mock_container.event_bus = bus + + from modules.core.service import ConfigService + + svc = ConfigService() + + with ( + patch("modules.core.service.AsyncSessionLocal") as mock_asl, + patch("modules.core.service._get_event_bus", return_value=bus), + ): + mock_asl.return_value.__aenter__ = AsyncMock(return_value=mock_session) + mock_asl.return_value.__aexit__ = AsyncMock(return_value=False) + mock_session.__class__ = type(mock_session) + # Patch ConfigRepository constructor + with patch("modules.core.service.ConfigRepository", return_value=mock_repo): + result = await svc.set("tts", {"engine": "piper"}) + + assert result is True + assert len(received) == 1 + assert received[0].key == "tts" + assert received[0].value == {"engine": "piper"} + assert received[0].previous_value == "old_val" + assert received[0].namespace == "tts" + + +async def test_set_works_without_event_bus(): + """ConfigService.set() works even when EventBus is unavailable.""" + mock_repo = MagicMock() + mock_repo.get_config = AsyncMock(return_value=None) + mock_repo.set_config = AsyncMock(return_value=True) + + mock_session = AsyncMock() + mock_session.commit = AsyncMock() + + from modules.core.service import ConfigService + + svc = ConfigService() + + with ( + patch("modules.core.service.AsyncSessionLocal") as mock_asl, + patch("modules.core.service._get_event_bus", return_value=None), + patch("modules.core.service.ConfigRepository", return_value=mock_repo), + ): + mock_asl.return_value.__aenter__ = AsyncMock(return_value=mock_session) + mock_asl.return_value.__aexit__ = AsyncMock(return_value=False) + + result = await svc.set("llm", {"backend": "gemini"}) + + assert result is True + + +# --------------------------------------------------------------------------- +# Audit subscriber +# --------------------------------------------------------------------------- + + +async def test_audit_subscriber_logs_config_change(): + """on_config_changed handler writes audit log entry.""" + bus = EventBus() + mock_audit = AsyncMock() + + with ( + patch("auth_manager._member_role_cache", MagicMock()), + patch("auth_manager.revoke_all_user_sessions", AsyncMock()), + patch("modules.monitoring.service.audit_service", mock_audit), + ): + await setup_event_subscriptions(bus) + await bus.publish( + ConfigChanged( + key="tts", + value={"engine": "piper"}, + previous_value={"engine": "xtts"}, + namespace="tts", + ) + ) + + mock_audit.log.assert_awaited_once_with( + action="config_changed", + resource="config", + resource_id="tts", + details={ + "namespace": "tts", + "previous_value": {"engine": "xtts"}, + "new_value": {"engine": "piper"}, + }, + ) + + +async def test_audit_subscriber_handles_repeated_events_idempotently(): + """Publishing same event twice creates two independent audit entries.""" + bus = EventBus() + mock_audit = AsyncMock() + + with ( + patch("auth_manager._member_role_cache", MagicMock()), + patch("auth_manager.revoke_all_user_sessions", AsyncMock()), + patch("modules.monitoring.service.audit_service", mock_audit), + ): + await setup_event_subscriptions(bus) + event = ConfigChanged(key="widget", value="v1", namespace="widget") + await bus.publish(event) + await bus.publish(event) + + assert mock_audit.log.await_count == 2 + + +async def test_config_changed_handler_error_does_not_propagate(): + """Audit handler failure should not raise to publisher.""" + bus = EventBus() + mock_audit = MagicMock() + mock_audit.log = AsyncMock(side_effect=RuntimeError("db gone")) + + with ( + patch("auth_manager._member_role_cache", MagicMock()), + patch("auth_manager.revoke_all_user_sessions", AsyncMock()), + patch("modules.monitoring.service.audit_service", mock_audit), + ): + await setup_event_subscriptions(bus) + # Should not raise + await bus.publish(ConfigChanged(key="tts", value="new", namespace="tts"))