Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/copaw/app/_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .routers.voice import voice_router
from ..envs import load_envs_into_environ
from ..providers.provider_manager import ProviderManager
from .auth_middleware import BasicAuthMiddleware

# Apply log level on load so reload child process gets same level as CLI.
logger = setup_logger(os.environ.get(LOG_LEVEL_ENV, "info"))
Expand Down Expand Up @@ -451,6 +452,24 @@ async def _do_restart_services(
allow_headers=["*"],
)

# Apply Basic Auth middleware if configured
BASIC_AUTH_USERNAME = os.environ.get("BASIC_AUTH_USERNAME", "")
BASIC_AUTH_PASSWORD = os.environ.get("BASIC_AUTH_PASSWORD", "")
BASIC_AUTH_EXCLUDED = os.environ.get("BASIC_AUTH_EXCLUDED", "")
if BASIC_AUTH_PASSWORD:
excluded_paths = [
p.strip() for p in BASIC_AUTH_EXCLUDED.split(",") if p.strip()
]
app.add_middleware(
BasicAuthMiddleware,
username=BASIC_AUTH_USERNAME,
password=BASIC_AUTH_PASSWORD,
excluded_paths=excluded_paths,
)
logger.info(
f"Basic Auth middleware enabled (excluded: {excluded_paths})"
)


# Console static dir: env, or copaw package data (console), or cwd.
_CONSOLE_STATIC_ENV = "COPAW_CONSOLE_STATIC_DIR"
Expand Down
72 changes: 72 additions & 0 deletions src/copaw/app/auth_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
"""HTTP Basic Auth middleware for FastAPI."""

import secrets
from typing import Optional

from fastapi import Request, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response


class BasicAuthMiddleware(BaseHTTPMiddleware):
"""Middleware to enforce HTTP Basic Auth on all routes
except excluded paths."""

def __init__(
self,
app,
username: str,
password: str,
excluded_paths: Optional[list] = None,
):
super().__init__(app)
self.username = username
self.password = password
self.excluded_paths = excluded_paths or []
self.security = HTTPBasic(auto_error=False)

def _is_excluded(self, path: str) -> bool:
"""Check if path is excluded from auth."""
for excluded in self.excluded_paths:
if path == excluded or path.startswith(excluded + "/"):
return True
return False

def _verify_credentials(self, credentials: HTTPBasicCredentials) -> bool:
"""Verify username and password using constant-time comparison."""
if not credentials:
return False
is_username_ok = secrets.compare_digest(
credentials.username,
self.username,
)
is_password_ok = secrets.compare_digest(
credentials.password,
self.password,
)
return is_username_ok and is_password_ok

async def dispatch(self, request: Request, call_next) -> Response:
# Skip auth if password is not set
if not self.password:
return await call_next(request)

path = request.url.path

# Skip excluded paths
if self._is_excluded(path):
return await call_next(request)

# Check for basic auth credentials
credentials = await self.security(request)

if not credentials or not self._verify_credentials(credentials):
return Response(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Basic"},
content="Unauthorized",
)

return await call_next(request)
Loading