diff --git a/backend/app/ingestion/service.py b/backend/app/ingestion/service.py index a74e3c2..287c417 100644 --- a/backend/app/ingestion/service.py +++ b/backend/app/ingestion/service.py @@ -13,7 +13,7 @@ import uuid from datetime import datetime, timezone UTC = timezone.utc -from typing import Any +from typing import Any, Optional from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session as DBSession @@ -67,19 +67,14 @@ class IngestService: - Append-only storage """ - def __init__(self, service_id: str | None = None): + def __init__(self, service_id: Optional[str] = None): """ - Initialize ingestion service. - - CONSTITUTIONAL REQUIREMENT: service_id is IMMUTABLE. - - Set once at service startup - - NOT configurable per request - - Ideally derived from deployment identity - - Args: - service_id: Static ingestion service identifier for CHAIN_SEAL. - If not provided, reads from INGESTION_SERVICE_ID env var. - Defaults to "default-ingest-01" if neither provided. + Freeze and record the ingestion service identifier used for CHAIN_SEAL operations. + + If provided, the given service_id becomes the immutable identifier for the service; otherwise the value is read from the INGESTION_SERVICE_ID environment variable and falls back to "default-ingest-01". This identifier is persisted on the instance and must not change for the lifetime of the process because CHAIN_SEAL integrity depends on a stable service identity. + + Parameters: + service_id (Optional[str]): Static ingestion service identifier for CHAIN_SEAL. If omitted, the environment variable INGESTION_SERVICE_ID is used, with a final fallback of "default-ingest-01". """ # Freeze service identity at startup if service_id: @@ -100,25 +95,25 @@ def __init__(self, service_id: str | None = None): def start_session( self, - session_id_str: str | None = None, + session_id_str: Optional[str] = None, authority: str = "server", - agent_name: str | None = None, - user_id: int | None = None, + agent_name: Optional[str] = None, + user_id: Optional[int] = None, ) -> str: """ - Start a new session with specified authority. - - Args: - session_id_str: UUID string from SDK (optional, will generate if not provided) - authority: "server" or "sdk" - agent_name: Optional agent identifier - user_id: Optional user ID for legacy compatibility - + Start a new ingestion session and persist it to the database. + + Parameters: + session_id_str (Optional[str]): Optional UUID string to use for the session; a new UUID is generated if not provided. + authority (str): Either "server" or "sdk", determining the session's ChainAuthority and whether the ingestion service id is recorded. + agent_name (Optional[str]): Optional agent identifier associated with the session. + user_id (Optional[int]): Optional legacy user identifier. + Returns: - session_id as string (UUID) - + str: The created session's UUID string. + Raises: - ValueError: If authority is invalid + ValueError: If `authority` is not "server" or "sdk". """ # Validate authority if authority not in ["server", "sdk"]: @@ -142,6 +137,7 @@ def start_session( agent_name=agent_name, user_id=user_id, ingestion_service_id=self.service_id if authority == "server" else None, + is_replay=False, ) db.add(session) @@ -157,38 +153,27 @@ def append_events( self, session_id: str, events: list[dict[str, Any]], - db: DBSession | None = None, + db: Optional[DBSession] = None, ) -> dict[str, Any]: """ - Append events to session with constitutional validation. - - CRITICAL OPERATIONS: - 1. Server-side hash recomputation (ignore SDK hashes) - 2. Strict sequence validation (hard rejection) - 3. Atomic commit (all or none) - - TRANSACTION BOUNDARY: - - When db is None: creates/commits/closes own session (backwards-compatible) - - When db is provided: uses caller's session, does NOT commit or close - Caller is responsible for transaction control. - - Args: - session_id: Session UUID string - events: List of event dictionaries from SDK - db: Optional external DB session for transaction control. - When provided, IngestService does NOT commit or close. - + Append a batch of events to a session with constitutional validation and atomic commit. + + Performs server-side payload and event hash recomputation, enforces strict in-order sequence validation (hard rejection for gaps/duplicates), and persists all events atomically when the service owns the transaction. + + Parameters: + db (Optional[DBSession]): External DB session to use for the operation. If provided, the caller retains transaction control and this method will not commit or close the session; if omitted, the method opens, commits, and closes its own DB session. + Returns: - dict with: - 'status': 'success' - 'accepted_count': int - 'final_hash': str - 'committed_events': list[dict] — canonical committed event representations - for downstream consumers (e.g., PolicyEngine) - + dict: { + 'status': 'success', + 'accepted_count': int, # number of events accepted + 'final_hash': str, # event_hash of the last appended event (or genesis hash if none) + 'committed_events': list[dict] # canonical representations of committed events for downstream consumers + } + Raises: - SequenceViolation: On sequence gaps/duplicates - ValueError: On validation failures + SequenceViolation: If event sequence validation fails (gaps or duplicates). + ValueError: On validation errors (e.g., missing/invalid session, inactive session) or database integrity errors. """ owns_session = db is None if owns_session: @@ -247,7 +232,8 @@ def append_events( # Store in database event_chain = EventChain( event_id=uuid.UUID(event_envelope["event_id"]), - session_id=session.session_id_str, + session_id=session.id, + session_id_str=session.session_id_str, sequence_number=event_envelope["sequence_number"], timestamp_wall=_parse_wall_timestamp(event_envelope["timestamp_wall"]), timestamp_monotonic=event_data.get("timestamp_monotonic", 0), @@ -429,8 +415,13 @@ def seal_session(self, session_id: str) -> dict[str, Any]: # --- Private helper methods --- - def _get_last_event_hash(self, db: DBSession, session: Session) -> str | None: - """Get hash of last event in session, or None if no events.""" + def _get_last_event_hash(self, db: DBSession, session: Session) -> Optional[str]: + """ + Retrieve the hash of the last event for a session. + + Returns: + str: `verifier_core.GENESIS_HASH` if the session has no events, otherwise the last event's hash. + """ last_event = ( db.query(EventChain) .filter(EventChain.session_id == session.session_id_str) @@ -520,8 +511,8 @@ def _emit_log_drop( session: Session, reason: str, dropped_count: int, - first_missing_seq: int | None = None, - last_missing_seq: int | None = None, + first_missing_seq: Optional[int] = None, + last_missing_seq: Optional[int] = None, owns_session: bool = True, ) -> None: """ @@ -608,4 +599,4 @@ def _emit_log_drop( # CRITICAL: Only commit if we own the session. # When owns_session=False, caller controls transaction boundary. if owns_session: - db.commit() + db.commit() \ No newline at end of file