Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 52 additions & 61 deletions backend/app/ingestion/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import uuid
from datetime import datetime, timezone
UTC = timezone.utc
from typing import Any
from typing import Any, Optional

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session as DBSession
Expand Down Expand Up @@ -67,19 +67,14 @@ class IngestService:
- Append-only storage
"""

def __init__(self, service_id: str | None = None):
def __init__(self, service_id: Optional[str] = None):
"""
Initialize ingestion service.

CONSTITUTIONAL REQUIREMENT: service_id is IMMUTABLE.
- Set once at service startup
- NOT configurable per request
- Ideally derived from deployment identity

Args:
service_id: Static ingestion service identifier for CHAIN_SEAL.
If not provided, reads from INGESTION_SERVICE_ID env var.
Defaults to "default-ingest-01" if neither provided.
Freeze and record the ingestion service identifier used for CHAIN_SEAL operations.

If provided, the given service_id becomes the immutable identifier for the service; otherwise the value is read from the INGESTION_SERVICE_ID environment variable and falls back to "default-ingest-01". This identifier is persisted on the instance and must not change for the lifetime of the process because CHAIN_SEAL integrity depends on a stable service identity.

Parameters:
service_id (Optional[str]): Static ingestion service identifier for CHAIN_SEAL. If omitted, the environment variable INGESTION_SERVICE_ID is used, with a final fallback of "default-ingest-01".
"""
# Freeze service identity at startup
if service_id:
Expand All @@ -100,25 +95,25 @@ def __init__(self, service_id: str | None = None):

def start_session(
self,
session_id_str: str | None = None,
session_id_str: Optional[str] = None,
authority: str = "server",
agent_name: str | None = None,
user_id: int | None = None,
agent_name: Optional[str] = None,
user_id: Optional[int] = None,
) -> str:
"""
Start a new session with specified authority.

Args:
session_id_str: UUID string from SDK (optional, will generate if not provided)
authority: "server" or "sdk"
agent_name: Optional agent identifier
user_id: Optional user ID for legacy compatibility

Start a new ingestion session and persist it to the database.
Parameters:
session_id_str (Optional[str]): Optional UUID string to use for the session; a new UUID is generated if not provided.
authority (str): Either "server" or "sdk", determining the session's ChainAuthority and whether the ingestion service id is recorded.
agent_name (Optional[str]): Optional agent identifier associated with the session.
user_id (Optional[int]): Optional legacy user identifier.
Returns:
session_id as string (UUID)

str: The created session's UUID string.
Raises:
ValueError: If authority is invalid
ValueError: If `authority` is not "server" or "sdk".
"""
# Validate authority
if authority not in ["server", "sdk"]:
Expand All @@ -142,6 +137,7 @@ def start_session(
agent_name=agent_name,
user_id=user_id,
ingestion_service_id=self.service_id if authority == "server" else None,
is_replay=False,
)

db.add(session)
Expand All @@ -157,38 +153,27 @@ def append_events(
self,
session_id: str,
events: list[dict[str, Any]],
db: DBSession | None = None,
db: Optional[DBSession] = None,
) -> dict[str, Any]:
"""
Append events to session with constitutional validation.

CRITICAL OPERATIONS:
1. Server-side hash recomputation (ignore SDK hashes)
2. Strict sequence validation (hard rejection)
3. Atomic commit (all or none)

TRANSACTION BOUNDARY:
- When db is None: creates/commits/closes own session (backwards-compatible)
- When db is provided: uses caller's session, does NOT commit or close
Caller is responsible for transaction control.

Args:
session_id: Session UUID string
events: List of event dictionaries from SDK
db: Optional external DB session for transaction control.
When provided, IngestService does NOT commit or close.

Append a batch of events to a session with constitutional validation and atomic commit.

Performs server-side payload and event hash recomputation, enforces strict in-order sequence validation (hard rejection for gaps/duplicates), and persists all events atomically when the service owns the transaction.

Parameters:
db (Optional[DBSession]): External DB session to use for the operation. If provided, the caller retains transaction control and this method will not commit or close the session; if omitted, the method opens, commits, and closes its own DB session.

Returns:
dict with:
'status': 'success'
'accepted_count': int
'final_hash': str
'committed_events': list[dict] canonical committed event representations
for downstream consumers (e.g., PolicyEngine)

dict: {
'status': 'success',
'accepted_count': int, # number of events accepted
'final_hash': str, # event_hash of the last appended event (or genesis hash if none)
'committed_events': list[dict] # canonical representations of committed events for downstream consumers
}
Raises:
SequenceViolation: On sequence gaps/duplicates
ValueError: On validation failures
SequenceViolation: If event sequence validation fails (gaps or duplicates).
ValueError: On validation errors (e.g., missing/invalid session, inactive session) or database integrity errors.
"""
owns_session = db is None
if owns_session:
Expand Down Expand Up @@ -247,7 +232,8 @@ def append_events(
# Store in database
event_chain = EventChain(
event_id=uuid.UUID(event_envelope["event_id"]),
session_id=session.session_id_str,
session_id=session.id,
session_id_str=session.session_id_str,
sequence_number=event_envelope["sequence_number"],
timestamp_wall=_parse_wall_timestamp(event_envelope["timestamp_wall"]),
timestamp_monotonic=event_data.get("timestamp_monotonic", 0),
Expand Down Expand Up @@ -429,8 +415,13 @@ def seal_session(self, session_id: str) -> dict[str, Any]:

# --- Private helper methods ---

def _get_last_event_hash(self, db: DBSession, session: Session) -> str | None:
"""Get hash of last event in session, or None if no events."""
def _get_last_event_hash(self, db: DBSession, session: Session) -> Optional[str]:
"""
Retrieve the hash of the last event for a session.

Returns:
str: `verifier_core.GENESIS_HASH` if the session has no events, otherwise the last event's hash.
"""
last_event = (
db.query(EventChain)
.filter(EventChain.session_id == session.session_id_str)
Expand Down Expand Up @@ -520,8 +511,8 @@ def _emit_log_drop(
session: Session,
reason: str,
dropped_count: int,
first_missing_seq: int | None = None,
last_missing_seq: int | None = None,
first_missing_seq: Optional[int] = None,
last_missing_seq: Optional[int] = None,
owns_session: bool = True,
) -> None:
"""
Expand Down Expand Up @@ -608,4 +599,4 @@ def _emit_log_drop(
# CRITICAL: Only commit if we own the session.
# When owns_session=False, caller controls transaction boundary.
if owns_session:
db.commit()
db.commit()
Loading