Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions backend/app/gateway/routers/thread_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ class RunResponse(BaseModel):
updated_at: str = ""


class ThreadTokenUsageModelBreakdown(BaseModel):
    """Per-model slice of a thread's token usage: total tokens and run count."""

    tokens: int = 0  # tokens consumed by runs using this model
    runs: int = 0  # number of runs that used this model

class ThreadTokenUsageCallerBreakdown(BaseModel):
    """Thread token usage split by which component consumed the tokens."""

    lead_agent: int = 0  # tokens attributed to the lead agent
    subagent: int = 0  # tokens attributed to subagents
    middleware: int = 0  # tokens attributed to middleware

class ThreadTokenUsageResponse(BaseModel):
    """Aggregated token usage for a single thread.

    Shape mirrors the dict produced by the run store's
    ``aggregate_tokens_by_thread`` so it can be splatted in directly.
    """

    thread_id: str  # thread the totals belong to
    total_tokens: int = 0  # input + output tokens across counted runs
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_runs: int = 0  # number of runs included in the totals
    # model name -> per-model breakdown ("unknown" when no model was recorded)
    by_model: dict[str, ThreadTokenUsageModelBreakdown] = Field(default_factory=dict)
    # caller-level split; defaults to all-zero when no runs contributed
    by_caller: ThreadTokenUsageCallerBreakdown = Field(default_factory=ThreadTokenUsageCallerBreakdown)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -368,10 +389,10 @@ async def list_run_events(
return await event_store.list_events(thread_id, run_id, event_types=types, limit=limit)


@router.get("/{thread_id}/token-usage", response_model=ThreadTokenUsageResponse)
@require_permission("threads", "read", owner_check=True)
async def thread_token_usage(thread_id: str, request: Request) -> ThreadTokenUsageResponse:
    """Thread-level token usage aggregation.

    Reads per-thread token totals from the run store and wraps them in the
    typed response model so the payload shape is stable for clients.
    Requires ``threads:read`` permission with an ownership check.
    """
    run_store = get_run_store(request)
    # Aggregation dict keys match ThreadTokenUsageResponse fields, so it can
    # be splatted straight into the model.
    agg = await run_store.aggregate_tokens_by_thread(thread_id)
    return ThreadTokenUsageResponse(thread_id=thread_id, **agg)
55 changes: 55 additions & 0 deletions backend/tests/test_run_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,61 @@ async def test_update_run_completion_preserves_existing_fields(self, tmp_path):
assert row["total_tokens"] == 100
await _cleanup()

@pytest.mark.anyio
async def test_aggregate_tokens_by_thread_counts_completed_runs_only(self, tmp_path):
    """Aggregation counts only terminal runs (success/error) on the target thread.

    A still-running run and a run belonging to a different thread must not
    contribute to any of the totals.
    """
    repo = await _make_repo(tmp_path)

    # (run_id, thread_id, completion kwargs): one successful run, one failed
    # run, one in-flight run, and one finished run on another thread.
    fixtures = [
        (
            "success-run",
            "t1",
            dict(
                status="success",
                total_input_tokens=70,
                total_output_tokens=30,
                total_tokens=100,
                lead_agent_tokens=80,
                subagent_tokens=15,
                middleware_tokens=5,
            ),
        ),
        (
            "error-run",
            "t1",
            dict(
                status="error",
                total_input_tokens=20,
                total_output_tokens=30,
                total_tokens=50,
                lead_agent_tokens=40,
                subagent_tokens=10,
            ),
        ),
        (
            "running-run",
            "t1",
            dict(
                status="running",
                total_input_tokens=900,
                total_output_tokens=99,
                total_tokens=999,
                lead_agent_tokens=999,
            ),
        ),
        (
            "other-thread-run",
            "t2",
            dict(status="success", total_tokens=888, lead_agent_tokens=888),
        ),
    ]
    for run_id, thread_id, completion in fixtures:
        await repo.put(run_id, thread_id=thread_id, status="running")
        await repo.update_run_completion(run_id, **completion)

    agg = await repo.aggregate_tokens_by_thread("t1")

    # Only success-run (100 tokens) and error-run (50 tokens) count: 2 runs.
    assert agg["total_tokens"] == 150
    assert agg["total_input_tokens"] == 90
    assert agg["total_output_tokens"] == 60
    assert agg["total_runs"] == 2
    assert agg["by_model"] == {"unknown": {"tokens": 150, "runs": 2}}
    assert agg["by_caller"] == {
        "lead_agent": 120,
        "subagent": 25,
        "middleware": 5,
    }
    await _cleanup()

@pytest.mark.anyio
async def test_list_by_thread_ordered_desc(self, tmp_path):
"""list_by_thread returns newest first."""
Expand Down
55 changes: 55 additions & 0 deletions backend/tests/test_thread_token_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Tests for thread-level token usage aggregation API."""

from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock

from _router_auth_helpers import make_authed_test_app
from fastapi.testclient import TestClient

from app.gateway.routers import thread_runs


def _make_app(run_store: MagicMock):
    """Build an authenticated test app with the thread-runs router and a fake run store."""
    application = make_authed_test_app()
    application.include_router(thread_runs.router)
    application.state.run_store = run_store
    return application


def test_thread_token_usage_returns_stable_shape():
    """The endpoint echoes the store's aggregation plus the thread id, unchanged."""
    aggregate = {
        "total_tokens": 150,
        "total_input_tokens": 90,
        "total_output_tokens": 60,
        "total_runs": 2,
        "by_model": {"unknown": {"tokens": 150, "runs": 2}},
        "by_caller": {
            "lead_agent": 120,
            "subagent": 25,
            "middleware": 5,
        },
    }
    run_store = MagicMock()
    run_store.aggregate_tokens_by_thread = AsyncMock(return_value=dict(aggregate))

    with TestClient(_make_app(run_store)) as client:
        response = client.get("/api/threads/thread-1/token-usage")

    assert response.status_code == 200
    # Response is exactly the aggregation dict with the thread id prepended.
    assert response.json() == {"thread_id": "thread-1", **aggregate}
    run_store.aggregate_tokens_by_thread.assert_awaited_once_with("thread-1")
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import { useI18n } from "@/core/i18n/hooks";
import { useModels } from "@/core/models/hooks";
import { useNotification } from "@/core/notification/hooks";
import { useLocalSettings, useThreadSettings } from "@/core/settings";
import { useThreadStream } from "@/core/threads/hooks";
import { useThreadStream, useThreadTokenUsage } from "@/core/threads/hooks";
import { threadTokenUsageToTokenUsage } from "@/core/threads/token-usage";
import { textOfMessage } from "@/core/threads/utils";
import { env } from "@/env";
import { cn } from "@/lib/utils";
Expand All @@ -42,22 +43,29 @@ export default function AgentChatPage() {

const { agent } = useAgent(agent_name);

const { threadId, setThreadId, isNewThread, setIsNewThread } =
const { threadId, setThreadId, isNewThread, setIsNewThread, isMock } =
useThreadChat();
const [settings, setSettings] = useThreadSettings(threadId);
const [localSettings, setLocalSettings] = useLocalSettings();
const { tokenUsageEnabled } = useModels();
const threadTokenUsage = useThreadTokenUsage(
isNewThread || isMock ? undefined : threadId,
{ enabled: tokenUsageEnabled && !isMock },
);
const backendTokenUsage = threadTokenUsageToTokenUsage(threadTokenUsage.data);

const { showNotification } = useNotification();
const {
thread,
pendingUsageMessages,
sendMessage,
isHistoryLoading,
hasMoreHistory,
loadMoreHistory,
} = useThreadStream({
threadId: isNewThread ? undefined : threadId,
context: { ...settings.context, agent_name: agent_name },
isMock,
onStart: (createdThreadId) => {
setThreadId(createdThreadId);
setIsNewThread(false);
Expand Down Expand Up @@ -141,8 +149,11 @@ export default function AgentChatPage() {
</Button>
</Tooltip>
<TokenUsageIndicator
threadId={isNewThread ? undefined : threadId}
backendUsage={backendTokenUsage}
enabled={tokenUsageEnabled}
messages={thread.messages}
pendingMessages={pendingUsageMessages}
preferences={localSettings.tokenUsage}
onPreferencesChange={(preferences) =>
setLocalSettings("tokenUsage", preferences)
Expand Down
12 changes: 11 additions & 1 deletion frontend/src/app/workspace/chats/[thread_id]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import { useI18n } from "@/core/i18n/hooks";
import { useModels } from "@/core/models/hooks";
import { useNotification } from "@/core/notification/hooks";
import { useLocalSettings, useThreadSettings } from "@/core/settings";
import { useThreadStream } from "@/core/threads/hooks";
import { useThreadStream, useThreadTokenUsage } from "@/core/threads/hooks";
import { threadTokenUsageToTokenUsage } from "@/core/threads/token-usage";
import { textOfMessage } from "@/core/threads/utils";
import { env } from "@/env";
import { cn } from "@/lib/utils";
Expand All @@ -44,6 +45,11 @@ export default function ChatPage() {
const [settings, setSettings] = useThreadSettings(threadId);
const [localSettings, setLocalSettings] = useLocalSettings();
const { tokenUsageEnabled } = useModels();
const threadTokenUsage = useThreadTokenUsage(
isNewThread || isMock ? undefined : threadId,
{ enabled: tokenUsageEnabled && !isMock },
);
const backendTokenUsage = threadTokenUsageToTokenUsage(threadTokenUsage.data);
const mountedRef = useRef(false);
useSpecificChatMode();

Expand All @@ -63,6 +69,7 @@ export default function ChatPage() {

const {
thread,
pendingUsageMessages,
sendMessage,
isUploading,
isHistoryLoading,
Expand Down Expand Up @@ -137,8 +144,11 @@ export default function ChatPage() {
</div>
<div className="flex items-center gap-2">
<TokenUsageIndicator
threadId={isNewThread ? undefined : threadId}
backendUsage={backendTokenUsage}
enabled={tokenUsageEnabled}
messages={thread.messages}
pendingMessages={pendingUsageMessages}
preferences={localSettings.tokenUsage}
onPreferencesChange={(preferences) =>
setLocalSettings("tokenUsage", preferences)
Expand Down
22 changes: 20 additions & 2 deletions frontend/src/components/workspace/token-usage-indicator.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import {
DropdownMenuTrigger,
} from "@/components/ui/dropdown-menu";
import { useI18n } from "@/core/i18n/hooks";
import { accumulateUsage, formatTokenCount } from "@/core/messages/usage";
import {
formatTokenCount,
selectHeaderTokenUsage,
type TokenUsage,
} from "@/core/messages/usage";
import {
getTokenUsageViewPreset,
tokenUsagePreferencesFromPreset,
Expand All @@ -25,23 +29,37 @@ import {
import { cn } from "@/lib/utils";

interface TokenUsageIndicatorProps {
  /** Thread id; when undefined, backend usage is ignored in the header total. */
  threadId?: string;
  /** Messages currently visible in the thread. */
  messages: Message[];
  /** Messages whose usage is presumably not yet in backend totals — TODO confirm against useThreadStream. */
  pendingMessages?: Message[];
  /** Persisted thread-level usage fetched from the backend, if available. */
  backendUsage?: TokenUsage | null;
  /** Feature flag; the `if (!enabled)` guard below gates rendering. */
  enabled?: boolean;
  /** Current token-usage display preferences. */
  preferences: TokenUsagePreferences;
  /** Called when the user changes display preferences. */
  onPreferencesChange: (preferences: TokenUsagePreferences) => void;
  /** Extra class names merged onto the root element. */
  className?: string;
}

export function TokenUsageIndicator({
threadId,
messages,
pendingMessages,
backendUsage,
enabled = false,
preferences,
onPreferencesChange,
className,
}: TokenUsageIndicatorProps) {
const { t } = useI18n();

const usage = useMemo(() => accumulateUsage(messages), [messages]);
const usage = useMemo(
() =>
selectHeaderTokenUsage({
backendUsage: threadId ? backendUsage : null,
messages,
pendingMessages,
}),
[backendUsage, messages, pendingMessages, threadId],
);
const preset = getTokenUsageViewPreset(preferences);

if (!enabled) {
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/core/i18n/locales/en-US.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ export const enUS: Translations = {
unavailable:
"No token usage yet. Usage appears only after a successful model response when the provider returns usage_metadata.",
unavailableShort: "No usage returned",
note: "Shown from provider-returned usage_metadata. Totals are best-effort conversation totals and may differ from provider billing pages.",
note: "Header totals use persisted thread usage when available. Per-turn and debug usage come from visible messages. Totals may differ from provider billing pages.",
presets: {
off: "Off",
summary: "Summary",
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/core/i18n/locales/zh-CN.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ export const zhCN: Translations = {
unavailable:
"暂无 Token 用量。只有模型成功返回且供应商提供 usage_metadata 时才会显示。",
unavailableShort: "未返回用量",
note: "基于供应商返回的 usage_metadata 展示。当前总量是 best-effort 的会话参考值,可能与平台账单页不完全一致。",
note: "顶部总量优先使用后端持久化的线程用量。每轮和调试用量来自当前可见消息,可能与平台账单页不完全一致。",
presets: {
off: "关闭",
summary: "总览",
Expand Down
34 changes: 34 additions & 0 deletions frontend/src/core/messages/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,40 @@ export function accumulateUsage(messages: Message[]): TokenUsage | null {
return hasUsage ? cumulative : null;
}

/** True when `usage` exists and reports at least one positive token count. */
function hasNonZeroUsage(
  usage: TokenUsage | null | undefined,
): usage is TokenUsage {
  if (usage == null) {
    return false;
  }
  const { inputTokens, outputTokens, totalTokens } = usage;
  return [inputTokens, outputTokens, totalTokens].some((count) => count > 0);
}

/** Field-wise sum of two usage records; neither argument is mutated. */
function addUsage(base: TokenUsage, delta: TokenUsage): TokenUsage {
  const combined: TokenUsage = {
    inputTokens: delta.inputTokens + base.inputTokens,
    outputTokens: delta.outputTokens + base.outputTokens,
    totalTokens: delta.totalTokens + base.totalTokens,
  };
  return combined;
}

/**
 * Pick the usage figure shown in the thread header.
 *
 * When backend-persisted usage carries any non-zero counts it wins, topped
 * up with usage accumulated from `pendingMessages`; otherwise the total is
 * accumulated from the visible `messages` alone.
 */
export function selectHeaderTokenUsage({
  backendUsage,
  messages,
  pendingMessages = [],
}: {
  backendUsage?: TokenUsage | null;
  messages: Message[];
  pendingMessages?: Message[];
}): TokenUsage | null {
  if (!hasNonZeroUsage(backendUsage)) {
    // No usable backend totals — fall back to the visible messages.
    return accumulateUsage(messages);
  }
  const pendingUsage = accumulateUsage(pendingMessages);
  return pendingUsage === null
    ? backendUsage
    : addUsage(backendUsage, pendingUsage);
}

/**
* Format a token count for display: 1234 -> "1,234", 12345 -> "12.3K"
*/
Expand Down
24 changes: 24 additions & 0 deletions frontend/src/core/threads/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { fetch as fetchWithAuth } from "@/core/api/fetcher";
import { getBackendBaseURL } from "@/core/config";

import type { ThreadTokenUsageResponse } from "./types";

/**
 * Fetch thread-level token usage from the backend.
 *
 * Resolves to `null` when the thread is inaccessible or missing (403/404),
 * so callers can treat usage as simply absent; any other non-OK status
 * throws with the HTTP status included for diagnosability.
 */
export async function fetchThreadTokenUsage(
  threadId: string,
): Promise<ThreadTokenUsageResponse | null> {
  const response = await fetchWithAuth(
    `${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}/token-usage`,
    {
      method: "GET",
    },
  );

  if (!response.ok) {
    if (response.status === 403 || response.status === 404) {
      return null;
    }
    // Surface the status code — the bare message alone is not actionable.
    throw new Error(
      `Failed to load thread token usage (HTTP ${response.status}).`,
    );
  }

  // NOTE(review): body is cast without runtime validation; trusted to match
  // the backend's ThreadTokenUsageResponse schema.
  return (await response.json()) as ThreadTokenUsageResponse;
}
Loading
Loading