Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from graph.infrastructure.age_client import AgeGraphClient
from graph.presentation import routes as graph_routes
from iam.presentation import router as iam_router
from management.presentation import management_router
from infrastructure.database.dependencies import (
close_database_engines,
init_database_engines,
Expand Down Expand Up @@ -216,6 +217,9 @@ async def kartograph_lifespan(app: FastAPI):
# Include IAM bounded context routes
app.include_router(iam_router)

# Include Management bounded context routes
app.include_router(management_router)

# Include dev utility routes (easy to remove for production)
app.include_router(dev_routes.router)

Expand Down
99 changes: 61 additions & 38 deletions src/api/management/application/services/data_source_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from datetime import UTC, datetime

from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from ulid import ULID

Expand All @@ -18,7 +19,7 @@
from management.domain.aggregates import DataSource
from management.domain.entities import DataSourceSyncRun
from management.domain.value_objects import DataSourceId, KnowledgeGraphId
from management.ports.exceptions import UnauthorizedError
from management.ports.exceptions import DuplicateDataSourceNameError, UnauthorizedError
from management.ports.repositories import (
IDataSourceRepository,
IDataSourceSyncRunRepository,
Expand Down Expand Up @@ -150,26 +151,36 @@ async def create(
if kg.tenant_id != self._scope_to_tenant:
raise ValueError(f"Knowledge graph {kg_id} belongs to different tenant")

async with self._session.begin():
ds = DataSource.create(
knowledge_graph_id=kg_id,
tenant_id=self._scope_to_tenant,
name=name,
adapter_type=adapter_type,
connection_config=connection_config,
created_by=user_id,
)

if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
try:
async with self._session.begin():
ds = DataSource.create(
knowledge_graph_id=kg_id,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
name=name,
adapter_type=adapter_type,
connection_config=connection_config,
created_by=user_id,
)
ds.credentials_path = cred_path

await self._ds_repo.save(ds)
if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
)
ds.credentials_path = cred_path

await self._ds_repo.save(ds)
except IntegrityError as e:
Comment on lines +165 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Avoid writing secrets before the database write is known to succeed.

At Line 167 and Line 339, _secret_store.store() runs before the database work has been flushed/committed. If save() or commit then fails, create leaves an orphaned secret and update can rotate credentials even though the request returns an error. It also keeps the transaction open across external I/O. Flush first, then write the secret, and add compensation if anything after the secret write fails.

Also applies to: 337-347

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/data_source_service.py` around lines
165 - 175, The secret is being stored via _secret_store.store(...) before the
database save/commit (_ds_repo.save(ds)) completes, which can create orphaned
secrets and prolong transactions; change the flow so you persist/flush/commit
the datasource first (call _ds_repo.save(ds) and ensure the transaction is
committed or flushed) and only then call _secret_store.store(...) and set
ds.credentials_path, minimizing time inside the DB transaction; additionally,
add compensation logic around the secret write (e.g., if any subsequent
operation after storing the secret fails, call _secret_store.delete(...) to
remove the orphaned secret) and ensure error handling around the try/except
block (including the IntegrityError handler) is updated to account for the new
ordering.

if "uq_data_sources_kg_name" in str(e):
self._probe.data_source_creation_failed(
kg_id=kg_id, name=name, error="duplicate name"
)
raise DuplicateDataSourceNameError(
f"Data source '{name}' already exists in knowledge graph '{kg_id}'"
) from e
raise

self._probe.data_source_created(
ds_id=ds.id.value,
Expand Down Expand Up @@ -311,27 +322,39 @@ async def update(
if ds.tenant_id != self._scope_to_tenant:
raise ValueError(f"Data source {ds_id} not found")

async with self._session.begin():
if name is not None or connection_config is not None:
ds.update_connection(
name=name if name is not None else ds.name,
connection_config=connection_config
if connection_config is not None
else ds.connection_config,
credentials_path=ds.credentials_path,
updated_by=user_id,
try:
async with self._session.begin():
if name is not None or connection_config is not None:
ds.update_connection(
name=name if name is not None else ds.name,
connection_config=connection_config
if connection_config is not None
else ds.connection_config,
credentials_path=ds.credentials_path,
updated_by=user_id,
)

if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
)
ds.credentials_path = cred_path

await self._ds_repo.save(ds)
except IntegrityError as e:
if "uq_data_sources_kg_name" in str(e):
self._probe.data_source_creation_failed(
kg_id=ds.knowledge_graph_id,
name=name or ds.name,
error="duplicate name",
)

if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
)
ds.credentials_path = cred_path

await self._ds_repo.save(ds)
raise DuplicateDataSourceNameError(
f"Data source '{name or ds.name}' already exists in knowledge graph '{ds.knowledge_graph_id}'"
) from e
raise

if name is not None:
self._probe.data_source_updated(ds_id=ds_id, name=name)
Expand Down
23 changes: 16 additions & 7 deletions src/api/management/application/services/knowledge_graph_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,16 @@ async def update(
self,
user_id: str,
kg_id: str,
name: str,
description: str,
name: str | None = None,
description: str | None = None,
) -> KnowledgeGraph:
"""Update a knowledge graph's metadata.

Args:
user_id: The user performing the update
kg_id: The knowledge graph ID
name: New name
description: New description
name: Optional new name (uses existing if None)
description: Optional new description (uses existing if None)

Returns:
The updated KnowledgeGraph aggregate
Expand Down Expand Up @@ -310,17 +310,26 @@ async def update(
if kg.tenant_id != self._scope_to_tenant:
raise ValueError(f"Knowledge graph {kg_id} not found")

kg.update(name=name, description=description, updated_by=user_id)
resolved_name = name if name is not None else kg.name
resolved_description = (
description if description is not None else kg.description
)

kg.update(
name=resolved_name,
description=resolved_description,
updated_by=user_id,
)

try:
async with self._session.begin():
await self._kg_repo.save(kg)
except IntegrityError as e:
raise DuplicateKnowledgeGraphNameError(
f"Knowledge graph '{name}' already exists in tenant"
f"Knowledge graph '{resolved_name}' already exists in tenant"
) from e

self._probe.knowledge_graph_updated(kg_id=kg_id, name=name)
self._probe.knowledge_graph_updated(kg_id=kg_id, name=resolved_name)

return kg

Expand Down
22 changes: 22 additions & 0 deletions src/api/management/presentation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Management presentation layer.

Aggregates sub-routers for knowledge graphs and data sources,
exporting a single management_router for registration in main.py.
"""

from __future__ import annotations

from fastapi import APIRouter

from management.presentation.data_sources.routes import router as ds_router
from management.presentation.knowledge_graphs.routes import router as kg_router

# Aggregate router: every management endpoint is mounted under /management
# and shares the "management" OpenAPI tag.
management_router = APIRouter(prefix="/management", tags=["management"])

# Attach each bounded-context sub-router to the aggregate router.
for _sub_router in (kg_router, ds_router):
    management_router.include_router(_sub_router)

__all__ = ["management_router"]
1 change: 1 addition & 0 deletions src/api/management/presentation/data_sources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Data source presentation sub-package."""
155 changes: 155 additions & 0 deletions src/api/management/presentation/data_sources/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Request and response models for Data Source API endpoints."""

from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel, Field

from management.domain.aggregates import DataSource
from management.domain.entities import DataSourceSyncRun


class CreateDataSourceRequest(BaseModel):
    """Payload for registering a new data source.

    Attributes:
        name: Human-readable data source name, 1-100 characters.
        adapter_type: Adapter type identifier; the route layer validates it
            against DataSourceAdapterType.
        connection_config: Non-secret connection settings as key-value pairs.
        credentials: Optional write-only secret material; it is stored but
            never echoed back in any response.
    """

    name: str = Field(min_length=1, max_length=100)
    adapter_type: str
    connection_config: dict[str, str]
    credentials: dict[str, str] | None = None


class UpdateDataSourceRequest(BaseModel):
    """Payload for a partial data source update; omitted fields are unchanged.

    Attributes:
        name: New name, 1-100 characters, or None to keep the current one.
        connection_config: Replacement connection settings, or None to keep
            the current configuration.
        credentials: Replacement secret material (write-only), or None to
            keep the stored credentials.
    """

    name: str | None = Field(default=None, min_length=1, max_length=100)
    connection_config: dict[str, str] | None = None
    credentials: dict[str, str] | None = None


class DataSourceResponse(BaseModel):
    """API representation of a single data source.

    Secret material is never serialized; callers only see the boolean
    has_credentials flag, derived from whether a credentials path is set.

    Attributes:
        id: Data source ID (ULID string).
        knowledge_graph_id: ID of the owning knowledge graph.
        tenant_id: ID of the owning tenant.
        name: Data source name.
        adapter_type: Adapter type string.
        connection_config: Non-secret connection settings.
        has_credentials: True when credentials have been stored.
        schedule_type: Schedule kind (manual, cron, interval).
        schedule_value: Schedule expression; None for manual schedules.
        last_sync_at: Timestamp of the last successful sync, if any.
        created_at: Creation timestamp.
        updated_at: Last modification timestamp.
    """

    id: str
    knowledge_graph_id: str
    tenant_id: str
    name: str
    adapter_type: str
    connection_config: dict[str, str]
    has_credentials: bool
    schedule_type: str
    schedule_value: str | None
    last_sync_at: datetime | None
    created_at: datetime
    updated_at: datetime

    @classmethod
    def from_domain(cls, ds: DataSource) -> DataSourceResponse:
        """Build a response model from a DataSource domain aggregate.

        Args:
            ds: The domain aggregate to serialize.

        Returns:
            A DataSourceResponse mirroring the aggregate's state.
        """
        # Flatten the nested schedule value object before constructing.
        schedule = ds.schedule
        return cls(
            id=ds.id.value,
            knowledge_graph_id=ds.knowledge_graph_id,
            tenant_id=ds.tenant_id,
            name=ds.name,
            adapter_type=ds.adapter_type.value,
            connection_config=ds.connection_config,
            # Presence of a stored path is the only credentials signal exposed.
            has_credentials=ds.credentials_path is not None,
            schedule_type=schedule.schedule_type.value,
            schedule_value=schedule.value,
            last_sync_at=ds.last_sync_at,
            created_at=ds.created_at,
            updated_at=ds.updated_at,
        )


class DataSourceListResponse(BaseModel):
    """One page of data sources plus pagination bookkeeping.

    Attributes:
        items: Data sources on this page.
        total: Count of all matching data sources, ignoring pagination.
        offset: How many items were skipped before this page.
        limit: Requested page size (maximum items returned).
    """

    items: list[DataSourceResponse]
    total: int
    offset: int
    limit: int


class SyncRunResponse(BaseModel):
    """API representation of one sync run of a data source.

    Attributes:
        id: Sync run ID.
        data_source_id: ID of the data source that was synced.
        status: Run status (pending, running, completed, failed).
        started_at: When the run started.
        completed_at: When the run finished; None while still in progress.
        created_at: When the run record was created.
    """

    id: str
    data_source_id: str
    status: str
    started_at: datetime
    completed_at: datetime | None
    created_at: datetime

    @classmethod
    def from_domain(cls, sync_run: DataSourceSyncRun) -> SyncRunResponse:
        """Build a response model from a DataSourceSyncRun domain entity.

        Args:
            sync_run: The domain entity to serialize.

        Returns:
            A SyncRunResponse mirroring the entity's state.
        """
        # Collect field values first, then construct in one call.
        payload = {
            "id": sync_run.id,
            "data_source_id": sync_run.data_source_id,
            "status": sync_run.status,
            "started_at": sync_run.started_at,
            "completed_at": sync_run.completed_at,
            "created_at": sync_run.created_at,
        }
        return cls(**payload)
Loading
Loading