Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions hindsight-api-slim/hindsight_api/api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -1431,12 +1431,37 @@ class BankStatsResponse(BaseModel):
links_breakdown: dict[str, dict[str, int]]
pending_operations: int
failed_operations: int
operations_by_status: dict[str, int] = Field(
default_factory=dict,
description="Async operations grouped by status (pending, in_progress, completed, failed, cancelled).",
)
# Consolidation stats
last_consolidated_at: str | None = Field(default=None, description="When consolidation last ran (ISO format)")
pending_consolidation: int = Field(default=0, description="Number of memories not yet processed into observations")
total_observations: int = Field(default=0, description="Total number of observations")


class MemoryTimeseriesBucket(BaseModel):
    """One bucket in the memory ingestion time-series.

    Counts are broken down by fact type; buckets with no activity are
    still emitted with all counters at 0 so the chart line is continuous.
    """

    # Bucket key — matches the engine's naive-UTC `isoformat()` bucket keys.
    time: str = Field(description="Bucket start timestamp in ISO-8601 (UTC).")
    world: int = Field(default=0, description="World-fact memories ingested in this bucket.")
    experience: int = Field(default=0, description="Experience memories ingested in this bucket.")
    observation: int = Field(default=0, description="Observations recorded in this bucket.")


class MemoriesTimeseriesResponse(BaseModel):
    """Time-series of memory ingestion bucketed by time and fact type.

    Response body for GET /v1/default/banks/{bank_id}/stats/memories-timeseries.
    """

    bank_id: str
    # Echoes the *effective* period — the engine falls back to 7d for
    # unknown inputs, and this field reflects that fallback.
    period: str = Field(description="One of: 1h, 12h, 1d, 7d, 30d, 90d.")
    trunc: str = Field(description="Bucket granularity: minute, hour, day.")
    buckets: list[MemoryTimeseriesBucket] = Field(
        default_factory=list,
        description="Per-bucket counts, always returned fully padded for the requested period.",
    )


# Mental Model models


Expand Down Expand Up @@ -3303,6 +3328,7 @@ async def api_stats(
links_breakdown=links_breakdown,
pending_operations=ops.get("pending", 0),
failed_operations=ops.get("failed", 0),
operations_by_status=ops,
last_consolidated_at=stats["last_consolidated_at"],
pending_consolidation=stats["pending_consolidation"],
total_observations=stats["total_observations"],
Expand All @@ -3318,6 +3344,35 @@ async def api_stats(
logger.error(f"Error in /v1/default/banks/{bank_id}/stats: {error_detail}")
raise HTTPException(status_code=500, detail=str(e))

@app.get(
    "/v1/default/banks/{bank_id}/stats/memories-timeseries",
    response_model=MemoriesTimeseriesResponse,
    summary="Memory ingestion time-series",
    description="Memories ingested over a period, bucketed by time and broken down by fact type.",
    operation_id="get_memories_timeseries",
    tags=["Banks"],
)
async def api_memories_timeseries(
    bank_id: str,
    period: str = "7d",
    request_context: RequestContext = Depends(get_request_context),
):
    """Serve the memories-ingested chart data for one bank."""
    try:
        payload = await app.state.memory.get_memories_timeseries(
            bank_id, period=period, request_context=request_context
        )
        return MemoriesTimeseriesResponse(**payload)
    except OperationValidationError as exc:
        # Validator rejections carry their own HTTP status and reason.
        raise HTTPException(status_code=exc.status_code, detail=exc.reason)
    except (AuthenticationError, HTTPException):
        # Already shaped as HTTP errors — propagate untouched.
        raise
    except Exception as exc:
        import traceback

        error_detail = f"{str(exc)}\n\nTraceback:\n{traceback.format_exc()}"
        logger.error(f"Error in /v1/default/banks/{bank_id}/stats/memories-timeseries: {error_detail}")
        raise HTTPException(status_code=500, detail=str(exc))

@app.get(
"/v1/default/banks/{bank_id}/entities",
response_model=EntityListResponse,
Expand Down
122 changes: 122 additions & 0 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,44 @@ def _get_tiktoken_encoding():
return _TIKTOKEN_ENCODING


@dataclass(frozen=True)
class _TimeseriesPeriodConfig:
"""How one period slices the time axis for the memories-ingested chart."""

interval: str # postgres interval literal used in the `now() - interval '...'` filter
trunc: str # date_trunc unit (minute/hour/day)
step: timedelta # distance between adjacent buckets
count: int # total buckets rendered for the period


_MEMORIES_TIMESERIES_PERIODS: dict[str, _TimeseriesPeriodConfig] = {
"1h": _TimeseriesPeriodConfig("1 hour", "minute", timedelta(minutes=1), 60),
"12h": _TimeseriesPeriodConfig("12 hours", "hour", timedelta(hours=1), 12),
"1d": _TimeseriesPeriodConfig("24 hours", "hour", timedelta(hours=1), 24),
"7d": _TimeseriesPeriodConfig("7 days", "day", timedelta(days=1), 7),
"30d": _TimeseriesPeriodConfig("30 days", "day", timedelta(days=1), 30),
"90d": _TimeseriesPeriodConfig("90 days", "day", timedelta(days=1), 90),
}


@dataclass
class MemoryTimeseriesBucketData:
    """One bucket of the memories-ingested time series (engine-side).

    Mutable on purpose: the aggregation loop increments the per-fact-type
    counters in place after the padded bucket list is built.
    """

    time: str
    world: int = 0
    experience: int = 0
    observation: int = 0

    def as_dict(self) -> dict[str, Any]:
        """Plain-dict form consumed by the HTTP response model."""
        return dict(
            time=self.time,
            world=self.world,
            experience=self.experience,
            observation=self.observation,
        )


@dataclass(frozen=True)
class RefreshTagFiltering:
"""Resolved tag filtering parameters for mental model refresh."""
Expand Down Expand Up @@ -6163,6 +6201,90 @@ async def get_bank_stats(
"total_observations": node_counts.get("observation", 0),
}

async def get_memories_timeseries(
    self,
    bank_id: str,
    *,
    period: str,
    request_context: "RequestContext",
) -> dict[str, Any]:
    """Memory ingestion bucketed by time, broken down by fact_type.

    Always returns the full expected bucket set for the period so the
    chart line is continuous (empty buckets show as zeros). Buckets are
    anchored on UTC boundaries — we do this (rather than the PG session
    timezone) so the API response is deterministic regardless of where
    the database is deployed, and so the control-plane chart can match
    buckets by ISO key on the client side.

    Args:
        bank_id: Bank whose memory_units rows are counted.
        period: A key of _MEMORIES_TIMESERIES_PERIODS; unknown values
            fall back to "7d" and the response echoes the effective period.
        request_context: Tenant/auth context for this call.

    Returns:
        Dict with bank_id, effective period, trunc granularity, and the
        fully padded bucket list (each bucket rendered via as_dict()).
    """
    await self._authenticate_tenant(request_context)
    if self._operation_validator:
        from hindsight_api.extensions import BankReadContext

        ctx = BankReadContext(
            bank_id=bank_id,
            operation="get_memories_timeseries",
            request_context=request_context,
        )
        await self._validate_operation(self._operation_validator.validate_bank_read(ctx))

    # Unknown periods fall back to the default instead of erroring; the
    # echoed `period` reflects the fallback so clients see what they got.
    if period not in _MEMORIES_TIMESERIES_PERIODS:
        period = "7d"
    cfg = _MEMORIES_TIMESERIES_PERIODS[period]

    pool = await self._get_pool()
    async with acquire_with_retry(pool) as conn:
        # NOTE: cfg.trunc / cfg.interval are interpolated into the SQL, but
        # they come only from the fixed _MEMORIES_TIMESERIES_PERIODS table
        # (never from user input), so this is not an injection surface.
        rows = await conn.fetch(
            f"""
            SELECT date_trunc('{cfg.trunc}', created_at AT TIME ZONE 'UTC') AS bucket,
                   fact_type, COUNT(*) AS count
            FROM {fq_table("memory_units")}
            WHERE bank_id = $1
              AND created_at >= now() - interval '{cfg.interval}'
            GROUP BY bucket, fact_type
            ORDER BY bucket
            """,
            bank_id,
        )

    # Build the canonical bucket list anchored on the most recent UTC
    # boundary. datetime.utcnow() is deprecated since Python 3.12, so take
    # an aware UTC "now" and strip tzinfo to keep the naive-UTC isoformat
    # keys the padding/matching below relies on.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    if cfg.trunc == "minute":
        end = now_utc.replace(second=0, microsecond=0)
    elif cfg.trunc == "hour":
        end = now_utc.replace(minute=0, second=0, microsecond=0)
    else:
        end = now_utc.replace(hour=0, minute=0, second=0, microsecond=0)

    buckets: list[MemoryTimeseriesBucketData] = []
    by_iso: dict[str, MemoryTimeseriesBucketData] = {}
    for i in range(cfg.count):
        t = end - cfg.step * (cfg.count - 1 - i)
        entry = MemoryTimeseriesBucketData(time=t.isoformat())
        buckets.append(entry)
        by_iso[entry.time] = entry

    for row in rows:
        # asyncpg hands us a tz-aware datetime when the column is
        # timestamptz. Normalize to the naive-UTC format used for the keys.
        bucket_dt = row["bucket"]
        if bucket_dt.tzinfo is not None:
            bucket_dt = bucket_dt.astimezone(timezone.utc).replace(tzinfo=None)
        entry = by_iso.get(bucket_dt.isoformat())
        if entry is None:
            # Row fell outside the requested window (clock skew / edge case).
            continue
        ft = row["fact_type"]
        if ft == "world":
            entry.world += row["count"]
        elif ft == "experience":
            entry.experience += row["count"]
        elif ft == "observation":
            entry.observation += row["count"]

    return {
        "bank_id": bank_id,
        "period": period,
        "trunc": cfg.trunc,
        "buckets": [b.as_dict() for b in buckets],
    }

async def get_entity(
self,
bank_id: str,
Expand Down
170 changes: 170 additions & 0 deletions hindsight-api-slim/tests/test_bank_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""
Tests for the bank stats endpoint and the memories-timeseries endpoint.

Covers the new fields exposed by GET /v1/default/banks/{bank_id}/stats
(operations_by_status) and the new endpoint
GET /v1/default/banks/{bank_id}/stats/memories-timeseries.
"""
from datetime import datetime

import httpx
import pytest
import pytest_asyncio

from hindsight_api.api import create_app


@pytest_asyncio.fixture
async def api_client(memory):
    """In-process HTTP client for the API app.

    `memory` is a shared engine fixture (defined outside this file —
    presumably in conftest; confirm there). initialize_memory=False keeps
    create_app from re-initializing that shared engine per test.
    """
    app = create_app(memory, initialize_memory=False)
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        yield client


@pytest.fixture
def test_bank_id():
    # Timestamp suffix keeps bank ids unique across tests and test runs,
    # so leftover banks from a failed cleanup can't collide.
    return f"stats_test_{datetime.now().timestamp()}"


@pytest.mark.asyncio
async def test_bank_stats_exposes_operations_by_status(api_client, test_bank_id):
    """/stats should return operations_by_status with all finished operations."""
    try:
        # Retain one memory so at least one completed operation exists.
        resp = await api_client.post(
            f"/v1/default/banks/{test_bank_id}/memories",
            json={"items": [{"content": "Alice is a software engineer.", "context": "team"}]},
        )
        assert resp.status_code == 200

        resp = await api_client.get(f"/v1/default/banks/{test_bank_id}/stats")
        assert resp.status_code == 200
        stats = resp.json()

        assert "operations_by_status" in stats
        by_status = stats["operations_by_status"]
        assert isinstance(by_status, dict)
        # A synchronous retain finishes as "completed".
        assert by_status.get("completed", 0) >= 1
        # The scalar pending/failed fields must mirror the breakdown.
        assert stats["pending_operations"] == by_status.get("pending", 0)
        assert stats["failed_operations"] == by_status.get("failed", 0)
    finally:
        await api_client.delete(f"/v1/default/banks/{test_bank_id}")


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "period,expected_count,expected_trunc",
    [
        ("1h", 60, "minute"),
        ("12h", 12, "hour"),
        ("1d", 24, "hour"),
        ("7d", 7, "day"),
        ("30d", 30, "day"),
        ("90d", 90, "day"),
    ],
)
async def test_memories_timeseries_periods(
    api_client, test_bank_id, period, expected_count, expected_trunc
):
    """Every period must return the full expected bucket count and trunc."""
    try:
        retain = await api_client.post(
            f"/v1/default/banks/{test_bank_id}/memories",
            json={"items": [{"content": "Bob works on infrastructure.", "context": "team"}]},
        )
        assert retain.status_code == 200

        resp = await api_client.get(
            f"/v1/default/banks/{test_bank_id}/stats/memories-timeseries",
            params={"period": period},
        )
        assert resp.status_code == 200
        body = resp.json()

        # Envelope fields echo the request and describe the bucketing.
        assert body["bank_id"] == test_bank_id
        assert body["period"] == period
        assert body["trunc"] == expected_trunc
        # The bucket list must be fully padded, never sparse.
        assert len(body["buckets"]) == expected_count
        for bucket in body["buckets"]:
            assert "time" in bucket
            assert bucket["world"] >= 0
            assert bucket["experience"] >= 0
            assert bucket["observation"] >= 0
    finally:
        await api_client.delete(f"/v1/default/banks/{test_bank_id}")


@pytest.mark.asyncio
async def test_memories_timeseries_invalid_period_falls_back(api_client, test_bank_id):
    """An unknown period must fall back to the 7d default."""
    try:
        resp = await api_client.get(
            f"/v1/default/banks/{test_bank_id}/stats/memories-timeseries",
            params={"period": "nonsense"},
        )
        assert resp.status_code == 200
        body = resp.json()
        # The response echoes the *effective* period, not the bogus input.
        assert (body["period"], body["trunc"]) == ("7d", "day")
        assert len(body["buckets"]) == 7
    finally:
        await api_client.delete(f"/v1/default/banks/{test_bank_id}")


@pytest.mark.asyncio
async def test_memories_timeseries_empty_bank_returns_zero_filled_buckets(
    api_client, test_bank_id
):
    """A bank with no memories must still return the full zero-filled bucket set."""
    try:
        # Touch the profile endpoint so the bank exists without any memories.
        profile = await api_client.get(f"/v1/default/banks/{test_bank_id}/profile")
        assert profile.status_code == 200

        resp = await api_client.get(
            f"/v1/default/banks/{test_bank_id}/stats/memories-timeseries",
            params={"period": "7d"},
        )
        assert resp.status_code == 200
        body = resp.json()
        assert len(body["buckets"]) == 7
        # Padding means zeros, not missing buckets.
        for bucket in body["buckets"]:
            assert (bucket["world"], bucket["experience"], bucket["observation"]) == (0, 0, 0)
    finally:
        await api_client.delete(f"/v1/default/banks/{test_bank_id}")


@pytest.mark.asyncio
async def test_memories_timeseries_reflects_retained_memories(api_client, test_bank_id):
    """Freshly-retained memories must show up in today's bucket counts."""
    try:
        retain = await api_client.post(
            f"/v1/default/banks/{test_bank_id}/memories",
            json={
                "items": [
                    {"content": "Alice is a software engineer.", "context": "team"},
                    {"content": "Bob works on infrastructure.", "context": "team"},
                ]
            },
        )
        assert retain.status_code == 200

        resp = await api_client.get(
            f"/v1/default/banks/{test_bank_id}/stats/memories-timeseries",
            params={"period": "7d"},
        )
        assert resp.status_code == 200
        body = resp.json()

        def bucket_total(bucket):
            return bucket["world"] + bucket["experience"] + bucket["observation"]

        grand_total = sum(bucket_total(b) for b in body["buckets"])
        assert grand_total >= 2, "expected at least two memories across all buckets"

        # With day-granularity buckets, both memories land in the most-recent bucket.
        assert bucket_total(body["buckets"][-1]) >= 2
    finally:
        await api_client.delete(f"/v1/default/banks/{test_bank_id}")
4 changes: 3 additions & 1 deletion hindsight-cli/.openapi-coverage.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
# Operation-level skips
# ---------------------------------------------------------------------------
[skip]
# (empty — every operation is currently wired)
# UI-only endpoint powering the control-plane stats chart.
# Zero-filled bucket arrays don't map to a useful CLI command.
get_memories_timeseries = "UI-only endpoint for the control plane stats chart"

# ---------------------------------------------------------------------------
# Per-operation parameter skips
Expand Down
Loading
Loading