Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions backend/app/ingestion/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import uuid
from datetime import datetime, timezone
UTC = timezone.utc
from typing import Any
from typing import Any, Optional

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session as DBSession
Expand Down Expand Up @@ -67,7 +67,7 @@ class IngestService:
- Append-only storage
"""

def __init__(self, service_id: str | None = None):
def __init__(self, service_id: Optional[str] = None):
"""
Initialize ingestion service.

Expand Down Expand Up @@ -100,10 +100,10 @@ def __init__(self, service_id: str | None = None):

def start_session(
self,
session_id_str: str | None = None,
session_id_str: Optional[str] = None,
authority: str = "server",
agent_name: str | None = None,
user_id: int | None = None,
agent_name: Optional[str] = None,
user_id: Optional[int] = None,
) -> str:
"""
Start a new session with specified authority.
Expand Down Expand Up @@ -142,6 +142,7 @@ def start_session(
agent_name=agent_name,
user_id=user_id,
ingestion_service_id=self.service_id if authority == "server" else None,
is_replay=False,
)

db.add(session)
Expand All @@ -157,7 +158,7 @@ def append_events(
self,
session_id: str,
events: list[dict[str, Any]],
db: DBSession | None = None,
db: Optional[DBSession] = None,
) -> dict[str, Any]:
"""
Append events to session with constitutional validation.
Expand Down Expand Up @@ -247,7 +248,8 @@ def append_events(
# Store in database
event_chain = EventChain(
event_id=uuid.UUID(event_envelope["event_id"]),
session_id=session.session_id_str,
session_id=session.id,
session_id_str=session.session_id_str,
sequence_number=event_envelope["sequence_number"],
timestamp_wall=_parse_wall_timestamp(event_envelope["timestamp_wall"]),
timestamp_monotonic=event_data.get("timestamp_monotonic", 0),
Expand Down Expand Up @@ -429,7 +431,7 @@ def seal_session(self, session_id: str) -> dict[str, Any]:

# --- Private helper methods ---

def _get_last_event_hash(self, db: DBSession, session: Session) -> str | None:
def _get_last_event_hash(self, db: DBSession, session: Session) -> Optional[str]:
"""Get hash of last event in session, or None if no events."""
last_event = (
db.query(EventChain)
Expand Down Expand Up @@ -520,8 +522,8 @@ def _emit_log_drop(
session: Session,
reason: str,
dropped_count: int,
first_missing_seq: int | None = None,
last_missing_seq: int | None = None,
first_missing_seq: Optional[int] = None,
last_missing_seq: Optional[int] = None,
owns_session: bool = True,
) -> None:
"""
Expand Down
Loading