Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions .changeset/live-quota-headers.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,6 @@
"@aliou/pi-synthetic": minor
---

Live quota updates from x-synthetic-quotas response header
Use event-driven Synthetic quota updates without polling.

Quota data is now extracted from the `x-synthetic-quotas` response header
on every Synthetic provider response via the new `after_provider_response`
event, eliminating the need for 60-second polling.

- Provider extension ingests `x-synthetic-quotas` from `after_provider_response`
- Quota data broadcast via `pi.events` (`synthetic:quotas:updated`) to all consumers
- `usage-status`: removed polling timer, reads from events
- `quota-warnings`: reacts to quota events instead of fetching
- `sub-bar-integration`: removed polling timer, reads from events
- `command-quotas`: unchanged (fetches directly, works for any model)
- Added `parseQuotaHeader` for case-insensitive header extraction
- Upgraded `@mariozechner/pi-coding-agent` to 0.67.68
Quota data is now extracted from the `x-synthetic-quotas` response header on Synthetic provider responses and stored centrally. Usage status and quota warnings read the latest quota snapshot through short-lived callbacks from fresh Pi lifecycle contexts, avoiding stale `ExtensionContext` crashes after reloads or session switches.
27 changes: 25 additions & 2 deletions src/extensions/provider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ import { QuotaStore } from "../../services/quota-store";
import {
parseQuotaHeader,
type QuotasResponse,
SYNTHETIC_QUOTAS_READ_EVENT,
SYNTHETIC_QUOTAS_REQUEST_EVENT,
SYNTHETIC_QUOTAS_UPDATED_EVENT,
type SyntheticQuotasReadPayload,
type SyntheticQuotasRequestPayload,
} from "../../types/quotas";
import { fetchQuotas } from "../../utils/quotas";
import { SYNTHETIC_MODELS } from "./models";
Expand Down Expand Up @@ -95,6 +98,7 @@ export default async function (pi: ExtensionAPI) {
pi.events.emit(SYNTHETIC_QUOTAS_UPDATED_EVENT, {
quotas: snapshot.quotas,
source: snapshot.source,
updatedAt: snapshot.updatedAt,
});
});

Expand All @@ -104,8 +108,27 @@ export default async function (pi: ExtensionAPI) {
if (quotas) quotaStore.ingest(quotas, "header");
});

pi.events.on(SYNTHETIC_QUOTAS_REQUEST_EVENT, async () => {
await quotaStore.refreshFromApi(fetchQuotasFromAuth);
pi.events.on(SYNTHETIC_QUOTAS_REQUEST_EVENT, async (data: unknown) => {
const payload = data as SyntheticQuotasRequestPayload | undefined;
const snapshot = await quotaStore.refreshFromApi(fetchQuotasFromAuth);
if (payload?.respond) {
payload.respond(snapshot);
}
});

pi.events.on(SYNTHETIC_QUOTAS_READ_EVENT, (data: unknown) => {
const { respond } = data as SyntheticQuotasReadPayload;
respond(quotaStore.getSnapshot());
});

pi.on("session_before_switch", () => {
quotaStore.clear();
currentAuthStorage = undefined;
});

pi.on("session_shutdown", () => {
quotaStore.clear();
currentAuthStorage = undefined;
});

pi.on("session_start", async (_event, ctx) => {
Expand Down
92 changes: 57 additions & 35 deletions src/extensions/quota-warnings/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,61 @@ import {
} from "../../config";
import { QuotaWarningNotifier } from "../../services/quota-warnings";
import {
SYNTHETIC_QUOTAS_READ_EVENT,
SYNTHETIC_QUOTAS_REQUEST_EVENT,
SYNTHETIC_QUOTAS_UPDATED_EVENT,
type SyntheticQuotasUpdatedPayload,
type SyntheticQuotasReadPayload,
type SyntheticQuotasRequestPayload,
type SyntheticQuotasSnapshotPayload,
} from "../../types/quotas";

export default async function (pi: ExtensionAPI) {
await configLoader.load();

let enabled = configLoader.getConfig().quotaWarnings;
let currentProvider: string | undefined;
let currentContext: ExtensionContext | undefined;

// Pi-agnostic notifier — all logic is testable without Pi
const notifier = new QuotaWarningNotifier();

function requestQuotas(): void {
pi.events.emit(SYNTHETIC_QUOTAS_REQUEST_EVENT, undefined);
function requestQuotas(
respond?: (snapshot: SyntheticQuotasSnapshotPayload | undefined) => void,
): void {
pi.events.emit(SYNTHETIC_QUOTAS_REQUEST_EVENT, {
respond,
} satisfies SyntheticQuotasRequestPayload);
}

// Receive quota updates from the provider extension and evaluate warnings
pi.events.on(SYNTHETIC_QUOTAS_UPDATED_EVENT, (data: unknown) => {
if (!enabled || currentProvider !== "synthetic" || !currentContext) return;
const { quotas, source } = data as SyntheticQuotasUpdatedPayload;
notifier.evaluate(quotas, source === "header", (message, level) => {
if (currentContext) currentContext.ui.notify(message, level);
function readQuotas(
respond: (snapshot: SyntheticQuotasSnapshotPayload | undefined) => void,
): void {
pi.events.emit(SYNTHETIC_QUOTAS_READ_EVENT, {
respond,
} satisfies SyntheticQuotasReadPayload);
}

function evaluateFromStoreOrRefresh(ctx: ExtensionContext): void {
if (!enabled || ctx.model?.provider !== "synthetic") return;
readQuotas((snapshot) => {
if (snapshot) {
notifier.evaluate(
snapshot.quotas,
snapshot.source === "header",
(message, level) => {
ctx.ui.notify(message, level);
},
);
} else {
requestQuotas((refreshed) => {
if (!refreshed) return;
notifier.evaluate(
refreshed.quotas,
refreshed.source === "header",
(message, level) => {
ctx.ui.notify(message, level);
},
);
});
}
});
});
}

pi.events.on(SYNTHETIC_CONFIG_UPDATED_EVENT, (data: unknown) => {
enabled = (data as SyntheticConfigUpdatedPayload).config.quotaWarnings;
Expand All @@ -47,39 +75,33 @@ export default async function (pi: ExtensionAPI) {
return;
}

if (currentContext && currentProvider === "synthetic") {
notifier.clearAlertState();
requestQuotas();
}
notifier.clearAlertState();
// In config updates we don't have ctx, so we just clear. The next lifecycle event will refresh.
});

pi.on("session_start", async (_event, ctx) => {
currentContext = ctx;
currentProvider = ctx.model?.provider;
if (!enabled || ctx.model?.provider !== "synthetic") return;
pi.on("session_start", (_event, ctx) => {
notifier.clearAlertState();
// Provider fetches on session_start; warnings fire when the event arrives.
evaluateFromStoreOrRefresh(ctx);
});

pi.on("model_select", (_event, ctx) => {
currentContext = ctx;
currentProvider = ctx.model?.provider;
if (!enabled || ctx.model?.provider !== "synthetic") {
notifier.clearAlertState();
return;
}
notifier.clearAlertState();
requestQuotas();
evaluateFromStoreOrRefresh(ctx);
});

pi.on("agent_end", (_event, ctx) => {
evaluateFromStoreOrRefresh(ctx);
});

pi.on("turn_end", (_event, ctx) => {
evaluateFromStoreOrRefresh(ctx);
});

pi.on("session_before_switch", (_event, ctx) => {
currentContext = ctx;
currentProvider = ctx.model?.provider;
pi.on("session_before_switch", () => {
notifier.clearAlertState();
});

pi.on("session_shutdown", () => {
currentContext = undefined;
currentProvider = undefined;
notifier.clearAlertState();
});

Expand Down
132 changes: 62 additions & 70 deletions src/extensions/usage-status/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import {
} from "../../config";
import {
type QuotasResponse,
SYNTHETIC_QUOTAS_READ_EVENT,
SYNTHETIC_QUOTAS_REQUEST_EVENT,
SYNTHETIC_QUOTAS_UPDATED_EVENT,
type SyntheticQuotasUpdatedPayload,
type SyntheticQuotasReadPayload,
type SyntheticQuotasRequestPayload,
type SyntheticQuotasSnapshotPayload,
} from "../../types/quotas";
import { formatResetTime } from "../../utils/quotas";
import {
Expand Down Expand Up @@ -80,100 +82,90 @@ export default async function (pi: ExtensionAPI) {
await configLoader.load();

let enabled = configLoader.getConfig().usageStatus;
let currentContext: ExtensionContext | undefined;
let currentProvider: string | undefined;
let lastSnapshot: WindowStatus[] | undefined;

function renderFromSnapshot(ctx: ExtensionContext): void {
function requestQuotas(
respond: (snapshot: SyntheticQuotasSnapshotPayload | undefined) => void,
): void {
pi.events.emit(SYNTHETIC_QUOTAS_REQUEST_EVENT, {
respond,
} satisfies SyntheticQuotasRequestPayload);
}

function readQuotas(
respond: (snapshot: SyntheticQuotasSnapshotPayload | undefined) => void,
): void {
pi.events.emit(SYNTHETIC_QUOTAS_READ_EVENT, {
respond,
} satisfies SyntheticQuotasReadPayload);
}

function renderSnapshot(
ctx: ExtensionContext,
snapshot: SyntheticQuotasSnapshotPayload | undefined,
): void {
if (!ctx.hasUI) return;
if (!lastSnapshot || lastSnapshot.length === 0) {
if (!snapshot) {
ctx.ui.setStatus(
EXTENSION_ID,
ctx.ui.theme.fg("dim", "loading usage..."),
);
return;
}

const windows = parseSnapshot(snapshot.quotas);
if (windows.length === 0) {
ctx.ui.setStatus(EXTENSION_ID, undefined);
return;
}
ctx.ui.setStatus(EXTENSION_ID, formatStatus(ctx, lastSnapshot));
}

function requestQuotas(): void {
pi.events.emit(SYNTHETIC_QUOTAS_REQUEST_EVENT, undefined);
ctx.ui.setStatus(EXTENSION_ID, formatStatus(ctx, windows));
}

function setLoadingStatus(ctx: ExtensionContext): void {
function clearStatus(ctx: ExtensionContext): void {
if (!ctx.hasUI) return;
ctx.ui.setStatus(EXTENSION_ID, ctx.ui.theme.fg("dim", "loading usage..."));
ctx.ui.setStatus(EXTENSION_ID, undefined);
}

function clearStatus(ctx?: ExtensionContext): void {
lastSnapshot = undefined;
ctx?.ui.setStatus(EXTENSION_ID, undefined);
function renderFromStoreOrRefresh(ctx: ExtensionContext): void {
if (!enabled || ctx.model?.provider !== "synthetic") {
clearStatus(ctx);
return;
}
readQuotas((snapshot) => {
if (snapshot) {
renderSnapshot(ctx, snapshot);
} else {
renderSnapshot(ctx, undefined); // show loading
requestQuotas((refreshed) => renderSnapshot(ctx, refreshed));
}
});
}

// Receive quota updates from the provider extension
pi.events.on(SYNTHETIC_QUOTAS_UPDATED_EVENT, (data: unknown) => {
if (!enabled || currentProvider !== "synthetic") return;
const { quotas } = data as SyntheticQuotasUpdatedPayload;
lastSnapshot = parseSnapshot(quotas);
if (currentContext) renderFromSnapshot(currentContext);
});

pi.events.on(SYNTHETIC_CONFIG_UPDATED_EVENT, (data: unknown) => {
enabled = (data as SyntheticConfigUpdatedPayload).config.usageStatus;
if (!enabled) {
clearStatus(currentContext);
} else if (currentContext && currentProvider === "synthetic") {
if (lastSnapshot) {
renderFromSnapshot(currentContext);
} else {
setLoadingStatus(currentContext);
requestQuotas();
}
}
});

pi.on("session_start", async (_event, ctx) => {
currentContext = ctx;
currentProvider = ctx.model?.provider;
if (!enabled || ctx.model?.provider !== "synthetic") return;
// The provider extension fetches quotas on session_start and emits the
// result via synthetic:quotas:updated. Just show loading and wait.
if (lastSnapshot) {
renderFromSnapshot(ctx);
} else {
setLoadingStatus(ctx);
}
pi.on("session_start", (_event, ctx) => {
renderFromStoreOrRefresh(ctx);
});

pi.on("model_select", (_event, ctx) => {
currentContext = ctx;
currentProvider = ctx.model?.provider;
if (!enabled || ctx.model?.provider !== "synthetic") {
clearStatus(ctx);
return;
}
if (lastSnapshot) {
renderFromSnapshot(ctx);
} else {
setLoadingStatus(ctx);
requestQuotas();
}
renderFromStoreOrRefresh(ctx);
});

pi.on("agent_end", (_event, ctx) => {
renderFromStoreOrRefresh(ctx);
});

pi.on("turn_end", (_event, ctx) => {
renderFromStoreOrRefresh(ctx);
});

pi.on("session_before_switch", (_event, ctx) => {
currentContext = ctx;
currentProvider = ctx.model?.provider;
if (enabled && ctx.model?.provider === "synthetic") {
if (lastSnapshot) {
renderFromSnapshot(ctx);
} else {
setLoadingStatus(ctx);
}
} else {
clearStatus(ctx);
}
clearStatus(ctx);
});

pi.on("session_shutdown", (_event, ctx) => {
currentContext = undefined;
currentProvider = undefined;
clearStatus(ctx);
});

Expand Down
Loading