Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
339 changes: 314 additions & 25 deletions docs/docs/manage/rbac.md

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions docs/docs/manage/securing.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ volumes:

The gateway supports fine-grained token scoping to restrict token access to specific servers, permissions, IP ranges, and time windows. This provides defense-in-depth security for API access.

!!! tip "Detailed RBAC Documentation"
For comprehensive documentation on token scoping semantics, team-based access control, and visibility filtering, see the [RBAC Configuration Guide](rbac.md).

#### Team-Based Token Scoping

Tokens can be scoped to specific teams using the `teams` JWT claim:

| Token Configuration | Admin User | Non-Admin User |
|---------------------|------------|----------------|
| No `teams` key | Unrestricted | Public-only |
| `teams: null` | Unrestricted | Public-only |
| `teams: []` | Public-only | Public-only |
| `teams: ["team-id"]` | Team + Public | Team + Public |

**Security Default**: Non-admin tokens without explicit team scope default to public-only access (principle of least privilege).

#### Server-Scoped Tokens

Server-scoped tokens are restricted to specific MCP servers and cannot access admin endpoints:
Expand Down
15 changes: 9 additions & 6 deletions mcpgateway/cache/session_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1479,26 +1479,29 @@ async def generate_response(self, message: Dict[str, Any], transport: SSETranspo
"id": req_id,
}
# Get the token from the current authentication context
# The user object doesn't contain the token directly, we need to reconstruct it
# Since we don't have access to the original headers here, we need a different approach
# We'll extract the token from the session or create a new admin token
# The user object should contain auth_token, token_teams, and is_admin from the SSE endpoint
token = None
token_teams = user.get("token_teams", []) # Default to empty list, never None
is_admin = user.get("is_admin", False) # Preserve admin status from SSE endpoint

try:
if hasattr(user, "get") and "auth_token" in user:
if hasattr(user, "get") and user.get("auth_token"):
token = user["auth_token"]
else:
# Fallback: create an admin token for internal RPC calls
# Fallback: create token preserving the user's context (including admin status)
logger.warning("No auth token available for SSE RPC call - creating fallback token")
now = datetime.now(timezone.utc)
payload = {
"sub": user.get("email", "system"),
"iss": settings.jwt_issuer,
"aud": settings.jwt_audience,
"iat": int(now.timestamp()),
"jti": str(uuid.uuid4()),
"teams": token_teams, # Always a list - preserves token scope
"user": {
"email": user.get("email", "system"),
"full_name": user.get("full_name", "System"),
"is_admin": True, # Internal calls should have admin access
"is_admin": is_admin, # Preserve admin status for cookie-authenticated admins
"auth_provider": "internal",
},
}
Expand Down
397 changes: 353 additions & 44 deletions mcpgateway/main.py

Large diffs are not rendered by default.

72 changes: 60 additions & 12 deletions mcpgateway/middleware/token_scoping.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,32 @@ def __init__(self):
True
"""

def _normalize_teams(self, teams) -> list:
"""Normalize teams from token payload to list of team IDs.

Handles various team formats:
- None -> []
- List of strings -> as-is
- List of dicts with 'id' key -> extract IDs

Args:
teams: Raw teams value from JWT payload

Returns:
List of team ID strings
"""
if not teams:
return []
normalized = []
for team in teams:
if isinstance(team, dict):
team_id = team.get("id")
if team_id:
normalized.append(team_id)
elif isinstance(team, str):
normalized.append(team)
return normalized

async def _extract_token_scopes(self, request: Request) -> Optional[dict]:
"""Extract token scopes from JWT in request.

Expand Down Expand Up @@ -441,7 +467,7 @@ def _check_team_membership(self, payload: dict, db=None) -> bool:
finally:
db.close()

def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None) -> bool: # pylint: disable=too-many-return-statements
def _check_resource_team_ownership(self, request_path: str, token_teams: list, db=None, _user_email: str = None) -> bool: # pylint: disable=too-many-return-statements
"""
Check if the requested resource is accessible by the token.

Expand All @@ -451,7 +477,7 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d
- PRIVATE: Accessible only by tokens scoped to that specific team

Token Access Rules:
- Public-only tokens (empty token_teams): Can ONLY access public resources
- Public-only tokens (empty token_teams): Can access public resources + their own resources
- Team-scoped tokens: Can access their team's resources + public resources

Handles URLs like:
Expand Down Expand Up @@ -535,7 +561,8 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d
logger.debug(f"Access granted: Server {resource_id} is PUBLIC")
return True

# PUBLIC-ONLY TOKEN: Can ONLY access public servers
# PUBLIC-ONLY TOKEN: Can ONLY access public servers (strict public-only policy)
# No owner access - if user needs own resources, use a personal team-scoped token
if is_public_token:
logger.warning(f"Access denied: Public-only token cannot access {server_visibility} server {resource_id}")
return False
Expand Down Expand Up @@ -578,7 +605,8 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d
logger.debug(f"Access granted: Tool {resource_id} is PUBLIC")
return True

# PUBLIC-ONLY TOKEN: Can ONLY access public tools
# PUBLIC-ONLY TOKEN: Can ONLY access public tools (strict public-only policy)
# No owner access - if user needs own resources, use a personal team-scoped token
if is_public_token:
logger.warning(f"Access denied: Public-only token cannot access {tool_visibility} tool {resource_id}")
return False
Expand Down Expand Up @@ -623,7 +651,8 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d
logger.debug(f"Access granted: Resource {resource_id} is PUBLIC")
return True

# PUBLIC-ONLY TOKEN: Can ONLY access public resources
# PUBLIC-ONLY TOKEN: Can ONLY access public resources (strict public-only policy)
# No owner access - if user needs own resources, use a personal team-scoped token
if is_public_token:
logger.warning(f"Access denied: Public-only token cannot access {resource_visibility} resource {resource_id}")
return False
Expand Down Expand Up @@ -668,7 +697,8 @@ def _check_resource_team_ownership(self, request_path: str, token_teams: list, d
logger.debug(f"Access granted: Prompt {resource_id} is PUBLIC")
return True

# PUBLIC-ONLY TOKEN: Can ONLY access public prompts
# PUBLIC-ONLY TOKEN: Can ONLY access public prompts (strict public-only policy)
# No owner access - if user needs own resources, use a personal team-scoped token
if is_public_token:
logger.warning(f"Access denied: Public-only token cannot access {prompt_visibility} prompt {resource_id}")
return False
Expand Down Expand Up @@ -755,10 +785,28 @@ async def __call__(self, request: Request, call_next):

# TEAM VALIDATION: Use single DB session for both team checks
# This reduces connection pool overhead from 2 sessions to 1 for resource endpoints
token_teams = payload.get("teams", [])
needs_db_session = bool(token_teams) # Only need DB if token has teams

if needs_db_session:
user_email = payload.get("sub") or payload.get("email") # Extract user email for ownership check
is_admin = payload.get("is_admin", False) or payload.get("user", {}).get("is_admin", False)

# Determine token_teams based on whether "teams" key exists and is not None
# - Key absent OR null + admin = None (unrestricted bypass)
# - Key absent OR null + non-admin = [] (public-only, secure default)
# - Key present with non-None value = normalize the value
teams_value = payload.get("teams") if "teams" in payload else None
if teams_value is not None:
token_teams = self._normalize_teams(teams_value)
elif is_admin:
# Admin without teams key (or teams: null) = unrestricted (skip team checks)
token_teams = None
else:
# Non-admin without teams key (or teams: null) = public-only (secure default)
token_teams = []

# Admin with no team restrictions bypasses team validation entirely
if is_admin and token_teams is None:
logger.debug(f"Admin bypass: skipping team validation for {user_email}")
# Skip to other checks (server_id, IP, etc.)
elif token_teams:
# First-Party
from mcpgateway.db import get_db # pylint: disable=import-outside-toplevel

Expand All @@ -770,7 +818,7 @@ async def __call__(self, request: Request, call_next):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team")

# Check resource team ownership with shared session
if not self._check_resource_team_ownership(request.url.path, token_teams, db=db):
if not self._check_resource_team_ownership(request.url.path, token_teams, db=db, _user_email=user_email):
logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You do not have permission to access this resource using the current token")
finally:
Expand All @@ -785,7 +833,7 @@ async def __call__(self, request: Request, call_next):
logger.warning("Token rejected: User no longer member of associated team(s)")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is invalid: User is no longer a member of the associated team")

if not self._check_resource_team_ownership(request.url.path, token_teams):
if not self._check_resource_team_ownership(request.url.path, token_teams, _user_email=user_email):
logger.warning(f"Access denied: Resource does not belong to token's teams {token_teams}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You do not have permission to access this resource using the current token")

Expand Down
13 changes: 8 additions & 5 deletions mcpgateway/routers/email_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,20 @@ async def create_access_token(user: EmailUser, token_scopes: Optional[dict] = No
"is_admin": user.is_admin,
"auth_provider": user.auth_provider,
},
# Team memberships for authorization
"teams": [
{"id": team.id, "name": team.name, "slug": team.slug, "is_personal": team.is_personal, "role": next((m.role for m in user.team_memberships if m.team_id == team.id), "member")}
for team in teams
],
# Namespace access (backwards compatible)
"namespaces": [f"user:{user.email}", *[f"team:{team.slug}" for team in teams], "public"],
# Token scoping (if provided)
"scopes": token_scopes or {"server_id": None, "permissions": ["*"], "ip_restrictions": [], "time_restrictions": {}}, # Full access for regular user tokens
}

# For admin users: omit "teams" key entirely to enable unrestricted access bypass
# For regular users: include teams for proper team-based scoping
if not user.is_admin:
payload["teams"] = [
{"id": team.id, "name": team.name, "slug": team.slug, "is_personal": team.is_personal, "role": next((m.role for m in user.team_memberships if m.team_id == team.id), "member")}
for team in teams
]

# Generate token using centralized token creation
token = await create_jwt_token(payload)

Expand Down
84 changes: 84 additions & 0 deletions mcpgateway/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5619,6 +5619,90 @@ class TokenScopeRequest(BaseModel):
time_restrictions: Dict[str, Any] = Field(default_factory=dict, description="Time-based restrictions")
usage_limits: Dict[str, Any] = Field(default_factory=dict, description="Usage limits and quotas")

@field_validator("ip_restrictions")
@classmethod
def validate_ip_restrictions(cls, v: List[str]) -> List[str]:
"""Validate IP addresses and CIDR notation.

Args:
v: List of IP address or CIDR strings to validate.

Returns:
List of validated IP/CIDR strings with whitespace stripped.

Raises:
ValueError: If any IP address or CIDR notation is invalid.

Examples:
>>> TokenScopeRequest.validate_ip_restrictions(["192.168.1.0/24"])
['192.168.1.0/24']
>>> TokenScopeRequest.validate_ip_restrictions(["10.0.0.1"])
['10.0.0.1']
"""
import ipaddress # pylint: disable=import-outside-toplevel

if not v:
return v

validated = []
for ip_str in v:
ip_str = ip_str.strip()
if not ip_str:
continue
try:
# Try parsing as network (CIDR notation)
if "/" in ip_str:
ipaddress.ip_network(ip_str, strict=False)
else:
# Try parsing as single IP address
ipaddress.ip_address(ip_str)
validated.append(ip_str)
except ValueError as e:
raise ValueError(f"Invalid IP address or CIDR notation '{ip_str}': {e}") from e
return validated

@field_validator("permissions")
@classmethod
def validate_permissions(cls, v: List[str]) -> List[str]:
"""Validate permission scope format.

Permissions must be in format 'resource.action' or wildcard '*'.

Args:
v: List of permission strings to validate.

Returns:
List of validated permission strings with whitespace stripped.

Raises:
ValueError: If any permission does not match 'resource.action' format or '*'.

Examples:
>>> TokenScopeRequest.validate_permissions(["tools.read", "resources.write"])
['tools.read', 'resources.write']
>>> TokenScopeRequest.validate_permissions(["*"])
['*']
"""
if not v:
return v

# Permission pattern: resource.action (alphanumeric with underscores)
permission_pattern = re.compile(r"^[a-zA-Z][a-zA-Z0-9_]*\.[a-zA-Z][a-zA-Z0-9_]*$")

validated = []
for perm in v:
perm = perm.strip()
if not perm:
continue
# Allow wildcard
if perm == "*":
validated.append(perm)
continue
if not permission_pattern.match(perm):
raise ValueError(f"Invalid permission format '{perm}'. Use 'resource.action' format (e.g., 'tools.read') or '*' for full access")
validated.append(perm)
return validated


class TokenCreateRequest(BaseModel):
"""Schema for creating a new API token.
Expand Down
Loading
Loading