diff --git a/docs/docs/manage/rbac.md b/docs/docs/manage/rbac.md index 1024a66708..432174da66 100644 --- a/docs/docs/manage/rbac.md +++ b/docs/docs/manage/rbac.md @@ -1,47 +1,336 @@ # RBAC Configuration -Role-based access control (RBAC) defines which actions users or teams can perform in MCP Gateway. This document outlines the model, current capabilities, and the roadmap toward finer-grained controls. +Role-based access control (RBAC) defines which actions users or teams can perform in MCP Gateway. This document covers the security model, token scoping semantics, and best practices for access control. --- -## Model +## Overview -- Subjects: users authenticated via SSO or basic/JWT -- Grouping: subjects can belong to one or more gateway teams -- Roles: admin, maintainer, viewer (initial baseline); future: per-entity scoped roles -- Resources: servers, tools, prompts, resources, gateway settings +MCP Gateway implements a multi-layered access control system: + +1. **Authentication**: Verify user identity via JWT, SSO, or API tokens +2. **Team Membership**: Group users for collective access policies +3. **Token Scoping**: Restrict token capabilities to specific resources +4. **Visibility Filtering**: Control which resources users can discover + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Access Control Layers │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ Request → Authentication → Team Resolution → Token Scoping → Visibility │ +│ │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ JWT │ │ User │ │ Token │ │ Resource │ │ Filtered │ │ +│ │ Token │──▶│ Identity │──▶│ Teams │──▶│ Access │──▶│ Results │ │ +│ │ │ │ │ │ │ │ │ │ │ │ +│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` --- -## Current State +## Core Concepts + +### Subjects +Users authenticated via: +- JWT tokens (session or API) +- SSO providers (OAuth 2.0/OIDC) +- Basic authentication (development only) + +### Teams +Logical groups that: +- Organize users for access boundaries +- Own resources (tools, prompts, resources) +- Map from external identity providers (SSO groups) + +### Roles +Current role types: +- **Admin**: Full management access, can bypass team restrictions +- **Maintainer**: Manage servers, tools, prompts, configurations +- **Viewer**: Read-only access and metrics -- Authentication and administrative endpoints are protected; production deployments should enable auth and use JWTs for API calls. -- Team mapping on SSO login allows grouping users into stable teams that can be referenced by policy. -- Visibility per server and composition via virtual servers provide pragmatic control of what tools are exposed to clients. +### Resources +Protected entities: +- Servers (MCP gateways and virtual servers) +- Tools, Prompts, Resources (MCP primitives) +- System configuration and audit logs --- -## Planned Enhancements +## Token Scoping Model + +Token scoping controls what resources a token can access based on the `teams` claim in the JWT payload. This provides fine-grained access control for automation tokens, service accounts, and restricted user sessions. + +### Teams Claim Semantics + +The `teams` claim in JWT tokens determines resource visibility: + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Token Teams Claim Handling │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ JWT Claim State │ Admin User │ Non-Admin User │ +├───────────────────────────┼───────────────────────┼─────────────────────────┤ +│ No "teams" key │ UNRESTRICTED │ PUBLIC-ONLY (secure) │ +│ teams: null │ UNRESTRICTED │ PUBLIC-ONLY (secure) │ +│ teams: [] │ PUBLIC-ONLY │ PUBLIC-ONLY │ +│ teams: ["team-id"] │ Team + Public │ Team + Public │ +│ teams: ["t1", "t2"] │ Both Teams + Public │ Both Teams + Public │ +└───────────────────────────┴───────────────────────┴─────────────────────────┘ +``` + +### Security Design Principles + +1. **Principle of Least Privilege** + - Non-admin tokens without explicit team scope default to public-only access + - This prevents accidental exposure of team resources + +2. **Scoped Automation Tokens** + - Admin tokens with `teams: []` are intentionally restricted to public resources + - Use case: CI/CD pipelines, monitoring systems, public API clients + +3. **Backward Compatible Admin Access** + - Admin session tokens (from UI login) omit the teams claim entirely + - This grants unrestricted access for administrative operations + +### Token Scoping Flow + +``` + ┌──────────────────┐ + │ JWT Token │ + │ Received │ + └────────┬─────────┘ + │ + ▼ + ┌───────────────────────┐ + │ Extract "teams" │ + │ claim from JWT │ + └───────────┬───────────┘ + │ + ┌───────────────┴───────────────┐ + │ │ + ▼ ▼ + ┌─────────────────────┐ ┌─────────────────────┐ + │ "teams" key exists │ │ "teams" key missing │ + │ with non-null value │ │ OR teams: null │ + └──────────┬──────────┘ └──────────┬──────────┘ + │ │ + ▼ ▼ + ┌─────────────────────┐ ┌─────────────────────┐ + │ Use explicit scope │ │ Check is_admin flag │ + │ teams = [...] or [] │ └──────────┬──────────┘ + └──────────┬──────────┘ │ + │ ┌───────────┴───────────┐ + │ │ │ + │ ▼ ▼ + │ ┌──────────────────┐ ┌──────────────────┐ + │ │ Admin: teams = │ │ Non-Admin: │ + │ │ None (bypass) │ │ teams = [] │ + │ │ UNRESTRICTED │ │ PUBLIC-ONLY │ + │ └────────┬─────────┘ └────────┬─────────┘ + │ │ │ + └───────────────┴──────────────────────┘ + │ + ▼ + ┌───────────────────────┐ + │ Apply visibility │ + │ filter to query │ + └───────────────────────┘ +``` + +### Visibility Levels + +Resources in MCP Gateway have three visibility levels: -- Fine-grained roles for create/update/delete vs. read-only per resource type. -- Policy definitions that bind roles to teams and/or individual users. -- UI flows for assigning roles to teams and auditing access. +| Visibility | Description | Who Can See | +|------------|-------------|-------------| +| `public` | Accessible to all authenticated users | Everyone with valid token | +| `team` | Accessible to team members only | Team members + admins (unrestricted) | +| `private` | Accessible to owner only | Resource owner + admins (unrestricted) | + +### Enforcement Points + +Token scoping is enforced consistently across all access paths: + +| Layer | Location | Description | +|-------|----------|-------------| +| Middleware | `token_scoping.py` | Request-level access control | +| REST API | `main.py` | `/tools`, `/resources`, `/prompts` endpoints | +| RPC Handler | `main.py` | `tools/list`, `resources/list`, `prompts/list` | +| MCP Transport | `streamablehttp_transport.py` | Streamable HTTP protocol filtering | +| Service Layer | `*_service.py` | Database query filtering | --- -## Recommended Practices +## Token Types and Use Cases + +### Session Tokens (UI Login) + +Generated when users log in via the Admin UI: + +```json +{ + "sub": "user@example.com", + "is_admin": true, + "iss": "mcpgateway", + "aud": "mcpgateway-api", + "exp": 1234567890 + // Note: No "teams" key for admin users = unrestricted access +} +``` + +**Behavior**: Admin session tokens omit the `teams` claim, granting unrestricted access to all resources. + +### API Tokens (Programmatic Access) + +Generated via the Admin UI or API for automation: + +```json +{ + "sub": "service-account@example.com", + "is_admin": false, + "teams": ["team-uuid-1", "team-uuid-2"], + "iss": "mcpgateway", + "aud": "mcpgateway-api", + "exp": 1234567890 +} +``` + +**Behavior**: Access restricted to public resources plus resources owned by specified teams. + +### Scoped Automation Tokens + +For CI/CD, monitoring, or public API access: + +```json +{ + "sub": "ci-pipeline@example.com", + "is_admin": true, + "teams": [], // Explicitly empty = public-only + "iss": "mcpgateway", + "aud": "mcpgateway-api", + "exp": 1234567890 +} +``` + +**Behavior**: Even admin tokens with `teams: []` are restricted to public resources only. This enables creating limited-scope tokens for automation that shouldn't access team-internal resources. + +--- + +## Generating Scoped Tokens + +### Using the CLI Tool + +```bash +# Unrestricted admin token (no teams key) +python3 -m mcpgateway.utils.create_jwt_token \ + --username admin@example.com \ + --exp 60 \ + --secret $JWT_SECRET_KEY + +# Team-scoped token +python3 -m mcpgateway.utils.create_jwt_token \ + --username user@example.com \ + --exp 60 \ + --secret $JWT_SECRET_KEY \ + --teams '["team-uuid-1"]' + +# Public-only scoped token (for automation) +python3 -m mcpgateway.utils.create_jwt_token \ + --username ci@example.com \ + --exp 60 \ + --secret $JWT_SECRET_KEY \ + --teams '[]' +``` + +### Using the Admin UI + +1. Navigate to **Admin UI → Tokens** +2. Click **Create Token** +3. Select team scope: + - **No team selected**: Public resources only (secure default) + - **Specific team(s)**: Team + public resources +4. Configure additional restrictions (IP, permissions, expiry) + +!!! warning "Token Scope Warning" + Tokens created without selecting a team will have access to **public resources only**. + This is the secure default to prevent accidental exposure of team resources. + +--- + +## Best Practices + +### Token Lifecycle + +1. **Use short expiration times** for interactive sessions (hours) +2. **Use longer expiration** for service accounts with IP restrictions +3. **Rotate tokens regularly** (recommended: 90 days for long-lived tokens) +4. **Revoke tokens immediately** when access should be removed + +### Team Organization + +1. **Create purpose-specific teams**: + - `platform-admins` - Full administrative access + - `developers` - Development and testing resources + - `ci-automation` - CI/CD pipeline access + - `monitoring` - Read-only observability access + +2. **Map SSO groups to teams** for automatic membership management + +3. **Use personal teams** for individual resource ownership + +### Scoping Strategy + +| Use Case | Recommended Token Scope | +|----------|------------------------| +| Admin UI access | Session token (no teams key) | +| CI/CD pipeline | `teams: []` (public-only) | +| Service integration | Specific team(s) | +| Developer access | Personal team + project teams | +| Monitoring/alerting | `teams: []` with read permissions | + +--- + +## Troubleshooting + +### Token Not Seeing Expected Resources + +1. **Check token claims**: Decode the JWT to verify `teams` claim + ```bash + # Decode JWT payload (middle section) + echo "$TOKEN" | cut -d. -f2 | base64 -d | jq . + ``` + +2. **Verify resource visibility**: Check the resource's `visibility` and `team_id` + ```bash + curl -H "Authorization: Bearer $ADMIN_TOKEN" /tools/{id} | jq '{visibility, teamId}' + ``` + +3. **Check user admin status**: Non-admin users without teams get public-only access + +### Admin Token Being Restricted + +If an admin token is unexpectedly restricted: + +1. **Check for explicit `teams` claim**: `teams: []` restricts even admins +2. **Verify `is_admin` flag**: Must be `true` in JWT or database user +3. **Check middleware logs**: Look for "token_teams" in debug output + +### Inconsistent Results Between Endpoints + +If REST and RPC endpoints return different results: -- Start with three tiers of access: - - Admin: full management access - - Maintainer: manage servers, tools, prompts and configurations - - Viewer: read-only access and metrics -- Use SSO group-to-team mappings to automate membership and reduce manual changes. -- Keep virtual servers scoped per project/team so client-facing exposure is intentional. +1. **Check for caching**: REST list endpoints may have cached data +2. **Wait for cache TTL**: Default is 60 seconds for registry cache +3. **Use direct GET**: `/tools/{id}` bypasses list cache --- -## Related +## Related Documentation -- [Team Management](teams.md) -- [Security Features](../architecture/security-features.md) -- [Configuration Reference](configuration.md) +- [Team Management](teams.md) - Setting up teams and SSO mapping +- [Security Features](securing.md) - Comprehensive security configuration +- [Configuration Reference](configuration.md) - Environment variables +- [API Usage](api-usage.md) - Token usage in API calls diff --git a/docs/docs/manage/securing.md b/docs/docs/manage/securing.md index 3add7f51e4..c3b7c79acc 100644 --- a/docs/docs/manage/securing.md +++ b/docs/docs/manage/securing.md @@ -168,6 +168,22 @@ volumes: The gateway supports fine-grained token scoping to restrict token access to specific servers, permissions, IP ranges, and time windows. This provides defense-in-depth security for API access. +!!! tip "Detailed RBAC Documentation" + For comprehensive documentation on token scoping semantics, team-based access control, and visibility filtering, see the [RBAC Configuration Guide](rbac.md). + +#### Team-Based Token Scoping + +Tokens can be scoped to specific teams using the `teams` JWT claim: + +| Token Configuration | Admin User | Non-Admin User | +|---------------------|------------|----------------| +| No `teams` key | Unrestricted | Public-only | +| `teams: null` | Unrestricted | Public-only | +| `teams: []` | Public-only | Public-only | +| `teams: ["team-id"]` | Team + Public | Team + Public | + +**Security Default**: Non-admin tokens without explicit team scope default to public-only access (principle of least privilege). + #### Server-Scoped Tokens Server-scoped tokens are restricted to specific MCP servers and cannot access admin endpoints: diff --git a/mcpgateway/cache/session_registry.py b/mcpgateway/cache/session_registry.py index 730e991201..f4f8579910 100644 --- a/mcpgateway/cache/session_registry.py +++ b/mcpgateway/cache/session_registry.py @@ -1479,15 +1479,17 @@ async def generate_response(self, message: Dict[str, Any], transport: SSETranspo "id": req_id, } # Get the token from the current authentication context - # The user object doesn't contain the token directly, we need to reconstruct it - # Since we don't have access to the original headers here, we need a different approach - # We'll extract the token from the session or create a new admin token + # The user object should contain auth_token, token_teams, and is_admin from the SSE endpoint token = None + token_teams = user.get("token_teams", []) # Default to empty list, never None + is_admin = user.get("is_admin", False) # Preserve admin status from SSE endpoint + try: - if hasattr(user, "get") and "auth_token" in user: + if hasattr(user, "get") and user.get("auth_token"): token = user["auth_token"] else: - # Fallback: create an admin token for internal RPC calls + # Fallback: create token preserving the user's context (including admin status) + logger.warning("No auth token available for SSE RPC call - creating fallback token") now = datetime.now(timezone.utc) payload = { "sub": user.get("email", "system"), @@ -1495,10 +1497,11 @@ async def generate_response(self, message: Dict[str, Any], transport: SSETranspo "aud": settings.jwt_audience, "iat": int(now.timestamp()), "jti": str(uuid.uuid4()), + "teams": token_teams, # Always a list - preserves token scope "user": { "email": user.get("email", "system"), "full_name": user.get("full_name", "System"), - "is_admin": True, # Internal calls should have admin access + "is_admin": is_admin, # Preserve admin status for cookie-authenticated admins "auth_provider": "internal", }, } diff --git a/mcpgateway/main.py b/mcpgateway/main.py index 54a6d30505..451fe462fe 100644 --- a/mcpgateway/main.py +++ b/mcpgateway/main.py @@ -141,7 +141,7 @@ from mcpgateway.utils.redis_client import close_redis_client, get_redis_client from mcpgateway.utils.redis_isready import wait_for_redis_ready from mcpgateway.utils.retry_manager import ResilientHttpClient -from mcpgateway.utils.verify_credentials import require_auth, require_docs_auth_override, verify_jwt_token +from mcpgateway.utils.verify_credentials import require_docs_auth_override, verify_jwt_token from mcpgateway.validation.jsonrpc import JSONRPCError # Import the admin routes from the new module @@ -281,6 +281,158 @@ def get_user_email(user): return str(user) if user else "unknown" +def _normalize_token_teams(teams: Optional[List]) -> List[str]: + """ + Normalize token teams to list of team IDs. + + SSO tokens may contain team dicts like {"id": "...", "name": "..."}. + This normalizes to just IDs for consistent filtering. + + Args: + teams: Raw teams from token payload (may be None, list of IDs, or list of dicts) + + Returns: + List of team ID strings (empty list if None) + + Examples: + >>> from mcpgateway import main + >>> main._normalize_token_teams(None) + [] + >>> main._normalize_token_teams([]) + [] + >>> main._normalize_token_teams(["team_a", "team_b"]) + ['team_a', 'team_b'] + >>> main._normalize_token_teams([{"id": "team_a", "name": "Team A"}]) + ['team_a'] + >>> main._normalize_token_teams([{"id": "t1"}, "t2", {"name": "no_id"}]) + ['t1', 't2'] + """ + if not teams: + return [] + + normalized = [] + for team in teams: + if isinstance(team, dict): + team_id = team.get("id") + if team_id: + normalized.append(team_id) + elif isinstance(team, str): + normalized.append(team) + return normalized + + +def _get_token_teams_from_request(request: Request) -> Optional[List[str]]: + """ + Extract and normalize teams from verified JWT token. + + Uses cached verified payload from request.state to avoid re-decoding. + + Semantics: + - teams key with non-None value -> normalized list (even if empty []) + - teams key absent OR teams: null -> None (unrestricted for admin, public-only for non-admin) + - No JWT payload -> None + + Args: + request: FastAPI request object + + Returns: + List of normalized team IDs if teams key exists with non-None value, + or None if no JWT payload, teams key absent, or teams is null. + Callers use None to determine access: admin gets unrestricted, non-admin gets public-only. + + Examples: + >>> from mcpgateway import main + >>> from unittest.mock import MagicMock + >>> req = MagicMock() + >>> req.state = MagicMock() + >>> req.state._jwt_verified_payload = ("token", {"teams": ["team_a"]}) + >>> main._get_token_teams_from_request(req) + ['team_a'] + >>> req.state._jwt_verified_payload = ("token", {"teams": []}) + >>> main._get_token_teams_from_request(req) + [] + >>> req.state._jwt_verified_payload = ("token", {"sub": "user@example.com"}) + >>> main._get_token_teams_from_request(req) is None # No teams key + True + >>> req.state._jwt_verified_payload = ("token", {"teams": None}) + >>> main._get_token_teams_from_request(req) is None # teams: null + True + >>> req.state._jwt_verified_payload = None + >>> main._get_token_teams_from_request(req) is None # No JWT + True + """ + # Use cached verified payload (set by verify_jwt_token_cached) + cached = getattr(request.state, "_jwt_verified_payload", None) + if cached and isinstance(cached, tuple) and len(cached) == 2: + _, payload = cached + if payload: + # Check if "teams" key exists and is not None + # - Key exists with non-None value (even empty []) -> return normalized list + # - Key absent OR key is None -> return None (unrestricted for admin, public-only for non-admin) + if "teams" in payload and payload.get("teams") is not None: + return _normalize_token_teams(payload.get("teams")) + # No "teams" key or teams is null - treat as unrestricted (None) + return None + + # No JWT payload - return None to trigger DB team lookup + return None + + +def _get_rpc_filter_context(request: Request, user) -> tuple: + """ + Extract user_email, token_teams, and is_admin for RPC filtering. + + Args: + request: FastAPI request object + user: User object from auth dependency + + Returns: + Tuple of (user_email, token_teams, is_admin) + + Examples: + >>> from mcpgateway import main + >>> from unittest.mock import MagicMock + >>> req = MagicMock() + >>> req.state = MagicMock() + >>> req.state._jwt_verified_payload = ("token", {"teams": ["t1"], "is_admin": True}) + >>> user = {"email": "test@x.com", "is_admin": True} # User's is_admin is ignored + >>> email, teams, is_admin = main._get_rpc_filter_context(req, user) + >>> email + 'test@x.com' + >>> teams + ['t1'] + >>> is_admin # From token payload, not user dict + True + """ + # Get user email + if hasattr(user, "email"): + user_email = getattr(user, "email", None) + elif isinstance(user, dict): + user_email = user.get("sub") or user.get("email") + else: + user_email = str(user) if user else None + + # Get normalized teams from verified token + token_teams = _get_token_teams_from_request(request) + + # Check if user is admin - MUST come from token, not DB user + # This ensures that tokens with restricted scope (empty teams) don't inherit admin bypass + is_admin = False + cached = getattr(request.state, "_jwt_verified_payload", None) + if cached and isinstance(cached, tuple) and len(cached) == 2: + _, payload = cached + if payload: + # Check both top-level is_admin and nested user.is_admin in token + is_admin = payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False) + + # If token has empty teams array (public-only token), admin bypass is disabled + # This allows admins to create properly scoped tokens for restricted access + if token_teams is not None and len(token_teams) == 0: + is_admin = False + + return user_email, token_teams, is_admin + + # Initialize cache resource_cache = ResourceCache(max_size=settings.resource_cache_size, ttl=settings.resource_cache_ttl) @@ -2200,7 +2352,35 @@ async def sse_endpoint(request: Request, server_id: str, user=Depends(get_curren await session_registry.add_session(transport.session_id, transport) response = await transport.create_sse_response(request) - asyncio.create_task(session_registry.respond(server_id, user, session_id=transport.session_id, base_url=base_url)) + # Extract auth token from request (header OR cookie, like get_current_user_with_permissions) + auth_token = None + auth_header = request.headers.get("authorization", "") + if auth_header.lower().startswith("bearer "): + auth_token = auth_header[7:] + elif hasattr(request, "cookies") and request.cookies: + # Cookie auth (admin UI sessions) + auth_token = request.cookies.get("jwt_token") or request.cookies.get("access_token") + + # Extract and normalize token teams + # Returns None if no JWT payload (non-JWT auth), or list if JWT exists + token_teams_or_none = _get_token_teams_from_request(request) + # Coerce to list for downstream consumers that expect a list + token_teams = token_teams_or_none if token_teams_or_none is not None else [] + + # Preserve is_admin from user object (for cookie-authenticated admins) + is_admin = False + if hasattr(user, "is_admin"): + is_admin = getattr(user, "is_admin", False) + elif isinstance(user, dict): + is_admin = user.get("is_admin", False) or user.get("user", {}).get("is_admin", False) + + # Create enriched user dict + user_with_token = dict(user) if isinstance(user, dict) else {"email": getattr(user, "email", str(user))} + user_with_token["auth_token"] = auth_token + user_with_token["token_teams"] = token_teams # Always a list, never None + user_with_token["is_admin"] = is_admin # Preserve admin status for fallback token + + asyncio.create_task(session_registry.respond(server_id, user_with_token, session_id=transport.session_id, base_url=base_url)) tasks = BackgroundTasks() tasks.add_task(session_registry.remove_session, transport.session_id) @@ -2281,6 +2461,7 @@ async def message_endpoint(request: Request, server_id: str, user=Depends(get_cu @server_router.get("/{server_id}/tools", response_model=List[ToolRead]) @require_permission("servers.read") async def server_get_tools( + request: Request, server_id: str, include_inactive: bool = False, include_metrics: bool = False, @@ -2295,6 +2476,7 @@ async def server_get_tools( that have been deactivated but not deleted from the system. Args: + request (Request): FastAPI request object. server_id (str): ID of the server include_inactive (bool): Whether to include inactive tools in the results. include_metrics (bool): Whether to include metrics in the tools results. @@ -2305,13 +2487,22 @@ async def server_get_tools( List[ToolRead]: A list of tool records formatted with by_alias=True. """ logger.debug(f"User: {user} has listed tools for the server_id: {server_id}") - tools = await tool_service.list_server_tools(db, server_id=server_id, include_inactive=include_inactive, include_metrics=include_metrics) + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even empty [] for public-only), respect it + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) + tools = await tool_service.list_server_tools(db, server_id=server_id, include_inactive=include_inactive, include_metrics=include_metrics, user_email=user_email, token_teams=token_teams) return [tool.model_dump(by_alias=True) for tool in tools] @server_router.get("/{server_id}/resources", response_model=List[ResourceRead]) @require_permission("servers.read") async def server_get_resources( + request: Request, server_id: str, include_inactive: bool = False, db: Session = Depends(get_db), @@ -2325,6 +2516,7 @@ async def server_get_resources( to view or manage resources that have been deactivated but not deleted. Args: + request (Request): FastAPI request object. server_id (str): ID of the server include_inactive (bool): Whether to include inactive resources in the results. db (Session): Database session dependency. @@ -2334,13 +2526,22 @@ async def server_get_resources( List[ResourceRead]: A list of resource records formatted with by_alias=True. """ logger.debug(f"User: {user} has listed resources for the server_id: {server_id}") - resources = await resource_service.list_server_resources(db, server_id=server_id, include_inactive=include_inactive) + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even empty [] for public-only), respect it + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) + resources = await resource_service.list_server_resources(db, server_id=server_id, include_inactive=include_inactive, user_email=user_email, token_teams=token_teams) return [resource.model_dump(by_alias=True) for resource in resources] @server_router.get("/{server_id}/prompts", response_model=List[PromptRead]) @require_permission("servers.read") async def server_get_prompts( + request: Request, server_id: str, include_inactive: bool = False, db: Session = Depends(get_db), @@ -2354,6 +2555,7 @@ async def server_get_prompts( prompts that have been deactivated but not deleted from the system. Args: + request (Request): FastAPI request object. server_id (str): ID of the server include_inactive (bool): Whether to include inactive prompts in the results. db (Session): Database session dependency. @@ -2363,7 +2565,15 @@ async def server_get_prompts( List[PromptRead]: A list of prompt records formatted with by_alias=True. """ logger.debug(f"User: {user} has listed prompts for the server_id: {server_id}") - prompts = await prompt_service.list_server_prompts(db, server_id=server_id, include_inactive=include_inactive) + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even empty [] for public-only), respect it + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) + prompts = await prompt_service.list_server_prompts(db, server_id=server_id, include_inactive=include_inactive, user_email=user_email, token_teams=token_teams) return [prompt.model_dump(by_alias=True) for prompt in prompts] @@ -2775,24 +2985,36 @@ async def list_tools( if tags: tags_list = [tag.strip() for tag in tags.split(",") if tag.strip()] - # Get user email for team filtering - user_email = get_user_email(user) + # Get filtering context from token (respects token scope) + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) - # Check team_id from token as well + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even for admins), respect it for least-privilege + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) + + # Check team_id from request.state (set during auth) + # Only use for non-empty-team tokens; empty-team tokens should rely on visibility filtering token_team_id = getattr(request.state, "team_id", None) + is_empty_team_token = token_teams is not None and len(token_teams) == 0 - # Check for team ID mismatch - if team_id is not None and token_team_id is not None and team_id != token_team_id: + # Check for team ID mismatch (only applies when both are specified and token has teams) + if team_id is not None and token_team_id is not None and team_id != token_team_id and not is_empty_team_token: return ORJSONResponse( content={"message": "Access issue: This API token does not have the required permissions for this team."}, status_code=status.HTTP_403_FORBIDDEN, ) - # Determine final team ID - team_id = team_id or token_team_id + # Determine final team ID - don't use token_team_id for empty-team tokens + # Empty-team tokens should filter by public + owned, not by personal team + if not is_empty_team_token: + team_id = team_id or token_team_id - # Use unified list_tools() with optional team filtering - # When team_id or visibility is specified, user_email enables team-based access control + # Use unified list_tools() with token-based team filtering + # Always apply visibility filtering based on token scope data, next_cursor = await tool_service.list_tools( db=db, cursor=cursor, @@ -2800,9 +3022,10 @@ async def list_tools( tags=tags_list, gateway_id=gateway_id, limit=limit, - user_email=user_email if (team_id or visibility) else None, + user_email=user_email, team_id=team_id, visibility=visibility, + token_teams=token_teams, ) if apijsonpath is None: @@ -3176,24 +3399,37 @@ async def list_resources( tags_list = None if tags: tags_list = [tag.strip() for tag in tags.split(",") if tag.strip()] - # Get user email for team filtering - user_email = get_user_email(user) - # Check team_id from token as well + # Get filtering context from token (respects token scope) + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even for admins), respect it for least-privilege + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) + + # Check team_id from request.state (set during auth) + # Only use for non-empty-team tokens; empty-team tokens should rely on visibility filtering token_team_id = getattr(request.state, "team_id", None) + is_empty_team_token = token_teams is not None and len(token_teams) == 0 - # Check for team ID mismatch - if team_id is not None and token_team_id is not None and team_id != token_team_id: + # Check for team ID mismatch (only applies when both are specified and token has teams) + if team_id is not None and token_team_id is not None and team_id != token_team_id and not is_empty_team_token: return ORJSONResponse( content={"message": "Access issue: This API token does not have the required permissions for this team."}, status_code=status.HTTP_403_FORBIDDEN, ) - # Determine final team ID - team_id = team_id or token_team_id + # Determine final team ID - don't use token_team_id for empty-team tokens + # Empty-team tokens should filter by public + owned, not by personal team + if not is_empty_team_token: + team_id = team_id or token_team_id - # Use unified list_resources() with optional team filtering - # When team_id or visibility is specified, user_email enables team-based access control + # Use unified list_resources() with token-based team filtering + # Always apply visibility filtering based on token scope logger.debug(f"User {user_email} requested resource list with cursor {cursor}, include_inactive={include_inactive}, tags={tags_list}, team_id={team_id}, visibility={visibility}") data, next_cursor = await resource_service.list_resources( db=db, @@ -3201,9 +3437,10 @@ async def list_resources( limit=limit, include_inactive=include_inactive, tags=tags_list, - user_email=user_email if (team_id or visibility) else None, + user_email=user_email, team_id=team_id, visibility=visibility, + token_teams=token_teams, ) if include_pagination: @@ -3553,23 +3790,37 @@ async def list_prompts( tags_list = None if tags: tags_list = [tag.strip() for tag in tags.split(",") if tag.strip()] - # Get user email for team filtering - user_email = get_user_email(user) - # Check team_id from token as well + # Get filtering context from token (respects token scope) + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even for admins), respect it for least-privilege + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) + + # Check team_id from request.state (set during auth) + # Only use for non-empty-team tokens; empty-team tokens should rely on visibility filtering token_team_id = getattr(request.state, "team_id", None) + is_empty_team_token = token_teams is not None and len(token_teams) == 0 - # Check for team ID mismatch - if team_id is not None and token_team_id is not None and team_id != token_team_id: + # Check for team ID mismatch (only applies when both are specified and token has teams) + if team_id is not None and token_team_id is not None and team_id != token_team_id and not is_empty_team_token: return ORJSONResponse( content={"message": "Access issue: This API token does not have the required permissions for this team."}, status_code=status.HTTP_403_FORBIDDEN, ) - # Determine final team ID - team_id = team_id or token_team_id + # Determine final team ID - don't use token_team_id for empty-team tokens + # Empty-team tokens should filter by public + owned, not by personal team + if not is_empty_team_token: + team_id = team_id or token_team_id - # Use consolidated prompt listing with optional team filtering + # Use consolidated prompt listing with token-based team filtering + # Always apply visibility filtering based on token scope logger.debug(f"User: {user_email} requested prompt list with include_inactive={include_inactive}, cursor={cursor}, tags={tags_list}, team_id={team_id}, visibility={visibility}") data, next_cursor = await prompt_service.list_prompts( db=db, @@ -3577,9 +3828,10 @@ async def list_prompts( limit=limit, include_inactive=include_inactive, tags=tags_list, - user_email=user_email if (team_id or visibility) else None, + user_email=user_email, team_id=team_id, visibility=visibility, + token_teams=token_teams, ) if include_pagination: @@ -4289,7 +4541,7 @@ async def generate_events(): ################## @utility_router.post("/rpc/") @utility_router.post("/rpc") -async def handle_rpc(request: Request, db: Session = Depends(get_db), user=Depends(require_auth)): +async def handle_rpc(request: Request, db: Session = Depends(get_db), user=Depends(get_current_user_with_permissions)): """Handle RPC requests. Args: @@ -4333,20 +4585,35 @@ async def handle_rpc(request: Request, db: Session = Depends(get_db), user=Depen if hasattr(result, "model_dump"): result = result.model_dump(by_alias=True, exclude_none=True) elif method == "tools/list": + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + # Admin bypass - only when token has NO team restrictions + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) if server_id: - tools = await tool_service.list_server_tools(db, server_id, cursor=cursor) + tools = await tool_service.list_server_tools(db, server_id, cursor=cursor, user_email=user_email, token_teams=token_teams) result = {"tools": [t.model_dump(by_alias=True, exclude_none=True) for t in tools]} else: - tools, next_cursor = await tool_service.list_tools(db, cursor=cursor, limit=0) + tools, next_cursor = await tool_service.list_tools(db, cursor=cursor, limit=0, user_email=user_email, token_teams=token_teams) result = {"tools": [t.model_dump(by_alias=True, exclude_none=True) for t in tools]} if next_cursor: result["nextCursor"] = next_cursor elif method == "list_tools": # Legacy endpoint + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even empty [] for public-only), respect it + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) if server_id: - tools = await tool_service.list_server_tools(db, server_id, cursor=cursor) + tools = await tool_service.list_server_tools(db, server_id, cursor=cursor, user_email=user_email, token_teams=token_teams) result = {"tools": [t.model_dump(by_alias=True, exclude_none=True) for t in tools]} else: - tools, next_cursor = await tool_service.list_tools(db, cursor=cursor, limit=0) + tools, next_cursor = await tool_service.list_tools(db, cursor=cursor, limit=0, user_email=user_email, token_teams=token_teams) result = {"tools": [t.model_dump(by_alias=True, exclude_none=True) for t in tools]} if next_cursor: result["nextCursor"] = next_cursor @@ -4357,11 +4624,18 @@ async def handle_rpc(request: Request, db: Session = Depends(get_db), user=Depen roots = await root_service.list_roots() result = {"roots": [r.model_dump(by_alias=True, exclude_none=True) for r in roots]} elif method == "resources/list": + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + # Admin bypass - only when token has NO team restrictions + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) if server_id: - resources = await resource_service.list_server_resources(db, server_id) + resources = await resource_service.list_server_resources(db, server_id, user_email=user_email, token_teams=token_teams) result = {"resources": [r.model_dump(by_alias=True, exclude_none=True) for r in resources]} else: - resources, next_cursor = await resource_service.list_resources(db, cursor=cursor, limit=0) + resources, next_cursor = await resource_service.list_resources(db, cursor=cursor, limit=0, user_email=user_email, token_teams=token_teams) result = {"resources": [r.model_dump(by_alias=True, exclude_none=True) for r in resources]} if next_cursor: result["nextCursor"] = next_cursor @@ -4414,11 +4688,18 @@ async def handle_rpc(request: Request, db: Session = Depends(get_db), user=Depen await resource_service.unsubscribe_resource(db, subscription) result = {} elif method == "prompts/list": + user_email, token_teams, is_admin = _get_rpc_filter_context(request, user) + # Admin bypass - only when token has NO team restrictions + if is_admin and token_teams is None: + user_email = None + token_teams = None # Admin unrestricted + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) if server_id: - prompts = await prompt_service.list_server_prompts(db, server_id, cursor=cursor) + prompts = await prompt_service.list_server_prompts(db, server_id, cursor=cursor, user_email=user_email, token_teams=token_teams) result = {"prompts": [p.model_dump(by_alias=True, exclude_none=True) for p in prompts]} else: - prompts, next_cursor = await prompt_service.list_prompts(db, cursor=cursor, limit=0) + prompts, next_cursor = await prompt_service.list_prompts(db, cursor=cursor, limit=0, user_email=user_email, token_teams=token_teams) result = {"prompts": [p.model_dump(by_alias=True, exclude_none=True) for p in prompts]} if next_cursor: result["nextCursor"] = next_cursor @@ -4755,7 +5036,35 @@ async def utility_sse_endpoint(request: Request, user=Depends(get_current_user_w await transport.connect() await session_registry.add_session(transport.session_id, transport) - asyncio.create_task(session_registry.respond(None, user, session_id=transport.session_id, base_url=base_url)) + # Extract auth token from request (header OR cookie, like get_current_user_with_permissions) + auth_token = None + auth_header = request.headers.get("authorization", "") + if auth_header.lower().startswith("bearer "): + auth_token = auth_header[7:] + elif hasattr(request, "cookies") and request.cookies: + # Cookie auth (admin UI sessions) + auth_token = request.cookies.get("jwt_token") or request.cookies.get("access_token") + + # Extract and normalize token teams + # Returns None if no JWT payload (non-JWT auth), or list if JWT exists + token_teams_or_none = _get_token_teams_from_request(request) + # Coerce to list for downstream consumers that expect a list + token_teams = token_teams_or_none if token_teams_or_none is not None else [] + + # Preserve is_admin from user object (for cookie-authenticated admins) + is_admin = False + if hasattr(user, "is_admin"): + is_admin = getattr(user, "is_admin", False) + elif isinstance(user, dict): + is_admin = user.get("is_admin", False) or user.get("user", {}).get("is_admin", False) + + # Create enriched user dict + user_with_token = dict(user) if isinstance(user, dict) else {"email": getattr(user, "email", str(user))} + user_with_token["auth_token"] = auth_token + user_with_token["token_teams"] = token_teams # Always a list, never None + user_with_token["is_admin"] = is_admin # Preserve admin status for fallback token + + asyncio.create_task(session_registry.respond(None, user_with_token, session_id=transport.session_id, base_url=base_url)) response = await transport.create_sse_response(request) tasks = BackgroundTasks() diff --git a/mcpgateway/middleware/token_scoping.py b/mcpgateway/middleware/token_scoping.py index 61ef46d774..1d166924dc 100644 --- a/mcpgateway/middleware/token_scoping.py +++ b/mcpgateway/middleware/token_scoping.py @@ -104,6 +104,32 @@ def __init__(self): True """ + def _normalize_teams(self, teams) -> list: + """Normalize teams from token payload to list of team IDs. + + Handles various team formats: + - None -> [] + - List of strings -> as-is + - List of dicts with 'id' key -> extract IDs + + Args: + teams: Raw teams value from JWT payload + + Returns: + List of team ID strings + """ + if not teams: + return [] + normalized = [] + for team in teams: + if isinstance(team, dict): + team_id = team.get("id") + if team_id: + normalized.append(team_id) + elif isinstance(team, str): + normalized.append(team) + return normalized + async def _extract_token_scopes(self, request: Request) -> Optional[dict]: """Extract token scopes from JWT in request. @@ -441,7 +467,7 @@ def _check_team_membership(self, payload: dict, db=None) -> bool: finally: db.close() - def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None) -> bool: # pylint: disable=too-many-return-statements + def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None, _user_email: str = None) -> bool: # pylint: disable=too-many-return-statements """ Check if the requested resource is accessible by the token. @@ -451,7 +477,7 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d - PRIVATE: Accessible only by tokens scoped to that specific team Token Access Rules: - - Public-only tokens (empty token_teams): Can ONLY access public resources + - Public-only tokens (empty token_teams): Can access public resources + their own resources - Team-scoped tokens: Can access their team's resources + public resources Handles URLs like: @@ -535,7 +561,8 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d logger.debug(f"Access granted: Server {resource_id} is PUBLIC") return True - # PUBLIC-ONLY TOKEN: Can ONLY access public servers + # PUBLIC-ONLY TOKEN: Can ONLY access public servers (strict public-only policy) + # No owner access - if user needs own resources, use a personal team-scoped token if is_public_token: logger.warning(f"Access denied: Public-only token cannot access {server_visibility} server {resource_id}") return False @@ -578,7 +605,8 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d logger.debug(f"Access granted: Tool {resource_id} is PUBLIC") return True - # PUBLIC-ONLY TOKEN: Can ONLY access public tools + # PUBLIC-ONLY TOKEN: Can ONLY access public tools (strict public-only policy) + # No owner access - if user needs own resources, use a personal team-scoped token if is_public_token: logger.warning(f"Access denied: Public-only token cannot access {tool_visibility} tool {resource_id}") return False @@ -623,7 +651,8 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d logger.debug(f"Access granted: Resource {resource_id} is PUBLIC") return True - # PUBLIC-ONLY TOKEN: Can ONLY access public resources + # PUBLIC-ONLY TOKEN: Can ONLY access public resources (strict public-only policy) + # No owner access - if user needs own resources, use a personal team-scoped token if is_public_token: logger.warning(f"Access denied: Public-only token cannot access {resource_visibility} resource {resource_id}") return False @@ -668,7 +697,8 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d logger.debug(f"Access granted: Prompt {resource_id} is PUBLIC") return True - # PUBLIC-ONLY TOKEN: Can ONLY access public prompts + # PUBLIC-ONLY TOKEN: Can ONLY access public prompts (strict public-only policy) + # No owner access - if user needs own resources, use a personal team-scoped token if is_public_token: logger.warning(f"Access denied: Public-only token cannot access {prompt_visibility} prompt {resource_id}") return False @@ -755,10 +785,28 @@ async def __call__(self, request: Request, call_next): # TEAM VALIDATION: Use single DB session for both team checks # This reduces connection pool overhead from 2 sessions to 1 for resource endpoints - token_teams = payload.get("teams", []) - needs_db_session = bool(token_teams) # Only need DB if token has teams - - if needs_db_session: + user_email = payload.get("sub") or payload.get("email") # Extract user email for ownership check + is_admin = payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False) + + # Determine token_teams based on whether "teams" key exists and is not None + # - Key absent OR null + admin = None (unrestricted bypass) + # - Key absent OR null + non-admin = [] (public-only, secure default) + # - Key present with non-None value = normalize the value + teams_value = payload.get("teams") if "teams" in payload else None + if teams_value is not None: + token_teams = self._normalize_teams(teams_value) + elif is_admin: + # Admin without teams key (or teams: null) = unrestricted (skip team checks) + token_teams = None + else: + # Non-admin without teams key (or teams: null) = public-only (secure default) + token_teams = [] + + # Admin with no team restrictions bypasses team validation entirely + if is_admin and token_teams is None: + logger.debug(f"Admin bypass: skipping team validation for {user_email}") + # Skip to other checks (server_id, IP, etc.) + elif token_teams: # First-Party from mcpgateway.db import get_db # pylint: disable=import-outside-toplevel @@ -770,7 +818,7 @@ async def __call__(self, request: Request, call_next): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team") # Check resource team ownership with shared session - if not self._check_resource_team_ownership(request.url.path, token_teams, db=db): + if not self._check_resource_team_ownership(request.url.path, token_teams, db=db, _user_email=user_email): logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You do not have permission to access this resource using the current token") finally: @@ -785,7 +833,7 @@ async def __call__(self, request: Request, call_next): logger.warning("Token rejected: User no longer member of associated team(s)") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team") - if not self._check_resource_team_ownership(request.url.path, token_teams): + if not self._check_resource_team_ownership(request.url.path, token_teams, _user_email=user_email): logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You do not have permission to access this resource using the current token") diff --git a/mcpgateway/routers/email_auth.py b/mcpgateway/routers/email_auth.py index a5b70f06de..b493b8fcf7 100644 --- a/mcpgateway/routers/email_auth.py +++ b/mcpgateway/routers/email_auth.py @@ -145,17 +145,20 @@ async def create_access_token(user: EmailUser, token_scopes: Optional[dict] = No "is_admin": user.is_admin, "auth_provider": user.auth_provider, }, - # Team memberships for authorization - "teams": [ - {"id": team.id, "name": team.name, "slug": team.slug, "is_personal": team.is_personal, "role": next((m.role for m in user.team_memberships if m.team_id == team.id), "member")} - for team in teams - ], # Namespace access (backwards compatible) "namespaces": [f"user:{user.email}", *[f"team:{team.slug}" for team in teams], "public"], # Token scoping (if provided) "scopes": token_scopes or {"server_id": None, "permissions": ["*"], "ip_restrictions": [], "time_restrictions": {}}, # Full access for regular user tokens } + # For admin users: omit "teams" key entirely to enable unrestricted access bypass + # For regular users: include teams for proper team-based scoping + if not user.is_admin: + payload["teams"] = [ + {"id": team.id, "name": team.name, "slug": team.slug, "is_personal": team.is_personal, "role": next((m.role for m in user.team_memberships if m.team_id == team.id), "member")} + for team in teams + ] + # Generate token using centralized token creation token = await create_jwt_token(payload) diff --git a/mcpgateway/schemas.py b/mcpgateway/schemas.py index 9f83497818..716fe20928 100644 --- a/mcpgateway/schemas.py +++ b/mcpgateway/schemas.py @@ -5619,6 +5619,90 @@ class TokenScopeRequest(BaseModel): time_restrictions: Dict[str, Any] = Field(default_factory=dict, description="Time-based restrictions") usage_limits: Dict[str, Any] = Field(default_factory=dict, description="Usage limits and quotas") + @field_validator("ip_restrictions") + @classmethod + def validate_ip_restrictions(cls, v: List[str]) -> List[str]: + """Validate IP addresses and CIDR notation. + + Args: + v: List of IP address or CIDR strings to validate. + + Returns: + List of validated IP/CIDR strings with whitespace stripped. + + Raises: + ValueError: If any IP address or CIDR notation is invalid. + + Examples: + >>> TokenScopeRequest.validate_ip_restrictions(["192.168.1.0/24"]) + ['192.168.1.0/24'] + >>> TokenScopeRequest.validate_ip_restrictions(["10.0.0.1"]) + ['10.0.0.1'] + """ + import ipaddress # pylint: disable=import-outside-toplevel + + if not v: + return v + + validated = [] + for ip_str in v: + ip_str = ip_str.strip() + if not ip_str: + continue + try: + # Try parsing as network (CIDR notation) + if "/" in ip_str: + ipaddress.ip_network(ip_str, strict=False) + else: + # Try parsing as single IP address + ipaddress.ip_address(ip_str) + validated.append(ip_str) + except ValueError as e: + raise ValueError(f"Invalid IP address or CIDR notation '{ip_str}': {e}") from e + return validated + + @field_validator("permissions") + @classmethod + def validate_permissions(cls, v: List[str]) -> List[str]: + """Validate permission scope format. + + Permissions must be in format 'resource.action' or wildcard '*'. + + Args: + v: List of permission strings to validate. + + Returns: + List of validated permission strings with whitespace stripped. + + Raises: + ValueError: If any permission does not match 'resource.action' format or '*'. + + Examples: + >>> TokenScopeRequest.validate_permissions(["tools.read", "resources.write"]) + ['tools.read', 'resources.write'] + >>> TokenScopeRequest.validate_permissions(["*"]) + ['*'] + """ + if not v: + return v + + # Permission pattern: resource.action (alphanumeric with underscores) + permission_pattern = re.compile(r"^[a-zA-Z][a-zA-Z0-9_]*\.[a-zA-Z][a-zA-Z0-9_]*$") + + validated = [] + for perm in v: + perm = perm.strip() + if not perm: + continue + # Allow wildcard + if perm == "*": + validated.append(perm) + continue + if not permission_pattern.match(perm): + raise ValueError(f"Invalid permission format '{perm}'. Use 'resource.action' format (e.g., 'tools.read') or '*' for full access") + validated.append(perm) + return validated + class TokenCreateRequest(BaseModel): """Schema for creating a new API token. diff --git a/mcpgateway/services/prompt_service.py b/mcpgateway/services/prompt_service.py index 762ac14acb..88da1613a4 100644 --- a/mcpgateway/services/prompt_service.py +++ b/mcpgateway/services/prompt_service.py @@ -917,6 +917,7 @@ async def list_prompts( user_email: Optional[str] = None, team_id: Optional[str] = None, visibility: Optional[str] = None, + token_teams: Optional[List[str]] = None, ) -> Union[tuple[List[PromptRead], Optional[str]], Dict[str, Any]]: """ Retrieve a list of prompt templates from the database with pagination support. @@ -939,6 +940,8 @@ async def list_prompts( user_email (Optional[str]): User email for team-based access control. If None, no access control is applied. team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation. visibility (Optional[str]): Filter by visibility (private, team, public). + token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access + where the token scope should be respected instead of the user's full team memberships. Returns: If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}} @@ -976,9 +979,17 @@ async def list_prompts( # Apply team-based access control if user_email is provided if user_email: - team_service = TeamManagementService(db) - user_teams = await team_service.get_user_teams(user_email) - team_ids = [team.id for team in user_teams] + # Use token_teams if provided (for MCP/API token access), otherwise look up from DB + if token_teams is not None: + team_ids = token_teams + else: + team_service = TeamManagementService(db) + user_teams = await team_service.get_user_teams(user_email) + team_ids = [team.id for team in user_teams] + + # Check if this is a public-only token (empty teams array) + # Public-only tokens can ONLY see public resources - no owner access + is_public_only_token = token_teams is not None and len(token_teams) == 0 if team_id: # User requesting specific team - verify access @@ -986,15 +997,19 @@ async def list_prompts( return ([], None) access_conditions = [ and_(DbPrompt.team_id == team_id, DbPrompt.visibility.in_(["team", "public"])), - and_(DbPrompt.team_id == team_id, DbPrompt.owner_email == user_email), ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(and_(DbPrompt.team_id == team_id, DbPrompt.owner_email == user_email)) query = query.where(or_(*access_conditions)) else: - # General access: user's prompts + public prompts + team prompts + # General access: public prompts + team prompts (+ owner prompts if not public-only token) access_conditions = [ - DbPrompt.owner_email == user_email, DbPrompt.visibility == "public", ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(DbPrompt.owner_email == user_email) if team_ids: access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"]))) query = query.where(or_(*access_conditions)) @@ -1147,7 +1162,15 @@ async def list_prompts_for_user( result.append(self.convert_prompt_to_read(t, include_metrics=False)) return result - async def list_server_prompts(self, db: Session, server_id: str, include_inactive: bool = False, cursor: Optional[str] = None) -> List[PromptRead]: + async def list_server_prompts( + self, + db: Session, + server_id: str, + include_inactive: bool = False, + cursor: Optional[str] = None, + user_email: Optional[str] = None, + token_teams: Optional[List[str]] = None, + ) -> List[PromptRead]: """ Retrieve a list of prompt templates from the database. @@ -1163,6 +1186,9 @@ async def list_server_prompts(self, db: Session, server_id: str, include_inactiv Defaults to False. cursor (Optional[str], optional): An opaque cursor token for pagination. Currently, this parameter is ignored. Defaults to None. + user_email (Optional[str]): User email for visibility filtering. If None, no filtering applied. + token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API + token access where the token scope should be respected. Returns: List[PromptRead]: A list of prompt templates represented as PromptRead objects. @@ -1190,6 +1216,31 @@ async def list_server_prompts(self, db: Session, server_id: str, include_inactiv ) if not include_inactive: query = query.where(DbPrompt.enabled) + + # Add visibility filtering if user context provided + if user_email: + # Use token_teams if provided (for MCP/API token access), otherwise look up from DB + if token_teams is not None: + team_ids = token_teams + else: + team_service = TeamManagementService(db) + user_teams = await team_service.get_user_teams(user_email) + team_ids = [team.id for team in user_teams] + + # Check if this is a public-only token (empty teams array) + # Public-only tokens can ONLY see public resources - no owner access + is_public_only_token = token_teams is not None and len(token_teams) == 0 + + access_conditions = [ + DbPrompt.visibility == "public", + ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(DbPrompt.owner_email == user_email) + if team_ids: + access_conditions.append(and_(DbPrompt.team_id.in_(team_ids), DbPrompt.visibility.in_(["team", "public"]))) + query = query.where(or_(*access_conditions)) + # Cursor-based pagination logic can be implemented here in the future. logger.debug(cursor) prompts = db.execute(query).scalars().all() diff --git a/mcpgateway/services/resource_service.py b/mcpgateway/services/resource_service.py index 6c57c36159..7b27998bc2 100644 --- a/mcpgateway/services/resource_service.py +++ b/mcpgateway/services/resource_service.py @@ -822,6 +822,7 @@ async def list_resources( user_email: Optional[str] = None, team_id: Optional[str] = None, visibility: Optional[str] = None, + token_teams: Optional[List[str]] = None, ) -> Union[tuple[List[ResourceRead], Optional[str]], Dict[str, Any]]: """ Retrieve a list of registered resources from the database with pagination support. @@ -844,6 +845,8 @@ async def list_resources( user_email (Optional[str]): User email for team-based access control. If None, no access control is applied. team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation. visibility (Optional[str]): Filter by visibility (private, team, public). + token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access + where the token scope should be respected instead of the user's full team memberships. Returns: If page is provided: Dict with {"data": [...], "pagination": {...}, "links": {...}} @@ -893,12 +896,20 @@ async def list_resources( # Apply team-based access control if user_email is provided if user_email: - # First-Party - from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel + # Use token_teams if provided (for MCP/API token access), otherwise look up from DB + if token_teams is not None: + team_ids = token_teams + else: + # First-Party + from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel + + team_service = TeamManagementService(db) + user_teams = await team_service.get_user_teams(user_email) + team_ids = [team.id for team in user_teams] - team_service = TeamManagementService(db) - user_teams = await team_service.get_user_teams(user_email) - team_ids = [team.id for team in user_teams] + # Check if this is a public-only token (empty teams array) + # Public-only tokens can ONLY see public resources - no owner access + is_public_only_token = token_teams is not None and len(token_teams) == 0 if team_id: # User requesting specific team - verify access @@ -907,15 +918,19 @@ async def list_resources( access_conditions = [ and_(DbResource.team_id == team_id, DbResource.visibility.in_(["team", "public"])), - and_(DbResource.team_id == team_id, DbResource.owner_email == user_email), ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(and_(DbResource.team_id == team_id, DbResource.owner_email == user_email)) query = query.where(or_(*access_conditions)) else: - # General access: user's resources + public resources + team resources + # General access: public resources + team resources (+ owner resources if not public-only token) access_conditions = [ - DbResource.owner_email == user_email, DbResource.visibility == "public", ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(DbResource.owner_email == user_email) if team_ids: access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"]))) @@ -1103,7 +1118,14 @@ async def list_resources_for_user( result.append(self.convert_resource_to_read(t, include_metrics=False)) return result - async def list_server_resources(self, db: Session, server_id: str, include_inactive: bool = False) -> List[ResourceRead]: + async def list_server_resources( + self, + db: Session, + server_id: str, + include_inactive: bool = False, + user_email: Optional[str] = None, + token_teams: Optional[List[str]] = None, + ) -> List[ResourceRead]: """ Retrieve a list of registered resources from the database. @@ -1117,6 +1139,9 @@ async def list_server_resources(self, db: Session, server_id: str, include_inact server_id (str): Server ID include_inactive (bool): If True, include inactive resources in the result. Defaults to False. + user_email (Optional[str]): User email for visibility filtering. If None, no filtering applied. + token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API + token access where the token scope should be respected. Returns: List[ResourceRead]: A list of resources represented as ResourceRead objects. @@ -1147,6 +1172,33 @@ async def list_server_resources(self, db: Session, server_id: str, include_inact ) if not include_inactive: query = query.where(DbResource.enabled) + + # Add visibility filtering if user context provided + if user_email: + # Use token_teams if provided (for MCP/API token access), otherwise look up from DB + if token_teams is not None: + team_ids = token_teams + else: + from mcpgateway.services.team_management_service import TeamManagementService # pylint: disable=import-outside-toplevel + + team_service = TeamManagementService(db) + user_teams = await team_service.get_user_teams(user_email) + team_ids = [team.id for team in user_teams] + + # Check if this is a public-only token (empty teams array) + # Public-only tokens can ONLY see public resources - no owner access + is_public_only_token = token_teams is not None and len(token_teams) == 0 + + access_conditions = [ + DbResource.visibility == "public", + ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(DbResource.owner_email == user_email) + if team_ids: + access_conditions.append(and_(DbResource.team_id.in_(team_ids), DbResource.visibility.in_(["team", "public"]))) + query = query.where(or_(*access_conditions)) + # Cursor-based pagination logic can be implemented here in the future. resources = db.execute(query).scalars().all() diff --git a/mcpgateway/services/tool_service.py b/mcpgateway/services/tool_service.py index 2377b06d2a..34a54fe300 100644 --- a/mcpgateway/services/tool_service.py +++ b/mcpgateway/services/tool_service.py @@ -1573,6 +1573,7 @@ async def list_tools( user_email: Optional[str] = None, team_id: Optional[str] = None, visibility: Optional[str] = None, + token_teams: Optional[List[str]] = None, _request_headers: Optional[Dict[str, str]] = None, ) -> Union[tuple[List[ToolRead], Optional[str]], Dict[str, Any]]: """ @@ -1593,6 +1594,8 @@ async def list_tools( user_email (Optional[str]): User email for team-based access control. If None, no access control is applied. team_id (Optional[str]): Filter by specific team ID. Requires user_email for access validation. visibility (Optional[str]): Filter by visibility (private, team, public). + token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API token access + where the token scope should be respected instead of the user's full team memberships. _request_headers (Optional[Dict[str, str]], optional): Headers from the request to pass through. Currently unused but kept for API consistency. Defaults to None. @@ -1633,9 +1636,17 @@ async def list_tools( query = query.where(DbTool.enabled) # Apply team-based access control if user_email is provided if user_email: - team_service = TeamManagementService(db) - user_teams = await team_service.get_user_teams(user_email) - team_ids = [team.id for team in user_teams] + # Use token_teams if provided (for MCP/API token access), otherwise look up from DB + if token_teams is not None: + team_ids = token_teams + else: + team_service = TeamManagementService(db) + user_teams = await team_service.get_user_teams(user_email) + team_ids = [team.id for team in user_teams] + + # Check if this is a public-only token (empty teams array) + # Public-only tokens can ONLY see public resources - no owner access + is_public_only_token = token_teams is not None and len(token_teams) == 0 if team_id: # User requesting specific team - verify access @@ -1643,15 +1654,19 @@ async def list_tools( return ([], None) access_conditions = [ and_(DbTool.team_id == team_id, DbTool.visibility.in_(["team", "public"])), - and_(DbTool.team_id == team_id, DbTool.owner_email == user_email), ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(and_(DbTool.team_id == team_id, DbTool.owner_email == user_email)) query = query.where(or_(*access_conditions)) else: - # General access: user's tools + public tools + team tools + # General access: public tools + team tools (+ owner tools if not public-only token) access_conditions = [ - DbTool.owner_email == user_email, DbTool.visibility == "public", ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(DbTool.owner_email == user_email) if team_ids: access_conditions.append(and_(DbTool.team_id.in_(team_ids), DbTool.visibility.in_(["team", "public"]))) query = query.where(or_(*access_conditions)) @@ -1721,7 +1736,15 @@ async def list_tools( return (result, next_cursor) async def list_server_tools( - self, db: Session, server_id: str, include_inactive: bool = False, include_metrics: bool = False, cursor: Optional[str] = None, _request_headers: Optional[Dict[str, str]] = None + self, + db: Session, + server_id: str, + include_inactive: bool = False, + include_metrics: bool = False, + cursor: Optional[str] = None, + user_email: Optional[str] = None, + token_teams: Optional[List[str]] = None, + _request_headers: Optional[Dict[str, str]] = None, ) -> List[ToolRead]: """ Retrieve a list of registered tools from the database. @@ -1735,6 +1758,9 @@ async def list_server_tools( Defaults to False. cursor (Optional[str], optional): An opaque cursor token for pagination. Currently, this parameter is ignored. Defaults to None. + user_email (Optional[str]): User email for visibility filtering. If None, no filtering applied. + token_teams (Optional[List[str]]): Override DB team lookup with token's teams. Used for MCP/API + token access where the token scope should be respected. _request_headers (Optional[Dict[str, str]], optional): Headers from the request to pass through. Currently unused but kept for API consistency. Defaults to None. @@ -1777,6 +1803,30 @@ async def list_server_tools( if not include_inactive: query = query.where(DbTool.enabled) + # Add visibility filtering if user context provided + if user_email: + # Use token_teams if provided (for MCP/API token access), otherwise look up from DB + if token_teams is not None: + team_ids = token_teams + else: + team_service = TeamManagementService(db) + user_teams = await team_service.get_user_teams(user_email) + team_ids = [team.id for team in user_teams] + + # Check if this is a public-only token (empty teams array) + # Public-only tokens can ONLY see public resources - no owner access + is_public_only_token = token_teams is not None and len(token_teams) == 0 + + access_conditions = [ + DbTool.visibility == "public", + ] + # Only include owner access for non-public-only tokens + if not is_public_only_token: + access_conditions.append(DbTool.owner_email == user_email) + if team_ids: + access_conditions.append(and_(DbTool.team_id.in_(team_ids), DbTool.visibility.in_(["team", "public"]))) + query = query.where(or_(*access_conditions)) + # Execute the query - team names are loaded via joinedload(DbTool.email_team) tools = db.execute(query).scalars().all() diff --git a/mcpgateway/static/admin.js b/mcpgateway/static/admin.js index e81fd31d8d..35d0b581d2 100644 --- a/mcpgateway/static/admin.js +++ b/mcpgateway/static/admin.js @@ -189,6 +189,50 @@ function escapeHtml(unsafe) { .replace(/\//g, "/"); // Extra protection against script injection } +/** + * Extract a human-readable error message from an API error response. + * Handles both string errors and Pydantic validation error arrays. + * @param {Object} error - The parsed JSON error response + * @param {string} fallback - Fallback message if no detail found + * @returns {string} Human-readable error message + */ +function extractApiError(error, fallback = "An error occurred") { + if (!error || !error.detail) { + return fallback; + } + if (typeof error.detail === "string") { + return error.detail; + } + if (Array.isArray(error.detail)) { + // Pydantic validation errors - extract messages + return error.detail + .map((err) => err.msg || JSON.stringify(err)) + .join("; "); + } + return fallback; +} + +/** + * Safely parse an error response, handling both JSON and plain text bodies. + * @param {Response} response - The fetch Response object + * @param {string} fallback - Fallback message if parsing fails + * @returns {Promise} Human-readable error message + */ +async function parseErrorResponse(response, fallback = "An error occurred") { + try { + const contentType = response.headers.get("content-type") || ""; + if (contentType.includes("application/json")) { + const error = await response.json(); + return extractApiError(error, fallback); + } + // Non-JSON response - try to get text + const text = await response.text(); + return text || fallback; + } catch { + return fallback; + } +} + /** * Header validation constants and functions */ @@ -19129,13 +19173,34 @@ function displayTokensList(tokens) { ? 'Active' : 'Inactive'; + // Build scope badges + const teamName = token.team_id ? getTeamNameById(token.team_id) : null; + const teamBadge = teamName + ? `Team: ${escapeHtml(teamName)}` + : 'Public-only'; + + const ipBadge = + token.ip_restrictions && token.ip_restrictions.length > 0 + ? `${token.ip_restrictions.length} IP${token.ip_restrictions.length > 1 ? "s" : ""}` + : ""; + + const serverBadge = token.server_id + ? 'Server-scoped' + : ""; + + // Safely encode token data for data attribute (URL encoding preserves all characters) + const tokenDataEncoded = encodeURIComponent(JSON.stringify(token)); + tokensHTML += `
-
+

${escapeHtml(token.name)}

${statusBadge} + ${teamBadge} + ${serverBadge} + ${ipBadge}
${token.description ? `

${escapeHtml(token.description)}

` : ""}
@@ -19152,15 +19217,25 @@ function displayTokensList(tokens) { ${token.server_id ? `
Scoped to Server: ${escapeHtml(token.server_id)}
` : ""} ${token.resource_scopes && token.resource_scopes.length > 0 ? `
Permissions: ${token.resource_scopes.map((p) => escapeHtml(p)).join(", ")}
` : ""}
-
+
+ +
+ + +
+

Basic Information

+
+
+ ID: + ${escapeHtml(token.id)} + +
+
+ Name: + ${escapeHtml(token.name)} +
+
+ Description: + ${token.description ? escapeHtml(token.description) : "None"} +
+
+ Created by: + ${escapeHtml(token.user_email || "Unknown")} +
+
+ Team: + ${teamName ? `${escapeHtml(teamName)} (${escapeHtml(token.team_id.substring(0, 8))}...)` : "None (Public-only)"} +
+
+ Created: + ${formatDate(token.created_at)} +
+
+ Expires: + ${formatDate(token.expires_at)} +
+
+ Last Used: + ${formatDate(token.last_used)} +
+
+ Status: + ${statusText} +
+
+
+ + +
+

Scope & Restrictions

+
+
+ Server: + ${token.server_id ? escapeHtml(token.server_id) : "All servers"} +
+
+ Permissions: + ${ + token.resource_scopes && + token.resource_scopes.length > 0 + ? `
    ${formatList(token.resource_scopes)}
` + : 'All (no restrictions)' + } +
+
+ IP Restrictions: + ${ + token.ip_restrictions && + token.ip_restrictions.length > 0 + ? `
    ${formatList(token.ip_restrictions)}
` + : 'None' + } +
+
+ Time Restrictions: +
${formatJson(token.time_restrictions)}
+
+
+ Usage Limits: +
${formatJson(token.usage_limits)}
+
+
+
+ + + ${ + token.tags && token.tags.length > 0 + ? ` +
+

Tags

+
+ ${token.tags.map((tag) => `${escapeHtml(tag)}`).join("")} +
+
+ ` + : "" + } + + + ${ + token.is_revoked + ? ` +
+

Revocation Details

+
+
+ Revoked at: + ${formatDate(token.revoked_at)} +
+
+ Revoked by: + ${token.revoked_by ? escapeHtml(token.revoked_by) : "Unknown"} +
+
+ Reason: + ${token.revocation_reason ? escapeHtml(token.revocation_reason) : "No reason provided"} +
+
+
+ ` + : "" + } + +
+ +
+
+ `; + + document.body.appendChild(modal); + + // Attach event handlers (avoids inline JS and XSS risks) + modal.addEventListener("click", (event) => { + const button = event.target.closest("button[data-action]"); + if (!button) { + return; + } + + const action = button.dataset.action; + + if (action === "close-modal") { + modal.remove(); + } else if (action === "copy-id") { + const value = button.dataset.copyValue; + if (value) { + navigator.clipboard.writeText(value).then(() => { + button.textContent = "Copied!"; + setTimeout(() => { + button.textContent = "Copy"; + }, 1500); + }); + } + } + }); +} + /** * Get auth token from storage or user input */ @@ -28061,8 +28529,11 @@ async function saveLLMProvider(event) { }); if (!response.ok) { - const error = await response.json(); - throw new Error(error.detail || "Failed to save provider"); + const errorMsg = await parseErrorResponse( + response, + "Failed to save provider", + ); + throw new Error(errorMsg); } closeLLMProviderModal(); @@ -28103,8 +28574,11 @@ async function deleteLLMProvider(providerId, providerName) { ); if (!response.ok) { - const error = await response.json(); - throw new Error(error.detail || "Failed to delete provider"); + const errorMsg = await parseErrorResponse( + response, + "Failed to delete provider", + ); + throw new Error(errorMsg); } showToast("Provider deleted successfully", "success"); @@ -28442,8 +28916,11 @@ async function saveLLMModel(event) { }); if (!response.ok) { - const error = await response.json(); - throw new Error(error.detail || "Failed to save model"); + const errorMsg = await parseErrorResponse( + response, + "Failed to save model", + ); + throw new Error(errorMsg); } closeLLMModelModal(); @@ -28480,8 +28957,11 @@ async function deleteLLMModel(modelId, modelName) { ); if (!response.ok) { - const error = await response.json(); - throw new Error(error.detail || "Failed to delete model"); + const errorMsg = await parseErrorResponse( + response, + "Failed to delete model", + ); + throw new Error(errorMsg); } showToast("Model deleted successfully", "success"); diff --git a/mcpgateway/templates/admin.html b/mcpgateway/templates/admin.html index c7d90a4945..036b176dea 100644 --- a/mcpgateway/templates/admin.html +++ b/mcpgateway/templates/admin.html @@ -5847,16 +5847,16 @@

- +

- ⚠️ Public Access Token + ⚠️ Token Scope

- You are currently viewing "All Teams". Tokens created in this context will have public-only access. + Tokens created without selecting a team will have access to public resources only.

✅ This token CAN access:

    @@ -5865,7 +5865,8 @@

❌ This token CANNOT access:

    -
  • Team-scoped or private servers, tools, resources, and prompts
  • +
  • Your own private or team-scoped resources
  • +
  • Team-scoped servers, tools, resources, and prompts
@@ -5874,7 +5875,7 @@

💡

- To create a token with full Team Access, select a specific team from the header above. + For team-level access, select a specific team from the header above before creating a token.

diff --git a/mcpgateway/transports/streamablehttp_transport.py b/mcpgateway/transports/streamablehttp_transport.py index 930c421889..50a0384698 100644 --- a/mcpgateway/transports/streamablehttp_transport.py +++ b/mcpgateway/transports/streamablehttp_transport.py @@ -536,11 +536,26 @@ async def list_tools() -> List[types.Tool]: """ server_id = server_id_var.get() request_headers = request_headers_var.get() + user_context = user_context_var.get() + + # Extract filtering parameters from user context + user_email = user_context.get("email") if user_context else None + # Use None as default to distinguish "no teams specified" from "empty teams array" + token_teams = user_context.get("teams") if user_context else None + is_admin = user_context.get("is_admin", False) if user_context else False + + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even empty [] for public-only), respect it + if is_admin and token_teams is None: + user_email = None + # token_teams stays None (unrestricted) + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) if server_id: try: async with get_db() as db: - tools = await tool_service.list_server_tools(db, server_id, _request_headers=request_headers) + tools = await tool_service.list_server_tools(db, server_id, user_email=user_email, token_teams=token_teams, _request_headers=request_headers) return [types.Tool(name=tool.name, description=tool.description, inputSchema=tool.input_schema, outputSchema=tool.output_schema, annotations=tool.annotations) for tool in tools] except Exception as e: logger.exception(f"Error listing tools:{e}") @@ -548,7 +563,7 @@ async def list_tools() -> List[types.Tool]: else: try: async with get_db() as db: - tools, _ = await tool_service.list_tools(db, include_inactive=False, limit=0, _request_headers=request_headers) + tools, _ = await tool_service.list_tools(db, include_inactive=False, limit=0, user_email=user_email, token_teams=token_teams, _request_headers=request_headers) return [types.Tool(name=tool.name, description=tool.description, inputSchema=tool.input_schema, outputSchema=tool.output_schema, annotations=tool.annotations) for tool in tools] except Exception as e: logger.exception(f"Error listing tools:{e}") @@ -572,13 +587,27 @@ async def list_prompts() -> List[types.Prompt]: >>> sig.return_annotation typing.List[mcp.types.Prompt] """ - server_id = server_id_var.get() + user_context = user_context_var.get() + + # Extract filtering parameters from user context + user_email = user_context.get("email") if user_context else None + # Use None as default to distinguish "no teams specified" from "empty teams array" + token_teams = user_context.get("teams") if user_context else None + is_admin = user_context.get("is_admin", False) if user_context else False + + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even empty [] for public-only), respect it + if is_admin and token_teams is None: + user_email = None + # token_teams stays None (unrestricted) + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) if server_id: try: async with get_db() as db: - prompts = await prompt_service.list_server_prompts(db, server_id) + prompts = await prompt_service.list_server_prompts(db, server_id, user_email=user_email, token_teams=token_teams) return [types.Prompt(name=prompt.name, description=prompt.description, arguments=prompt.arguments) for prompt in prompts] except Exception as e: logger.exception(f"Error listing Prompts:{e}") @@ -586,7 +615,7 @@ async def list_prompts() -> List[types.Prompt]: else: try: async with get_db() as db: - prompts, _ = await prompt_service.list_prompts(db, include_inactive=False, limit=0) + prompts, _ = await prompt_service.list_prompts(db, include_inactive=False, limit=0, user_email=user_email, token_teams=token_teams) return [types.Prompt(name=prompt.name, description=prompt.description, arguments=prompt.arguments) for prompt in prompts] except Exception as e: logger.exception(f"Error listing prompts:{e}") @@ -650,13 +679,27 @@ async def list_resources() -> List[types.Resource]: >>> sig.return_annotation typing.List[mcp.types.Resource] """ - server_id = server_id_var.get() + user_context = user_context_var.get() + + # Extract filtering parameters from user context + user_email = user_context.get("email") if user_context else None + # Use None as default to distinguish "no teams specified" from "empty teams array" + token_teams = user_context.get("teams") if user_context else None + is_admin = user_context.get("is_admin", False) if user_context else False + + # Admin bypass - only when token has NO team restrictions (token_teams is None) + # If token has explicit team scope (even empty [] for public-only), respect it + if is_admin and token_teams is None: + user_email = None + # token_teams stays None (unrestricted) + elif token_teams is None: + token_teams = [] # Non-admin without teams = public-only (secure default) if server_id: try: async with get_db() as db: - resources = await resource_service.list_server_resources(db, server_id) + resources = await resource_service.list_server_resources(db, server_id, user_email=user_email, token_teams=token_teams) return [types.Resource(uri=resource.uri, name=resource.name, description=resource.description, mimeType=resource.mime_type) for resource in resources] except Exception as e: logger.exception(f"Error listing Resources:{e}") @@ -664,7 +707,7 @@ async def list_resources() -> List[types.Resource]: else: try: async with get_db() as db: - resources, _ = await resource_service.list_resources(db, include_inactive=False, limit=0) + resources, _ = await resource_service.list_resources(db, include_inactive=False, limit=0, user_email=user_email, token_teams=token_teams) return [types.Resource(uri=resource.uri, name=resource.name, description=resource.description, mimeType=resource.mime_type) for resource in resources] except Exception as e: logger.exception(f"Error listing resources:{e}") @@ -1036,8 +1079,13 @@ async def streamable_http_auth(scope: Any, receive: Any, send: Any) -> bool: if not settings.mcp_client_auth_enabled and settings.trust_proxy_auth: # Client auth disabled → allow proxy header if proxy_user: - # Set user context for proxy-authenticated sessions - user_context_var.set({"email": proxy_user}) + # Set enriched user context for proxy-authenticated sessions + user_context_var.set({ + "email": proxy_user, + "teams": [], # Proxy auth has no team context + "is_authenticated": True, + "is_admin": False, + }) return True # Trusted proxy supplied user # --- Standard JWT authentication flow (client auth enabled) --- @@ -1051,18 +1099,53 @@ async def streamable_http_auth(scope: Any, receive: Any, send: Any) -> bool: if token is None: raise Exception() user_payload = await verify_credentials(token) - # Store user context for later use in tool invocations + # Store enriched user context with normalized teams if isinstance(user_payload, dict): - user_context_var.set(user_payload) + # Check if "teams" key exists and is not None to distinguish: + # - Key exists with non-None value (even empty []) -> normalized list (scoped token) + # - Key absent OR key is None -> None (unrestricted for admin, public-only for non-admin) + teams_value = user_payload.get("teams") if "teams" in user_payload else None + if teams_value is not None: + normalized_teams = [] + for team in teams_value or []: + if isinstance(team, dict): + team_id = team.get("id") + if team_id: + normalized_teams.append(team_id) + elif isinstance(team, str): + normalized_teams.append(team) + final_teams = normalized_teams + else: + # No "teams" key or teams is null - treat as unrestricted (None) + final_teams = None + + user_context_var.set({ + "email": user_payload.get("sub") or user_payload.get("email"), # Some tokens only have email + "teams": final_teams, + "is_authenticated": True, + # Check both top-level is_admin (legacy tokens) and nested user.is_admin + "is_admin": user_payload.get("is_admin", False) or user_payload.get("user", {}).get("is_admin", False), + }) elif proxy_user: # If using proxy auth, store the proxy user - user_context_var.set({"email": proxy_user}) + user_context_var.set({ + "email": proxy_user, + "teams": [], + "is_authenticated": True, + "is_admin": False, + }) except Exception: # If JWT auth fails but we have a trusted proxy user, use that if settings.trust_proxy_auth and proxy_user: - user_context_var.set({"email": proxy_user}) + user_context_var.set({ + "email": proxy_user, + "teams": [], + "is_authenticated": True, + "is_admin": False, + }) return True # Fall back to proxy authentication + # No valid auth - return 401 (PRESERVES EXISTING BEHAVIOR) response = ORJSONResponse( {"detail": "Authentication failed"}, status_code=HTTP_401_UNAUTHORIZED, diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index a2471b5729..f617f78542 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -31,7 +31,8 @@ import pytest # First-Party -from mcpgateway.main import app, require_auth +from mcpgateway.main import app +from mcpgateway.utils.verify_credentials import require_auth from mcpgateway.common.models import InitializeResult, ResourceContent, ServerCapabilities from mcpgateway.schemas import ResourceRead, ServerRead, ToolMetrics, ToolRead @@ -358,7 +359,7 @@ def test_rpc_tool_invocation_flow( resp = test_client.post("/rpc/", json=rpc_body, headers=auth_headers) assert resp.status_code == 200 assert resp.json()["result"]["content"][0]["text"] == "ok" - mock_invoke.assert_awaited_once_with(db=ANY, name="test_tool", arguments={"foo": "bar"}, request_headers=ANY, app_user_email="integration-test-user") + mock_invoke.assert_awaited_once_with(db=ANY, name="test_tool", arguments={"foo": "bar"}, request_headers=ANY, app_user_email="integration-test-user@example.com", plugin_context_table=None, plugin_global_context=None) # --------------------------------------------------------------------- # # 5. Metrics aggregation endpoint # diff --git a/tests/integration/test_rbac_ownership_http.py b/tests/integration/test_rbac_ownership_http.py index 7c4f0a6836..5a819b2dc0 100644 --- a/tests/integration/test_rbac_ownership_http.py +++ b/tests/integration/test_rbac_ownership_http.py @@ -27,7 +27,8 @@ from _pytest.monkeypatch import MonkeyPatch # First-Party -from mcpgateway.main import app, require_auth +from mcpgateway.main import app +from mcpgateway.utils.verify_credentials import require_auth from mcpgateway.auth import get_current_user from mcpgateway.middleware.rbac import get_current_user_with_permissions, get_db as rbac_get_db, get_permission_service from mcpgateway.schemas import ToolRead, ServerRead, ResourceRead, PromptRead, GatewayRead, A2AAgentRead diff --git a/tests/integration/test_tag_endpoints.py b/tests/integration/test_tag_endpoints.py index d0d7c65324..948ab79f3e 100644 --- a/tests/integration/test_tag_endpoints.py +++ b/tests/integration/test_tag_endpoints.py @@ -15,7 +15,8 @@ import pytest # First-Party -from mcpgateway.main import app, require_auth +from mcpgateway.main import app +from mcpgateway.utils.verify_credentials import require_auth from mcpgateway.schemas import TaggedEntity, TagInfo, TagStats # Local diff --git a/tests/integration/test_tools_pagination.py b/tests/integration/test_tools_pagination.py index 1da7ea8c68..902536d043 100644 --- a/tests/integration/test_tools_pagination.py +++ b/tests/integration/test_tools_pagination.py @@ -26,7 +26,8 @@ # First-Party from mcpgateway.auth import get_current_user from mcpgateway.db import Tool as DbTool -from mcpgateway.main import app, require_auth +from mcpgateway.main import app +from mcpgateway.utils.verify_credentials import require_auth from mcpgateway.middleware.rbac import get_current_user_with_permissions, get_db as rbac_get_db # Local diff --git a/tests/unit/mcpgateway/routers/test_tokens.py b/tests/unit/mcpgateway/routers/test_tokens.py index 4de29ef001..75850182a5 100644 --- a/tests/unit/mcpgateway/routers/test_tokens.py +++ b/tests/unit/mcpgateway/routers/test_tokens.py @@ -132,7 +132,7 @@ async def test_create_token_with_scope(self, mock_db, mock_current_user, mock_to """Test token creation with scope restrictions.""" scope_data = { "server_id": "server-123", - "permissions": ["read", "write"], + "permissions": ["tools.read", "tools.write"], "ip_restrictions": ["192.168.1.0/24"], "time_restrictions": {"start_time": "09:00", "end_time": "17:00"}, "usage_limits": {"max_calls": 1000}, @@ -294,7 +294,7 @@ async def test_update_token_with_scope(self, mock_db, mock_current_user, mock_to """Test token update with new scope.""" scope_data = { "server_id": "new-server", - "permissions": ["admin"], + "permissions": ["tools.admin"], } request = TokenUpdateRequest( name="Updated Token", @@ -615,7 +615,7 @@ async def test_create_token_with_complex_scope(self, mock_db, mock_current_user, """Test token creation with all scope fields.""" scope_data = { "server_id": "srv-123", - "permissions": ["read", "write", "delete"], + "permissions": ["tools.read", "tools.write", "tools.delete"], "ip_restrictions": ["192.168.1.0/24", "10.0.0.0/8"], "time_restrictions": {"start_time": "08:00", "end_time": "18:00", "timezone": "UTC", "days": ["mon", "tue", "wed", "thu", "fri"]}, "usage_limits": {"max_calls": 10000, "max_bytes": 1048576, "rate_limit": "100/hour"}, diff --git a/tests/unit/mcpgateway/services/test_tool_service.py b/tests/unit/mcpgateway/services/test_tool_service.py index 4f842068d7..6f4bb862eb 100644 --- a/tests/unit/mcpgateway/services/test_tool_service.py +++ b/tests/unit/mcpgateway/services/test_tool_service.py @@ -3073,3 +3073,110 @@ def test_correlation_id_not_added_when_correlation_id_none(self): # Headers should remain unchanged assert "X-Correlation-ID" not in headers + + +# ----------------------------------------------------- # +# Token Teams Filtering Tests (Issue #1915) # +# ----------------------------------------------------- # +class TestToolServiceTokenTeamsFiltering: + """Tests for token_teams parameter in list_tools and list_server_tools.""" + + @pytest.mark.asyncio + async def test_list_tools_with_token_teams_uses_token_teams(self, tool_service, test_db): + """Test that list_tools uses token_teams when provided instead of DB lookup.""" + mock_tool = MagicMock(spec=DbTool, id="1", team_id="team_a") + + # Mock DB execute chain + test_db.execute = Mock(return_value=MagicMock(scalars=Mock(return_value=MagicMock(all=Mock(return_value=[mock_tool]))))) + test_db.commit = Mock() + + tool_read = MagicMock() + tool_service.convert_tool_to_read = Mock(return_value=tool_read) + + # When token_teams is provided, TeamManagementService should NOT be called + with patch("mcpgateway.services.tool_service.TeamManagementService") as mock_team_service: + mock_team_service.return_value.get_user_teams = AsyncMock() + result, _ = await tool_service.list_tools(test_db, user_email="user@example.com", token_teams=["team_a"]) + + # TeamManagementService should NOT be instantiated since token_teams was provided + mock_team_service.return_value.get_user_teams.assert_not_called() + + @pytest.mark.asyncio + async def test_list_tools_with_empty_token_teams_sees_own_and_public(self, tool_service, test_db): + """Test that empty token_teams list sees own resources and public resources.""" + mock_tool_public = MagicMock(spec=DbTool, id="1", team_id=None, visibility="public", owner_email="other@example.com") + mock_tool_own = MagicMock(spec=DbTool, id="2", team_id=None, visibility="private", owner_email="user@example.com") + + # Mock DB execute chain to return tools + test_db.execute = Mock(return_value=MagicMock(scalars=Mock(return_value=MagicMock(all=Mock(return_value=[mock_tool_public, mock_tool_own]))))) + test_db.commit = Mock() + + tool_service.convert_tool_to_read = Mock(side_effect=[MagicMock(), MagicMock()]) + + # With empty token_teams, user should see their own and public resources + result, _ = await tool_service.list_tools(test_db, user_email="user@example.com", token_teams=[]) + + # verify DB was queried + assert test_db.execute.called + + @pytest.mark.asyncio + async def test_list_tools_without_token_teams_uses_db_lookup(self, tool_service, test_db): + """Test that list_tools performs DB team lookup when token_teams is None.""" + mock_tool = MagicMock(spec=DbTool, id="1", team_id="team_a") + + test_db.execute = Mock(return_value=MagicMock(scalars=Mock(return_value=MagicMock(all=Mock(return_value=[mock_tool]))))) + test_db.commit = Mock() + + tool_read = MagicMock() + tool_service.convert_tool_to_read = Mock(return_value=tool_read) + + mock_team = MagicMock(id="team_a", is_personal=False) + + # When token_teams is None, TeamManagementService SHOULD be called + with patch("mcpgateway.services.tool_service.TeamManagementService") as mock_team_service: + mock_team_service.return_value.get_user_teams = AsyncMock(return_value=[mock_team]) + result, _ = await tool_service.list_tools(test_db, user_email="user@example.com", token_teams=None) + + # TeamManagementService SHOULD be called for DB lookup + mock_team_service.return_value.get_user_teams.assert_called_once_with("user@example.com") + + @pytest.mark.asyncio + async def test_list_server_tools_with_token_teams(self, tool_service, test_db): + """Test list_server_tools uses token_teams for filtering.""" + mock_tool = MagicMock(spec=DbTool, id="1", team_id="team_x", enabled=True) + mock_server = MagicMock() + mock_server.tools = [mock_tool] + + test_db.execute = Mock(return_value=MagicMock(scalar_one_or_none=Mock(return_value=mock_server))) + test_db.commit = Mock() + + tool_read = MagicMock() + tool_service.convert_tool_to_read = Mock(return_value=tool_read) + + with patch("mcpgateway.services.tool_service.TeamManagementService") as mock_team_service: + mock_team_service.return_value.get_user_teams = AsyncMock() + result = await tool_service.list_server_tools( + test_db, server_id="server-1", include_inactive=False, user_email="user@example.com", token_teams=["team_x"] + ) + + # TeamManagementService should NOT be called since token_teams was provided + mock_team_service.return_value.get_user_teams.assert_not_called() + + @pytest.mark.asyncio + async def test_list_tools_token_teams_filters_by_membership(self, tool_service, test_db): + """Test that only tools matching token_teams are returned.""" + mock_tool_a = MagicMock(spec=DbTool, id="1", team_id="team_a") + mock_tool_b = MagicMock(spec=DbTool, id="2", team_id="team_b") + + # DB returns both tools, but filtering should occur + test_db.execute = Mock(return_value=MagicMock(scalars=Mock(return_value=MagicMock(all=Mock(return_value=[mock_tool_a, mock_tool_b]))))) + test_db.commit = Mock() + + tool_read_a = MagicMock() + tool_read_b = MagicMock() + tool_service.convert_tool_to_read = Mock(side_effect=[tool_read_a, tool_read_b]) + + # Only team_a in token_teams - should only see team_a tools + result, _ = await tool_service.list_tools(test_db, user_email="user@example.com", token_teams=["team_a"]) + + assert test_db.execute.called diff --git a/tests/unit/mcpgateway/test_main.py b/tests/unit/mcpgateway/test_main.py index f5754a45fb..5732f952ba 100644 --- a/tests/unit/mcpgateway/test_main.py +++ b/tests/unit/mcpgateway/test_main.py @@ -183,8 +183,8 @@ def test_client(app): # First-Party # Mock user object for RBAC system from mcpgateway.db import EmailUser - from mcpgateway.main import require_auth from mcpgateway.middleware.rbac import get_current_user_with_permissions + from mcpgateway.utils.verify_credentials import require_auth mock_user = EmailUser( email="test_user@example.com", @@ -1160,7 +1160,7 @@ def test_rpc_tool_invocation(self, mock_invoke_tool, test_client, auth_headers): name="test_tool", arguments={"param": "value"}, request_headers=ANY, - app_user_email="test_user", + app_user_email="test_user@example.com", # Updated: now uses email from JWT/RBAC plugin_context_table=None, plugin_global_context=ANY, ) @@ -1939,3 +1939,291 @@ def test_different_jsonpath_cached_separately(self): info = _parse_jsonpath.cache_info() assert info.misses == 2 + + +# ----------------------------------------------------- # +# Token Teams Helper Function Tests (Issue #1915) # +# ----------------------------------------------------- # +class TestNormalizeTokenTeams: + """Tests for _normalize_token_teams helper function.""" + + def test_normalize_token_teams_none(self): + """Test that None input returns empty list.""" + from mcpgateway.main import _normalize_token_teams + + assert _normalize_token_teams(None) == [] + + def test_normalize_token_teams_empty_list(self): + """Test that empty list input returns empty list.""" + from mcpgateway.main import _normalize_token_teams + + assert _normalize_token_teams([]) == [] + + def test_normalize_token_teams_string_ids(self): + """Test that string team IDs are passed through unchanged.""" + from mcpgateway.main import _normalize_token_teams + + result = _normalize_token_teams(["team_a", "team_b", "team_c"]) + assert result == ["team_a", "team_b", "team_c"] + + def test_normalize_token_teams_dict_format(self): + """Test that dict format with id key extracts the ID.""" + from mcpgateway.main import _normalize_token_teams + + result = _normalize_token_teams([{"id": "team_a", "name": "Team A"}, {"id": "team_b", "name": "Team B"}]) + assert result == ["team_a", "team_b"] + + def test_normalize_token_teams_mixed_format(self): + """Test that mixed string and dict formats are handled correctly.""" + from mcpgateway.main import _normalize_token_teams + + result = _normalize_token_teams([{"id": "t1", "name": "Team 1"}, "t2", {"id": "t3"}]) + assert result == ["t1", "t2", "t3"] + + def test_normalize_token_teams_dict_without_id(self): + """Test that dicts without id key are skipped.""" + from mcpgateway.main import _normalize_token_teams + + result = _normalize_token_teams([{"name": "No ID Team"}, {"id": "valid_team"}]) + assert result == ["valid_team"] + + def test_normalize_token_teams_dict_with_empty_id(self): + """Test that dicts with empty id value are skipped.""" + from mcpgateway.main import _normalize_token_teams + + result = _normalize_token_teams([{"id": "", "name": "Empty ID"}, {"id": "valid"}]) + assert result == ["valid"] + + def test_normalize_token_teams_preserves_order(self): + """Test that team order is preserved.""" + from mcpgateway.main import _normalize_token_teams + + result = _normalize_token_teams(["z_team", "a_team", "m_team"]) + assert result == ["z_team", "a_team", "m_team"] + + +class TestGetTokenTeamsFromRequest: + """Tests for _get_token_teams_from_request helper function.""" + + def test_get_token_teams_with_valid_cached_payload(self): + """Test extraction of teams from cached JWT payload.""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token_string", {"sub": "user@example.com", "teams": ["team_a", "team_b"]}) + + result = _get_token_teams_from_request(mock_request) + assert result == ["team_a", "team_b"] + + def test_get_token_teams_with_dict_teams_payload(self): + """Test extraction and normalization of dict format teams.""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"teams": [{"id": "t1", "name": "Team 1"}]}) + + result = _get_token_teams_from_request(mock_request) + assert result == ["t1"] + + def test_get_token_teams_no_cached_payload_returns_none(self): + """Test that missing cached payload returns None (triggers DB lookup).""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = None + + result = _get_token_teams_from_request(mock_request) + assert result is None # None triggers DB team lookup in services + + def test_get_token_teams_no_teams_in_payload_returns_none(self): + """Test that payload without teams key returns None (unrestricted access).""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"sub": "user@example.com"}) + + result = _get_token_teams_from_request(mock_request) + assert result is None # None = JWT exists but no teams key (unrestricted) + + def test_get_token_teams_empty_teams_returns_empty_list(self): + """Test that payload with empty teams returns empty list (not None).""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"sub": "user@example.com", "teams": []}) + + result = _get_token_teams_from_request(mock_request) + assert result == [] # Empty list = JWT exists but no teams + + def test_get_token_teams_null_teams_returns_none(self): + """Test that payload with teams: null returns None (same as missing teams).""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"sub": "user@example.com", "teams": None}) + + result = _get_token_teams_from_request(mock_request) + assert result is None # None = teams is null, treated same as missing (unrestricted) + + def test_get_token_teams_invalid_tuple_format_returns_none(self): + """Test that non-tuple cached payload returns None.""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = "not_a_tuple" + + result = _get_token_teams_from_request(mock_request) + assert result is None + + def test_get_token_teams_short_tuple_returns_none(self): + """Test that tuple with wrong length returns None.""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("only_one_element",) + + result = _get_token_teams_from_request(mock_request) + assert result is None + + def test_get_token_teams_none_payload_in_tuple_returns_none(self): + """Test that None payload in tuple returns None.""" + from mcpgateway.main import _get_token_teams_from_request + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", None) + + result = _get_token_teams_from_request(mock_request) + assert result is None + + +class TestGetRpcFilterContext: + """Tests for _get_rpc_filter_context helper function.""" + + def test_get_rpc_filter_context_dict_user(self): + """Test with dict user containing email and is_admin.""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + # is_admin must be in the token payload, not the user dict (security fix) + mock_request.state._jwt_verified_payload = ("token", {"teams": ["t1", "t2"], "is_admin": True}) + user = {"email": "test@example.com", "is_admin": True} # User's is_admin is ignored + + email, teams, is_admin = _get_rpc_filter_context(mock_request, user) + + assert email == "test@example.com" + assert teams == ["t1", "t2"] + assert is_admin is True # From token payload, not user dict + + def test_get_rpc_filter_context_dict_user_sub_field(self): + """Test that sub field is used if email is not present.""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"teams": []}) + user = {"sub": "user@sub.com"} + + email, teams, is_admin = _get_rpc_filter_context(mock_request, user) + + assert email == "user@sub.com" + assert teams == [] + assert is_admin is False + + def test_get_rpc_filter_context_object_user(self): + """Test with user object having email and is_admin attributes.""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"teams": ["team_x"]}) + + class UserObject: + email = "obj@example.com" + is_admin = False + + email, teams, is_admin = _get_rpc_filter_context(mock_request, UserObject()) + + assert email == "obj@example.com" + assert teams == ["team_x"] + assert is_admin is False + + def test_get_rpc_filter_context_nested_is_admin(self): + """Test that nested user.is_admin is extracted from token payload.""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + # is_admin must be in token payload - use non-empty teams to allow admin bypass + mock_request.state._jwt_verified_payload = ("token", {"teams": ["team_x"], "user": {"is_admin": True}}) + user = {"email": "nested@example.com", "user": {"is_admin": True}} + + email, teams, is_admin = _get_rpc_filter_context(mock_request, user) + + assert email == "nested@example.com" + assert is_admin is True # From token payload's nested user.is_admin + + def test_get_rpc_filter_context_empty_teams_disables_admin(self): + """Test that empty teams array disables admin bypass even when is_admin is true.""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + # Token has is_admin but empty teams - admin bypass should be disabled + mock_request.state._jwt_verified_payload = ("token", {"teams": [], "is_admin": True}) + user = {"email": "admin@example.com", "is_admin": True} + + email, teams, is_admin = _get_rpc_filter_context(mock_request, user) + + assert email == "admin@example.com" + assert teams == [] + assert is_admin is False # Disabled for empty-team tokens (public-only access) + + def test_get_rpc_filter_context_string_user(self): + """Test with string user (fallback to str conversion).""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"teams": ["t1"]}) + user = "plain_username" + + email, teams, is_admin = _get_rpc_filter_context(mock_request, user) + + assert email == "plain_username" + assert teams == ["t1"] + assert is_admin is False + + def test_get_rpc_filter_context_none_user(self): + """Test with None user.""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"teams": []}) + + email, teams, is_admin = _get_rpc_filter_context(mock_request, None) + + assert email is None + assert teams == [] + assert is_admin is False + + def test_get_rpc_filter_context_admin_not_in_dict(self): + """Test that is_admin defaults to False if not present.""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = ("token", {"teams": ["t1"]}) + user = {"email": "user@example.com"} + + email, teams, is_admin = _get_rpc_filter_context(mock_request, user) + + assert email == "user@example.com" + assert is_admin is False + + def test_get_rpc_filter_context_no_jwt_returns_none_teams(self): + """Test that missing JWT payload returns None for teams (triggers DB lookup).""" + from mcpgateway.main import _get_rpc_filter_context + + mock_request = MagicMock() + mock_request.state._jwt_verified_payload = None # No JWT - e.g., plugin auth + user = {"email": "plugin_user@example.com", "is_admin": False} + + email, teams, is_admin = _get_rpc_filter_context(mock_request, user) + + assert email == "plugin_user@example.com" + assert teams is None # None triggers DB team lookup in services + assert is_admin is False diff --git a/tests/unit/mcpgateway/test_main_extended.py b/tests/unit/mcpgateway/test_main_extended.py index 0baf06a7a5..3132399aff 100644 --- a/tests/unit/mcpgateway/test_main_extended.py +++ b/tests/unit/mcpgateway/test_main_extended.py @@ -341,8 +341,8 @@ def test_client(app): # First-Party from mcpgateway.auth import get_current_user from mcpgateway.db import EmailUser - from mcpgateway.main import require_auth from mcpgateway.middleware.rbac import get_current_user_with_permissions + from mcpgateway.utils.verify_credentials import require_auth # Mock user object for RBAC system mock_user = EmailUser( diff --git a/tests/unit/mcpgateway/transports/test_streamablehttp_transport.py b/tests/unit/mcpgateway/transports/test_streamablehttp_transport.py index 1eff8d4cb8..2c83201a23 100644 --- a/tests/unit/mcpgateway/transports/test_streamablehttp_transport.py +++ b/tests/unit/mcpgateway/transports/test_streamablehttp_transport.py @@ -1333,3 +1333,224 @@ async def test_stream_buffer_len(): assert len(buffer) == 0 buffer.count = 2 assert len(buffer) == 2 + + +# --------------------------------------------------------------------------- +# Token Teams Context Tests (Issue #1915) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_streamable_http_auth_sets_user_context_with_teams(monkeypatch): + """Auth sets user context with email, teams, and is_admin from JWT payload.""" + + async def fake_verify(token): + return { + "sub": "user@example.com", + "teams": ["team_a", "team_b"], + "user": {"is_admin": True}, + } + + monkeypatch.setattr(tr, "verify_credentials", fake_verify) + + scope = _make_scope("/servers/1/mcp", headers=[(b"authorization", b"Bearer good-token")]) + messages = [] + + async def send(msg): + messages.append(msg) + + result = await streamable_http_auth(scope, None, send) + + assert result is True + assert len(messages) == 0 # Should not send 401 + + # Verify user context was set correctly + user_ctx = tr.user_context_var.get() + assert user_ctx.get("email") == "user@example.com" + assert user_ctx.get("teams") == ["team_a", "team_b"] + assert user_ctx.get("is_admin") is True + assert user_ctx.get("is_authenticated") is True + + +@pytest.mark.asyncio +async def test_streamable_http_auth_normalizes_dict_teams(monkeypatch): + """Auth normalizes team dicts to string IDs.""" + + async def fake_verify(token): + return { + "sub": "user@example.com", + "teams": [{"id": "t1", "name": "Team 1"}, {"id": "t2", "name": "Team 2"}], + "user": {"is_admin": False}, + } + + monkeypatch.setattr(tr, "verify_credentials", fake_verify) + + scope = _make_scope("/servers/1/mcp", headers=[(b"authorization", b"Bearer good-token")]) + + async def send(msg): + pass + + result = await streamable_http_auth(scope, None, send) + + assert result is True + + # Verify teams were normalized to IDs + user_ctx = tr.user_context_var.get() + assert user_ctx.get("teams") == ["t1", "t2"] + + +@pytest.mark.asyncio +async def test_streamable_http_auth_handles_empty_teams(monkeypatch): + """Auth handles empty teams list correctly.""" + + async def fake_verify(token): + return { + "sub": "user@example.com", + "teams": [], + "user": {}, + } + + monkeypatch.setattr(tr, "verify_credentials", fake_verify) + + scope = _make_scope("/servers/1/mcp", headers=[(b"authorization", b"Bearer good-token")]) + + async def send(msg): + pass + + result = await streamable_http_auth(scope, None, send) + + assert result is True + + user_ctx = tr.user_context_var.get() + assert user_ctx.get("email") == "user@example.com" + assert user_ctx.get("teams") == [] + assert user_ctx.get("is_admin") is False + + +@pytest.mark.asyncio +async def test_streamable_http_auth_uses_email_field_fallback(monkeypatch): + """Auth uses email field when sub is not present.""" + + async def fake_verify(token): + return { + "email": "email_user@example.com", # Only email, no sub + "teams": ["team_x"], + } + + monkeypatch.setattr(tr, "verify_credentials", fake_verify) + + scope = _make_scope("/servers/1/mcp", headers=[(b"authorization", b"Bearer good-token")]) + + async def send(msg): + pass + + result = await streamable_http_auth(scope, None, send) + + assert result is True + + user_ctx = tr.user_context_var.get() + assert user_ctx.get("email") == "email_user@example.com" + + +@pytest.mark.asyncio +async def test_streamable_http_auth_handles_missing_teams_key(monkeypatch): + """Auth handles JWT payload without teams key - returns None for unrestricted access.""" + + async def fake_verify(token): + return { + "sub": "user@example.com", + # No teams key - legacy token without team scoping + } + + monkeypatch.setattr(tr, "verify_credentials", fake_verify) + + scope = _make_scope("/servers/1/mcp", headers=[(b"authorization", b"Bearer good-token")]) + + async def send(msg): + pass + + result = await streamable_http_auth(scope, None, send) + + assert result is True + + user_ctx = tr.user_context_var.get() + assert user_ctx.get("teams") is None # None = unrestricted (legacy token without teams key) + + +@pytest.mark.asyncio +async def test_streamable_http_auth_handles_null_teams(monkeypatch): + """Auth handles JWT payload with teams: null - same as missing teams key.""" + + async def fake_verify(token): + return { + "sub": "user@example.com", + "teams": None, # Explicit null - treated same as missing + } + + monkeypatch.setattr(tr, "verify_credentials", fake_verify) + + scope = _make_scope("/servers/1/mcp", headers=[(b"authorization", b"Bearer good-token")]) + + async def send(msg): + pass + + result = await streamable_http_auth(scope, None, send) + + assert result is True + + user_ctx = tr.user_context_var.get() + assert user_ctx.get("teams") is None # None = teams: null treated same as missing + + +@pytest.mark.asyncio +async def test_streamable_http_auth_top_level_is_admin(monkeypatch): + """Auth handles top-level is_admin (legacy token format).""" + + async def fake_verify(token): + return { + "sub": "admin@example.com", + "teams": [], + "is_admin": True, # Top-level is_admin (legacy format) + } + + monkeypatch.setattr(tr, "verify_credentials", fake_verify) + + scope = _make_scope("/servers/1/mcp", headers=[(b"authorization", b"Bearer good-token")]) + + async def send(msg): + pass + + result = await streamable_http_auth(scope, None, send) + + assert result is True + + user_ctx = tr.user_context_var.get() + assert user_ctx.get("is_admin") is True # Should recognize top-level is_admin + + +@pytest.mark.asyncio +async def test_streamable_http_auth_nested_is_admin_takes_precedence(monkeypatch): + """Auth checks both top-level and nested is_admin.""" + + async def fake_verify(token): + return { + "sub": "admin@example.com", + "teams": [], + "is_admin": False, # Top-level says not admin + "user": {"is_admin": True}, # Nested says admin + } + + monkeypatch.setattr(tr, "verify_credentials", fake_verify) + + scope = _make_scope("/servers/1/mcp", headers=[(b"authorization", b"Bearer good-token")]) + + async def send(msg): + pass + + result = await streamable_http_auth(scope, None, send) + + assert result is True + + user_ctx = tr.user_context_var.get() + # Either top-level OR nested is_admin should grant admin access + assert user_ctx.get("is_admin") is True