From fdfce81e6099129dc8965f6d79f14228967e27c9 Mon Sep 17 00:00:00 2001 From: shaerware Date: Sun, 29 Mar 2026 01:21:25 +0000 Subject: [PATCH] feat(core): AuthService facade wrapping auth_manager.py (#642) Phase 7.1: class-based AuthService implementing the Protocol from modules/core/protocols.py. Delegates to existing auth_manager functions (Strangler Fig) and to UserService/RoleService/WorkspaceService for user/role/workspace lookups. Migrates startup.py event handlers to use auth_service instead of importing _member_role_cache directly. Co-Authored-By: Claude Opus 4.6 (1M context) --- modules/core/auth_service.py | 262 ++++++++++++++++++++++++++++++++ modules/core/startup.py | 12 +- tests/unit/test_auth_service.py | 167 ++++++++++++++++++++ 3 files changed, 435 insertions(+), 6 deletions(-) create mode 100644 modules/core/auth_service.py create mode 100644 tests/unit/test_auth_service.py diff --git a/modules/core/auth_service.py b/modules/core/auth_service.py new file mode 100644 index 0000000..badd68f --- /dev/null +++ b/modules/core/auth_service.py @@ -0,0 +1,262 @@ +"""AuthService facade — class-based wrapper over auth_manager functions. + +Implements the ``AuthService`` Protocol from ``modules.core.protocols``. +Delegates to existing ``auth_manager`` functions (Strangler Fig pattern) +and to ``UserService`` / ``RoleService`` / ``WorkspaceService`` for +user/role/workspace lookups. + +FastAPI dependencies (``require_permission``, ``get_current_user``, etc.) +are **not** part of this facade — they stay in ``auth_manager.py``. +""" + +import logging +from typing import Optional + +from modules.core.schemas import ( + LoginResult, + RoleInfo, + UserInfo, + WorkspaceInfo, + WorkspaceMemberInfo, +) + + +logger = logging.getLogger(__name__) + + +def _user_dict_to_info(d: dict) -> UserInfo: + """Convert a dict from UserService/UserRepository to UserInfo TypedDict.""" + return UserInfo( + id=d["id"], + username=d["username"], + role=d.get("role", "user"), + display_name=d.get("display_name"), + is_active=d.get("is_active", True), + workspace_id=d.get("workspace_id", 1), + created=d.get("created") or d.get("created_at"), + last_login=d.get("last_login"), + ) + + +def _role_dict_to_info(d: dict) -> RoleInfo: + """Convert a dict from RoleService to RoleInfo TypedDict.""" + return RoleInfo( + id=d["id"], + name=d["name"], + display_name=d.get("display_name"), + description=d.get("description"), + is_system=d.get("is_system", False), + permissions=d.get("permissions", {}), + ) + + +class AuthService: + """Facade implementing the AuthService Protocol. + + Thin wrapper delegating to ``auth_manager`` functions and domain + services. Internal caches (session, permissions, member-role) + remain in ``auth_manager``; this class doesn't duplicate them. + """ + + # -- Authentication ------------------------------------------------------- + + async def authenticate( + self, + username: str, + password: str, + *, + ip: Optional[str] = None, + user_agent: Optional[str] = None, + ) -> LoginResult: + """Validate credentials, create session, return token + user info. + + Raises ``ValueError`` if credentials are invalid. + """ + from auth_manager import authenticate_user, create_session + + user = await authenticate_user(username, password) + if user is None: + raise ValueError("Invalid credentials") + + login_resp = await create_session( + username=user.username, + role=user.role, + user_id=user.id, + ip=ip, + user_agent=user_agent, + workspace_id=user.workspace_id, + ) + + # Build UserInfo for the response + user_info = await self.get_user(user.id) + if user_info is None: + # Fallback for legacy env-var user (id=0) + user_info = UserInfo( + id=user.id, + username=user.username, + role=user.role, + display_name=None, + is_active=True, + workspace_id=user.workspace_id, + created=None, + last_login=None, + ) + + return LoginResult( + access_token=login_resp.access_token, + token_type=login_resp.token_type, + expires_in=login_resp.expires_in, + user=user_info, + ) + + async def validate_token(self, token: str) -> Optional[UserInfo]: + """Decode and validate a JWT. Returns None if invalid/expired.""" + from auth_manager import _validate_session, decode_token + + payload = decode_token(token) + if payload is None: + return None + + user = await _validate_session(payload) + if user is None: + return None + + return UserInfo( + id=user.id, + username=user.username, + role=user.role, + display_name=None, + is_active=True, + workspace_id=user.workspace_id, + created=None, + last_login=None, + ) + + async def revoke_session(self, jti: str) -> bool: + """Revoke a single session by its JWT ID.""" + from auth_manager import revoke_session + + return await revoke_session(jti) + + async def revoke_all_sessions(self, user_id: int) -> int: + """Revoke all active sessions for a user. Returns count revoked.""" + from auth_manager import revoke_all_user_sessions + + return await revoke_all_user_sessions(user_id) + + # -- Permissions ---------------------------------------------------------- + + async def get_permissions(self, user_id: int) -> dict[str, str]: + """Return the effective permission map for a user.""" + from auth_manager import User as AuthUser + from auth_manager import get_user_permissions + from modules.core.service import user_service + + user_data = await user_service.get_by_id(user_id) + if user_data is None: + return {} + + # Build a minimal auth_manager.User for the existing function + auth_user = AuthUser( + id=user_data["id"], + username=user_data["username"], + role=user_data.get("role", "user"), + workspace_id=user_data.get("workspace_id", 1), + ) + return await get_user_permissions(auth_user) + + async def has_permission( + self, + user_id: int, + module: str, + min_level: str = "view", + ) -> bool: + """Check whether a user meets the minimum access level for a module.""" + from auth_manager import level_gte + + perms = await self.get_permissions(user_id) + return level_gte(perms.get(module, ""), min_level) + + # -- User management ------------------------------------------------------ + + async def get_user(self, user_id: int) -> Optional[UserInfo]: + """Look up a user by ID.""" + from modules.core.service import user_service + + data = await user_service.get_by_id(user_id) + if data is None: + return None + return _user_dict_to_info(data) + + async def list_users( + self, + *, + workspace_id: Optional[int] = None, + include_inactive: bool = False, + ) -> list[UserInfo]: + """List users, optionally filtered by workspace.""" + from modules.core.service import user_service + + users = await user_service.list_users(include_inactive=include_inactive) + result = [_user_dict_to_info(u) for u in users] + if workspace_id is not None: + result = [u for u in result if u["workspace_id"] == workspace_id] + return result + + # -- Roles ---------------------------------------------------------------- + + async def get_roles(self) -> list[RoleInfo]: + """List all RBAC roles with their permission maps.""" + from modules.core.service import role_service + + roles = await role_service.get_all_with_permissions() + return [_role_dict_to_info(r) for r in roles] + + # -- Workspaces ----------------------------------------------------------- + + async def get_workspace(self, workspace_id: int) -> Optional[WorkspaceInfo]: + """Look up a workspace by ID.""" + from modules.core.service import workspace_service + + data = await workspace_service.get_workspace_info(workspace_id) + if data is None: + return None + return WorkspaceInfo( + id=data["id"], + name=data["name"], + slug=data.get("slug", ""), + owner_id=data.get("owner_id"), + created=data.get("created") or data.get("created_at"), + ) + + async def list_members(self, workspace_id: int) -> list[WorkspaceMemberInfo]: + """List members of a workspace.""" + from modules.core.service import workspace_service + + members = await workspace_service.list_members(workspace_id) + return [ + WorkspaceMemberInfo( + user_id=m["user_id"], + username=m.get("username", ""), + role_name=m.get("role_name", ""), + joined_at=m.get("joined_at"), + ) + for m in members + ] + + # -- Cache management (not in Protocol) ----------------------------------- + + def invalidate_permissions_cache(self, role_name: Optional[str] = None) -> None: + """Clear permissions cache. Delegates to auth_manager.""" + from auth_manager import invalidate_permissions_cache + + invalidate_permissions_cache(role_name) + + def invalidate_member_role_cache(self, user_id: int) -> None: + """Clear member-role cache for a user. Delegates to auth_manager.""" + from auth_manager import _member_role_cache + + _member_role_cache.invalidate_user(user_id) + + +auth_service = AuthService() diff --git a/modules/core/startup.py b/modules/core/startup.py index 70a51e6..625841f 100644 --- a/modules/core/startup.py +++ b/modules/core/startup.py @@ -165,10 +165,10 @@ async def setup_event_subscriptions(event_bus) -> None: async def on_user_role_changed(event: UserRoleChanged) -> None: """Invalidate caches and revoke sessions on role change.""" - from auth_manager import _member_role_cache, revoke_all_user_sessions + from modules.core.auth_service import auth_service - _member_role_cache.invalidate_user(event.user_id) - await revoke_all_user_sessions(event.user_id) + auth_service.invalidate_member_role_cache(event.user_id) + await auth_service.revoke_all_sessions(event.user_id) logger.info( "UserRoleChanged handled: user=%d role=%s->%s", event.user_id, @@ -178,10 +178,10 @@ async def on_user_role_changed(event: UserRoleChanged) -> None: async def on_session_revoked(event: SessionRevoked) -> None: """Revoke all sessions and invalidate caches for a user.""" - from auth_manager import _member_role_cache, revoke_all_user_sessions + from modules.core.auth_service import auth_service - _member_role_cache.invalidate_user(event.user_id) - await revoke_all_user_sessions(event.user_id) + auth_service.invalidate_member_role_cache(event.user_id) + await auth_service.revoke_all_sessions(event.user_id) logger.info( "SessionRevoked handled: user=%d reason=%s", event.user_id, diff --git a/tests/unit/test_auth_service.py b/tests/unit/test_auth_service.py new file mode 100644 index 0000000..9e0b4f0 --- /dev/null +++ b/tests/unit/test_auth_service.py @@ -0,0 +1,167 @@ +"""Tests for AuthService facade. + +Verifies that: +- AuthService satisfies the Protocol structurally +- All Protocol methods exist with correct signatures +- Converter helpers produce correct TypedDict shapes +""" + +import inspect + +from modules.core.auth_service import AuthService, _role_dict_to_info, _user_dict_to_info +from modules.core.protocols import AuthService as AuthServiceProtocol + + +class TestAuthServiceProtocolCompliance: + """Verify AuthService class matches the Protocol.""" + + def test_has_all_protocol_methods(self): + """AuthService must implement every method from the Protocol.""" + protocol_methods = { + name + for name, _ in inspect.getmembers(AuthServiceProtocol, predicate=inspect.isfunction) + if not name.startswith("_") + } + impl_methods = { + name + for name in dir(AuthService) + if not name.startswith("_") and callable(getattr(AuthService, name)) + } + missing = protocol_methods - impl_methods + assert not missing, f"AuthService missing Protocol methods: {missing}" + + def test_authenticate_signature(self): + sig = inspect.signature(AuthService.authenticate) + params = list(sig.parameters.keys()) + assert "self" in params + assert "username" in params + assert "password" in params + + def test_validate_token_signature(self): + sig = inspect.signature(AuthService.validate_token) + params = list(sig.parameters.keys()) + assert "self" in params + assert "token" in params + + def test_revoke_session_signature(self): + sig = inspect.signature(AuthService.revoke_session) + params = list(sig.parameters.keys()) + assert "self" in params + assert "jti" in params + + def test_revoke_all_sessions_signature(self): + sig = inspect.signature(AuthService.revoke_all_sessions) + params = list(sig.parameters.keys()) + assert "self" in params + assert "user_id" in params + + def test_get_permissions_signature(self): + sig = inspect.signature(AuthService.get_permissions) + params = list(sig.parameters.keys()) + assert "self" in params + assert "user_id" in params + + def test_has_permission_signature(self): + sig = inspect.signature(AuthService.has_permission) + params = list(sig.parameters.keys()) + assert "self" in params + assert "user_id" in params + assert "module" in params + assert "min_level" in params + + def test_get_user_signature(self): + sig = inspect.signature(AuthService.get_user) + params = list(sig.parameters.keys()) + assert "self" in params + assert "user_id" in params + + def test_list_users_signature(self): + sig = inspect.signature(AuthService.list_users) + params = list(sig.parameters.keys()) + assert "self" in params + + def test_get_roles_signature(self): + sig = inspect.signature(AuthService.get_roles) + params = list(sig.parameters.keys()) + assert "self" in params + + def test_get_workspace_signature(self): + sig = inspect.signature(AuthService.get_workspace) + params = list(sig.parameters.keys()) + assert "self" in params + assert "workspace_id" in params + + def test_list_members_signature(self): + sig = inspect.signature(AuthService.list_members) + params = list(sig.parameters.keys()) + assert "self" in params + assert "workspace_id" in params + + +class TestConverters: + """Test TypedDict converter helpers.""" + + def test_user_dict_to_info(self): + data = { + "id": 1, + "username": "admin", + "role": "admin", + "display_name": "Admin User", + "is_active": True, + "workspace_id": 1, + "created_at": "2026-01-01T00:00:00", + "last_login": None, + } + info = _user_dict_to_info(data) + assert info["id"] == 1 + assert info["username"] == "admin" + assert info["role"] == "admin" + assert info["display_name"] == "Admin User" + assert info["is_active"] is True + assert info["workspace_id"] == 1 + assert info["created"] == "2026-01-01T00:00:00" + assert info["last_login"] is None + + def test_user_dict_to_info_defaults(self): + data = {"id": 2, "username": "user2"} + info = _user_dict_to_info(data) + assert info["role"] == "user" + assert info["is_active"] is True + assert info["workspace_id"] == 1 + assert info["display_name"] is None + + def test_role_dict_to_info(self): + data = { + "id": 1, + "name": "admin", + "display_name": "Administrator", + "description": "Full access", + "is_system": True, + "permissions": {"chat": "manage", "llm": "manage"}, + } + info = _role_dict_to_info(data) + assert info["id"] == 1 + assert info["name"] == "admin" + assert info["is_system"] is True + assert info["permissions"]["chat"] == "manage" + + def test_role_dict_to_info_defaults(self): + data = {"id": 2, "name": "custom"} + info = _role_dict_to_info(data) + assert info["is_system"] is False + assert info["permissions"] == {} + assert info["display_name"] is None + + +class TestSingleton: + """Verify module-level singleton.""" + + def test_singleton_exported(self): + from modules.core.auth_service import auth_service + + assert isinstance(auth_service, AuthService) + + def test_singleton_is_same_instance(self): + from modules.core.auth_service import auth_service + + assert auth_service is auth_service # trivial but documents intent