Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion modules/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
"""Core infrastructure modules for the AI Secretary System."""

from modules.core.events import BaseEvent, DatasetSynced, EventBus, SessionRevoked, UserRoleChanged
from modules.core.events import (
BaseEvent,
ConfigChanged,
DatasetSynced,
EventBus,
SessionRevoked,
UserRoleChanged,
)
from modules.core.health import HealthRegistry, HealthStatus
from modules.core.tasks import TaskInfo, TaskRegistry


__all__ = [
"BaseEvent",
"ConfigChanged",
"DatasetSynced",
"EventBus",
"HealthRegistry",
Expand Down
15 changes: 15 additions & 0 deletions modules/core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ class DatasetSynced(BaseEvent):
delete_collection: bool = False


@dataclass
class ConfigChanged(BaseEvent):
"""Emitted when a global config key is updated via ConfigService.

Subscribers should filter by *namespace* (derived from the key,
e.g. ``"widget"``, ``"telegram"``, ``"tts"``) and ignore irrelevant
changes. Handlers must be idempotent.
"""

key: str = ""
value: Any = None
previous_value: Any = None
namespace: str = "" # first segment of dotted key, or key itself


class EventBus:
"""Simple in-process pub/sub for async event handlers.

Expand Down
49 changes: 45 additions & 4 deletions modules/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@
logger = logging.getLogger(__name__)


def _get_event_bus():
"""Return the EventBus from the app container, or None during startup."""
try:
from app.dependencies import get_container

return get_container().event_bus
except Exception:
return None


class DatabaseService:
"""Singleton manager for database operations.
Provides async context managers for repository access.
Expand Down Expand Up @@ -513,12 +523,35 @@ async def get(self, key: str, default: Any = None) -> Any:
return await repo.get_config(key, default)

async def set(self, key: str, value: Any) -> bool:
"""Set config value."""
"""Set config value and publish ConfigChanged event."""
async with AsyncSessionLocal() as session:
repo = ConfigRepository(session)
previous_value = await repo.get_config(key)
result = await repo.set_config(key, value)
await session.commit()
return result

await self._publish_config_changed(key, value, previous_value)
return result

async def _publish_config_changed(self, key: str, value: Any, previous_value: Any) -> None:
"""Publish ConfigChanged via EventBus if available."""
try:
from modules.core.events import ConfigChanged

event_bus = _get_event_bus()
if event_bus is None:
return
namespace = key.split(".", maxsplit=1)[0] if key else ""
await event_bus.publish(
ConfigChanged(
key=key,
value=value,
previous_value=previous_value,
namespace=namespace,
)
)
except Exception:
logger.debug("ConfigChanged publish skipped", exc_info=True)

async def get_telegram(self) -> dict:
"""Get Telegram config."""
Expand All @@ -530,9 +563,13 @@ async def set_telegram(self, config: dict) -> bool:
"""Set Telegram config."""
async with AsyncSessionLocal() as session:
repo = ConfigRepository(session)
previous_value = await repo.get_telegram_config()
result = await repo.set_telegram_config(config)
await session.commit()
return result

merged = {**previous_value, **config}
await self._publish_config_changed("telegram", merged, previous_value)
return result

async def get_widget(self) -> dict:
"""Get widget config."""
Expand All @@ -544,9 +581,13 @@ async def set_widget(self, config: dict) -> bool:
"""Set widget config."""
async with AsyncSessionLocal() as session:
repo = ConfigRepository(session)
previous_value = await repo.get_widget_config()
result = await repo.set_widget_config(config)
await session.commit()
return result

merged = {**previous_value, **config}
await self._publish_config_changed("widget", merged, previous_value)
return result


class UserIdentityService:
Expand Down
24 changes: 24 additions & 0 deletions modules/core/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,30 @@ async def on_session_revoked(event: SessionRevoked) -> None:
event_bus.subscribe(UserRoleChanged, on_user_role_changed)
event_bus.subscribe(SessionRevoked, on_session_revoked)

# Config change audit logging
from modules.core.events import ConfigChanged
from modules.monitoring.service import audit_service

async def on_config_changed(event: ConfigChanged) -> None:
"""Log config changes to audit trail."""
await audit_service.log(
action="config_changed",
resource="config",
resource_id=event.key,
details={
"namespace": event.namespace,
"previous_value": event.previous_value,
"new_value": event.value,
},
)
logger.info(
"ConfigChanged handled: key=%s namespace=%s",
event.key,
event.namespace,
)

event_bus.subscribe(ConfigChanged, on_config_changed)

# Domain-specific subscriptions
from modules.channels.startup import setup_channel_event_subscriptions
from modules.crm.startup import setup_crm_event_subscriptions
Expand Down
199 changes: 199 additions & 0 deletions tests/unit/test_config_changed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""Tests for ConfigChanged event — publish from ConfigService + audit subscriber."""

from unittest.mock import AsyncMock, MagicMock, patch

from modules.core.events import ConfigChanged, EventBus
from modules.core.startup import setup_event_subscriptions


# ---------------------------------------------------------------------------
# Event dataclass
# ---------------------------------------------------------------------------


def test_config_changed_fields():
"""ConfigChanged carries key, value, previous_value, namespace."""
event = ConfigChanged(
key="widget",
value={"title": "New"},
previous_value={"title": "Old"},
namespace="widget",
)
assert event.key == "widget"
assert event.value == {"title": "New"}
assert event.previous_value == {"title": "Old"}
assert event.namespace == "widget"
assert event.timestamp > 0


def test_config_changed_defaults():
"""Default field values are sensible."""
event = ConfigChanged()
assert event.key == ""
assert event.value is None
assert event.previous_value is None
assert event.namespace == ""


# ---------------------------------------------------------------------------
# Namespace derivation
# ---------------------------------------------------------------------------


def test_namespace_from_dotted_key():
"""Namespace is derived as first segment of dotted key."""
key = "widget.colors.primary"
namespace = key.split(".", maxsplit=1)[0]
assert namespace == "widget"


def test_namespace_from_simple_key():
"""Simple key uses itself as namespace."""
key = "telegram"
namespace = key.split(".", maxsplit=1)[0]
assert namespace == "telegram"


# ---------------------------------------------------------------------------
# ConfigService.set() publishes event
# ---------------------------------------------------------------------------


async def test_set_publishes_config_changed():
"""ConfigService.set() publishes ConfigChanged after commit."""
bus = EventBus()
received: list[ConfigChanged] = []

async def handler(event: ConfigChanged) -> None:
received.append(event)

bus.subscribe(ConfigChanged, handler)

mock_repo = MagicMock()
mock_repo.get_config = AsyncMock(return_value="old_val")
mock_repo.set_config = AsyncMock(return_value=True)

mock_session = AsyncMock()
mock_session.commit = AsyncMock()

mock_container = MagicMock()
mock_container.event_bus = bus

from modules.core.service import ConfigService

svc = ConfigService()

with (
patch("modules.core.service.AsyncSessionLocal") as mock_asl,
patch("modules.core.service._get_event_bus", return_value=bus),
):
mock_asl.return_value.__aenter__ = AsyncMock(return_value=mock_session)
mock_asl.return_value.__aexit__ = AsyncMock(return_value=False)
mock_session.__class__ = type(mock_session)
# Patch ConfigRepository constructor
with patch("modules.core.service.ConfigRepository", return_value=mock_repo):
result = await svc.set("tts", {"engine": "piper"})

assert result is True
assert len(received) == 1
assert received[0].key == "tts"
assert received[0].value == {"engine": "piper"}
assert received[0].previous_value == "old_val"
assert received[0].namespace == "tts"


async def test_set_works_without_event_bus():
"""ConfigService.set() works even when EventBus is unavailable."""
mock_repo = MagicMock()
mock_repo.get_config = AsyncMock(return_value=None)
mock_repo.set_config = AsyncMock(return_value=True)

mock_session = AsyncMock()
mock_session.commit = AsyncMock()

from modules.core.service import ConfigService

svc = ConfigService()

with (
patch("modules.core.service.AsyncSessionLocal") as mock_asl,
patch("modules.core.service._get_event_bus", return_value=None),
patch("modules.core.service.ConfigRepository", return_value=mock_repo),
):
mock_asl.return_value.__aenter__ = AsyncMock(return_value=mock_session)
mock_asl.return_value.__aexit__ = AsyncMock(return_value=False)

result = await svc.set("llm", {"backend": "gemini"})

assert result is True


# ---------------------------------------------------------------------------
# Audit subscriber
# ---------------------------------------------------------------------------


async def test_audit_subscriber_logs_config_change():
"""on_config_changed handler writes audit log entry."""
bus = EventBus()
mock_audit = AsyncMock()

with (
patch("auth_manager._member_role_cache", MagicMock()),
patch("auth_manager.revoke_all_user_sessions", AsyncMock()),
patch("modules.monitoring.service.audit_service", mock_audit),
):
await setup_event_subscriptions(bus)
await bus.publish(
ConfigChanged(
key="tts",
value={"engine": "piper"},
previous_value={"engine": "xtts"},
namespace="tts",
)
)

mock_audit.log.assert_awaited_once_with(
action="config_changed",
resource="config",
resource_id="tts",
details={
"namespace": "tts",
"previous_value": {"engine": "xtts"},
"new_value": {"engine": "piper"},
},
)


async def test_audit_subscriber_handles_repeated_events_idempotently():
"""Publishing same event twice creates two independent audit entries."""
bus = EventBus()
mock_audit = AsyncMock()

with (
patch("auth_manager._member_role_cache", MagicMock()),
patch("auth_manager.revoke_all_user_sessions", AsyncMock()),
patch("modules.monitoring.service.audit_service", mock_audit),
):
await setup_event_subscriptions(bus)
event = ConfigChanged(key="widget", value="v1", namespace="widget")
await bus.publish(event)
await bus.publish(event)

assert mock_audit.log.await_count == 2


async def test_config_changed_handler_error_does_not_propagate():
"""Audit handler failure should not raise to publisher."""
bus = EventBus()
mock_audit = MagicMock()
mock_audit.log = AsyncMock(side_effect=RuntimeError("db gone"))

with (
patch("auth_manager._member_role_cache", MagicMock()),
patch("auth_manager.revoke_all_user_sessions", AsyncMock()),
patch("modules.monitoring.service.audit_service", mock_audit),
):
await setup_event_subscriptions(bus)
# Should not raise
await bus.publish(ConfigChanged(key="tts", value="new", namespace="tts"))
Loading