Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ VERSION ?= 0.0.0-$(COMMIT)$(DIRTY)
TAG ?= $(VERSION)
IMAGE = $(REPO)/rancher-ai-agent:$(TAG)

help:
@echo "Usage: make <target>"
@echo ""
@echo "Targets:"
@echo " build-image Build production image locally"
@echo " push-image Build and push multi-arch production image"
@echo " test Run unit tests in container"
@echo " help Show this help"

build-image:
docker buildx build \
--file package/Dockerfile \
--platform=${TARGET_PLATFORMS} \
-t ${IMAGE} \
--load \
.

push-image:
docker buildx build \
${IID_FILE_FLAG} \
Expand All @@ -27,4 +44,14 @@ push-image:
--attest type=provenance,mode=max \
-t ${IMAGE} \
--push \
.
.

test:
docker buildx build \
--file package/Dockerfile.test \
--platform=${TARGET_PLATFORMS} \
-t $(IMAGE)-test \
--load \
. && docker run --rm $(IMAGE)-test

.PHONY: help build-image push-image test
72 changes: 56 additions & 16 deletions app/services/auth.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,95 @@
import logging
import httpx
import os
import ssl
from urllib.parse import urlparse
import httpx
from fastapi import Request

async def get_user_id(host: str, token: str) -> str:
SA_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"


def _parse_insecure_skip_tls() -> bool:
raw = os.environ.get("INSECURE_SKIP_TLS", "false").strip().lower()
if raw in ("true", "false"):
return raw == "true"
raise ValueError(f"INSECURE_SKIP_TLS must be 'true' or 'false', got: {raw!r}")


def _build_ssl_context() -> ssl.SSLContext | bool:
if _parse_insecure_skip_tls():
logging.warning("TLS verification disabled by INSECURE_SKIP_TLS")
return False

ctx = ssl.create_default_context()

ssl_cert_file = os.environ.get("SSL_CERT_FILE")
if ssl_cert_file:
if not os.path.isfile(ssl_cert_file):
raise FileNotFoundError(
f"SSL_CERT_FILE is set but file does not exist: {ssl_cert_file}"
)
ctx.load_verify_locations(ssl_cert_file)

if os.path.isfile(SA_CA_PATH):
ctx.load_verify_locations(SA_CA_PATH)

return ctx


async def get_user_id(host: str, token: str) -> str | None:
"""
Retrieves the user ID from the Rancher API using the session token.
"""
url = f"{host}/v3/users?me=true"

try:
async with httpx.AsyncClient(timeout=5.0, verify=False) as client:
verify = _build_ssl_context()
except Exception as e:
logging.error("TLS configuration error: %s", e)
return None

try:
async with httpx.AsyncClient(timeout=5.0, verify=verify) as client:
resp = await client.get(url, headers={
"Cookie": f"R_SESS={token}",
"Accept": "application/json",
})
payload = resp.json()

if (payload.get("type") == "error") or (resp.status_code != 200) or ("data" not in payload) or (len(payload["data"]) == 0):

if (
payload.get("type") == "error"
or resp.status_code != 200
or "data" not in payload
or len(payload["data"]) == 0
):
logging.error("user API returned error: %s - %s", resp.status_code, payload)
raise Exception("Failed to retrieve user ID from Rancher API")

user_id = payload["data"][0]["id"]

if user_id:
logging.info("user API returned: %s - userId %s", resp.status_code, user_id)

return user_id

except Exception as e:
logging.error("user API call failed: %s", e)

return None

async def get_user_id_from_request(request: Request) -> str:

async def get_user_id_from_request(request: Request) -> str | None:
"""
Retrieves the user ID from the Rancher API using the session token from the request cookies.
"""
rancher_url = os.environ.get("RANCHER_URL", "")
token = request.cookies.get("R_SESS")

host = ""
if not token:
logging.warning("R_SESS cookie not found")
return None

rancher_url = os.environ.get("RANCHER_URL", "")
if rancher_url:
parsed = urlparse(rancher_url)
scheme = parsed.scheme or "https"
netloc = parsed.netloc
host = f"{scheme}://{netloc}"
host = f"{scheme}://{parsed.netloc or rancher_url}"
else:
host = "https://rancher.cattle-system.svc"

return await get_user_id(host, token)
return await get_user_id(host, token)
19 changes: 19 additions & 0 deletions package/Dockerfile.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
ARG PYTHON_VERSION=3.14

FROM registry.suse.com/bci/python:${PYTHON_VERSION}
ARG PYTHON_VERSION

RUN pip install --no-cache-dir uv
COPY pyproject.toml .
RUN uv pip install --no-cache --system \
--python /usr/bin/python${PYTHON_VERSION} \
-r pyproject.toml \
--group test

WORKDIR /app
COPY . .

ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1

CMD ["python3", "-m", "pytest", "tests/unit", "-v"]
129 changes: 129 additions & 0 deletions tests/unit/services/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import os
import ssl
import pytest
from unittest.mock import patch, AsyncMock, MagicMock

from app.services.auth import _build_ssl_context, _parse_insecure_skip_tls, get_user_id, get_user_id_from_request


class TestParseInsecureSkipTls:
def test_true(self):
with patch.dict(os.environ, {"INSECURE_SKIP_TLS": "true"}):
assert _parse_insecure_skip_tls() is True

def test_false(self):
with patch.dict(os.environ, {"INSECURE_SKIP_TLS": "false"}):
assert _parse_insecure_skip_tls() is False

def test_unset_defaults_false(self):
with patch.dict(os.environ, {}, clear=True):
assert _parse_insecure_skip_tls() is False

def test_garbage_raises(self):
with patch.dict(os.environ, {"INSECURE_SKIP_TLS": "yes"}):
with pytest.raises(ValueError, match="must be 'true' or 'false'"):
_parse_insecure_skip_tls()


class TestBuildSSLContext:
def test_default_returns_verified_context(self):
with patch.dict(os.environ, {}, clear=True):
ctx = _build_ssl_context()
assert isinstance(ctx, ssl.SSLContext)
assert ctx.check_hostname is True
assert ctx.verify_mode == ssl.CERT_REQUIRED
assert len(ctx.get_ca_certs()) > 0

def test_insecure_skip_returns_false(self):
with patch.dict(os.environ, {"INSECURE_SKIP_TLS": "true"}, clear=True):
assert _build_ssl_context() is False

def test_ssl_cert_file_loads_custom_ca(self, tmp_path):
ca = tmp_path / "ca.pem"
ca.write_text(TEST_CA_PEM)
with patch.dict(os.environ, {"SSL_CERT_FILE": str(ca)}, clear=True):
assert isinstance(_build_ssl_context(), ssl.SSLContext)

def test_ssl_cert_file_missing_raises(self):
with patch.dict(os.environ, {"SSL_CERT_FILE": "/no/such/file.pem"}, clear=True):
with pytest.raises(FileNotFoundError, match="does not exist"):
_build_ssl_context()


class TestGetUserId:
@pytest.mark.asyncio
async def test_returns_user_id_on_success(self):
result = await get_user_id("https://rancher.example.com", "tok")
assert result == "user-abc123"

@pytest.mark.asyncio
async def test_returns_none_on_api_error(self):
self._mock_response.status_code = 401
self._mock_response.json.return_value = {"type": "error"}
result = await get_user_id("https://rancher.example.com", "bad")
assert result is None

@pytest.mark.asyncio
async def test_returns_none_on_connection_error(self):
self._mock_client.get.side_effect = Exception("Connection refused")
result = await get_user_id("https://rancher.example.com", "tok")
assert result is None

@pytest.fixture(autouse=True)
def _patch_httpx(self):
self._mock_response = MagicMock(
status_code=200,
json=MagicMock(return_value={"data": [{"id": "user-abc123"}]}),
)
self._mock_client = AsyncMock()
self._mock_client.get.return_value = self._mock_response
with patch("app.services.auth.httpx.AsyncClient") as cls:
cls.return_value.__aenter__ = AsyncMock(return_value=self._mock_client)
cls.return_value.__aexit__ = AsyncMock(return_value=False)
yield


class TestGetUserIdFromRequest:
@pytest.mark.asyncio
async def test_returns_none_when_no_cookie(self):
request = MagicMock(cookies={})
assert await get_user_id_from_request(request) is None

@pytest.mark.asyncio
async def test_uses_rancher_url_env(self):
request = MagicMock(cookies={"R_SESS": "tok"})
with patch.dict(os.environ, {"RANCHER_URL": "https://rancher.prod.example.com"}), \
patch("app.services.auth.get_user_id", new_callable=AsyncMock, return_value="u1") as mock_get:
await get_user_id_from_request(request)
mock_get.assert_called_once_with("https://rancher.prod.example.com", "tok")

@pytest.mark.asyncio
async def test_normalizes_url_without_scheme(self):
request = MagicMock(cookies={"R_SESS": "tok"})
with patch.dict(os.environ, {"RANCHER_URL": "rancher.example.com"}), \
patch("app.services.auth.get_user_id", new_callable=AsyncMock, return_value="u1") as mock_get:
await get_user_id_from_request(request)
mock_get.assert_called_once_with("https://rancher.example.com", "tok")

@pytest.mark.asyncio
async def test_falls_back_to_internal_svc(self):
request = MagicMock(cookies={"R_SESS": "tok"})
with patch.dict(os.environ, {}, clear=True), \
patch("app.services.auth.get_user_id", new_callable=AsyncMock, return_value="u1") as mock_get:
await get_user_id_from_request(request)
mock_get.assert_called_once_with("https://rancher.cattle-system.svc", "tok")


# openssl req -x509 -newkey ec -pkeyopt ec_paramgen_curve:prime256v1 \
# -keyout /dev/null -out - -days 3650 -nodes -subj '/CN=test-ca'
TEST_CA_PEM = """\
-----BEGIN CERTIFICATE-----
MIIBeTCCAR+gAwIBAgIUS6up8UzE6XhsSSn6DsLbYcuwCF8wCgYIKoZIzj0EAwIw
EjEQMA4GA1UEAwwHdGVzdC1jYTAeFw0yNjA1MjkxMjE0MzNaFw0yNjA1MzAxMjE0
MzNaMBIxEDAOBgNVBAMMB3Rlc3QtY2EwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNC
AASG176iTWW2SuUOtZnloMy1+xfNH/ZYOsjsc69XgU8OPK9yG1K8sIsx+KfOc9Rv
cEL9YP6RljfHn4Y9vOsHJZhDo1MwUTAdBgNVHQ4EFgQUpUAe+TFKeo2qxMELe53R
ZouQ6R0wHwYDVR0jBBgwFoAUpUAe+TFKeo2qxMELe53RZouQ6R0wDwYDVR0TAQH/
BAUwAwEB/zAKBggqhkjOPQQDAgNIADBFAiA9yYtS/Os2ETOt2F0c834l8WvTUMCo
XIuSPIOGvZDpEgIhAJ5+iN+UoIyC7VtHJZL6WT/NVmXKgO08e6aOGfiXdPuX
-----END CERTIFICATE-----"""
Loading