Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions examples/openclaw-plugin/context-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ export function createMemoryOpenVikingContextEngine(params: {
cfg: Required<MemoryOpenVikingConfig>;
logger: Logger;
getClient: () => Promise<OpenVikingClient>;
quickPrecheck?: () => Promise<{ ok: true } | { ok: false; reason: string }>;
/** Extra args help match hook-populated routing when OpenClaw provides sessionKey / OV session id. */
resolveAgentId: (sessionId: string, sessionKey?: string, ovSessionId?: string) => string;
rememberSessionAgentId?: (ctx: {
Expand All @@ -476,6 +477,7 @@ export function createMemoryOpenVikingContextEngine(params: {
cfg,
logger,
getClient,
quickPrecheck,
resolveAgentId,
rememberSessionAgentId,
} = params;
Expand Down Expand Up @@ -543,6 +545,30 @@ export function createMemoryOpenVikingContextEngine(params: {
return typeof agentId === "string" && agentId.trim() ? agentId.trim() : undefined;
}

async function runLocalPrecheck(
stage: "assemble" | "afterTurn",
sessionId: string,
extra: Record<string, unknown> = {},
): Promise<boolean> {
if (cfg.mode !== "local" || !quickPrecheck) {
return true;
}
const result = await quickPrecheck();
if (result.ok) {
return true;
}
warnOrInfo(
logger,
`openviking: ${stage} precheck failed for session=${sessionId}: ${result.reason}`,
);
diag(`${stage}_skip`, sessionId, {
reason: "precheck_failed",
precheckReason: result.reason,
...extra,
});
return false;
}

return {
info: {
id,
Expand Down Expand Up @@ -586,6 +612,11 @@ export function createMemoryOpenVikingContextEngine(params: {
});

try {
if (!(await runLocalPrecheck("assemble", OVSessionId, {
tokenBudget,
}))) {
return { messages, estimatedTokens: roughEstimate(messages) };
}
const client = await getClient();
const routingRef =
assembleParams.sessionId ?? sessionKey ?? OVSessionId;
Expand Down Expand Up @@ -767,6 +798,13 @@ export function createMemoryOpenVikingContextEngine(params: {
messages: newMsgFull,
});

if (!(await runLocalPrecheck("afterTurn", OVSessionId, {
totalMessages: messages.length,
newMessageCount: newCount,
prePromptMessageCount: start,
}))) {
return;
}
const client = await getClient();
const turnText = newTexts.join("\n");
const sanitized = turnText.replace(/<relevant-memories>[\s\S]*?<\/relevant-memories>/gi, " ").replace(/\s+/g, " ").trim();
Expand Down
14 changes: 11 additions & 3 deletions examples/openclaw-plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
} from "./memory-ranking.js";
import {
IS_WIN,
waitForHealth,
waitForHealthOrExit,
quickHealthCheck,
quickRecallPrecheck,
withTimeout,
Expand Down Expand Up @@ -343,6 +343,10 @@ const contextEnginePlugin = {
entry.resolve = resolve;
entry.reject = reject;
});
// Service startup can reject this shared promise before any hook/tool
// awaits it. Attach a sink now so expected local-startup failures do
// not surface as process-level unhandled rejections.
void entry.promise.catch(() => {});
clientPromise = entry.promise;
localClientPendingPromises.set(localCacheKey, entry);
}
Expand Down Expand Up @@ -990,6 +994,10 @@ const contextEnginePlugin = {
cfg,
logger: api.logger,
getClient,
quickPrecheck:
cfg.mode === "local"
? () => quickRecallPrecheck(cfg.mode, cfg.baseUrl, cfg.port, localProcess)
: undefined,
resolveAgentId,
rememberSessionAgentId,
});
Expand Down Expand Up @@ -1094,7 +1102,7 @@ const contextEnginePlugin = {
api.logger.warn(`openviking: subprocess exited (code=${code}, signal=${signal})${out}`);
});
try {
await waitForHealth(baseUrl, timeoutMs, intervalMs);
await waitForHealthOrExit(baseUrl, timeoutMs, intervalMs, child);
const client = new OpenVikingClient(
baseUrl,
cfg.apiKey,
Expand Down Expand Up @@ -1171,7 +1179,7 @@ const contextEnginePlugin = {
api.logger.warn(`openviking: re-spawned subprocess exited (code=${code}, signal=${signal})`);
});
try {
await waitForHealth(baseUrl, timeoutMs, intervalMs);
await waitForHealthOrExit(baseUrl, timeoutMs, intervalMs, child);
const client = new OpenVikingClient(baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs);
localClientCache.set(localCacheKey, { client, process: child });
if (resolveLocalClient) {
Expand Down
62 changes: 62 additions & 0 deletions examples/openclaw-plugin/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,68 @@ export function waitForHealth(baseUrl: string, timeoutMs: number, intervalMs: nu
});
}

export function waitForHealthOrExit(
baseUrl: string,
timeoutMs: number,
intervalMs: number,
child: ReturnType<typeof spawn>,
): Promise<void> {
const exited =
child.killed || child.exitCode !== null || child.signalCode !== null;
if (exited) {
return Promise.reject(
new Error(
`OpenViking subprocess exited before health check ` +
`(code=${child.exitCode}, signal=${child.signalCode})`,
),
);
}

return new Promise((resolve, reject) => {
let settled = false;

const cleanup = () => {
child.off?.("error", onError);
child.off?.("exit", onExit);
};

const finishResolve = () => {
if (settled) {
return;
}
settled = true;
cleanup();
resolve();
};

const finishReject = (err: unknown) => {
if (settled) {
return;
}
settled = true;
cleanup();
reject(err instanceof Error ? err : new Error(String(err)));
};

const onError = (err: Error) => {
finishReject(err);
};

const onExit = (code: number | null, signal: string | null) => {
finishReject(
new Error(
`OpenViking subprocess exited before health check ` +
`(code=${code}, signal=${signal})`,
),
);
};

child.once("error", onError);
child.once("exit", onExit);
waitForHealth(baseUrl, timeoutMs, intervalMs).then(finishResolve, finishReject);
});
}

export function withTimeout<T>(promise: Promise<T>, timeoutMs: number, timeoutMessage: string): Promise<T> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs);
Expand Down
32 changes: 32 additions & 0 deletions examples/openclaw-plugin/tests/ut/context-engine-afterTurn.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ function makeEngine(opts?: {
commitTokenThreshold?: number;
getSession?: Record<string, unknown>;
addSessionMessageError?: Error;
cfgOverrides?: Record<string, unknown>;
quickPrecheck?: () => Promise<{ ok: true } | { ok: false; reason: string }>;
}) {
const cfg = memoryOpenVikingConfigSchema.parse({
mode: "remote",
Expand All @@ -26,6 +28,7 @@ function makeEngine(opts?: {
ingestReplyAssist: false,
commitTokenThreshold: opts?.commitTokenThreshold ?? 20000,
emitStandardDiagnostics: true,
...(opts?.cfgOverrides ?? {}),
});
const logger = makeLogger();

Expand Down Expand Up @@ -63,6 +66,7 @@ function makeEngine(opts?: {
cfg,
logger,
getClient,
quickPrecheck: opts?.quickPrecheck,
resolveAgentId,
});

Expand Down Expand Up @@ -108,6 +112,34 @@ describe("context-engine afterTurn()", () => {
);
});

it("skips immediately when local precheck reports OpenViking unavailable", async () => {
const quickPrecheck = vi.fn().mockResolvedValue({
ok: false as const,
reason: "local process is not running",
});
const { engine, client, getClient, logger } = makeEngine({
cfgOverrides: {
mode: "local",
port: 1933,
},
quickPrecheck,
});

await engine.afterTurn!({
sessionId: "s1",
sessionFile: "",
messages: [{ role: "user", content: "hello" }],
prePromptMessageCount: 0,
});

expect(quickPrecheck).toHaveBeenCalledTimes(1);
expect(getClient).not.toHaveBeenCalled();
expect(client.addSessionMessage).not.toHaveBeenCalled();
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining("afterTurn precheck failed"),
);
});

it("skips when no new user/assistant messages after prePromptMessageCount", async () => {
const { engine, client, logger } = makeEngine();

Expand Down
58 changes: 56 additions & 2 deletions examples/openclaw-plugin/tests/ut/context-engine-assemble.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,34 @@ function makeStats() {
};
}

function makeEngine(contextResult: unknown) {
function makeEngine(
contextResult: unknown,
opts?: {
cfgOverrides?: Record<string, unknown>;
quickPrecheck?: () => Promise<{ ok: true } | { ok: false; reason: string }>;
},
) {
const logger = makeLogger();
const client = {
getSessionContext: vi.fn().mockResolvedValue(contextResult),
} as unknown as OpenVikingClient;
const getClient = vi.fn().mockResolvedValue(client);
const resolveAgentId = vi.fn((sessionId: string) => `agent:${sessionId}`);
const localCfg = opts?.cfgOverrides
? memoryOpenVikingConfigSchema.parse({
...cfg,
...opts.cfgOverrides,
})
: cfg;

const engine = createMemoryOpenVikingContextEngine({
id: "openviking",
name: "Context Engine (OpenViking)",
version: "test",
cfg,
cfg: localCfg,
logger,
getClient,
quickPrecheck: opts?.quickPrecheck,
resolveAgentId,
});

Expand Down Expand Up @@ -142,6 +155,47 @@ describe("context-engine assemble()", () => {
});
});

it("falls back immediately when local precheck reports OpenViking unavailable", async () => {
const quickPrecheck = vi.fn().mockResolvedValue({
ok: false as const,
reason: "local process is not running",
});
const { engine, client, getClient, logger } = makeEngine(
{
latest_archive_overview: "unused",
pre_archive_abstracts: [],
messages: [],
estimatedTokens: 123,
stats: makeStats(),
},
{
cfgOverrides: {
mode: "local",
port: 1933,
},
quickPrecheck,
},
);

const liveMessages = [{ role: "user", content: "fallback live message" }];
const result = await engine.assemble({
sessionId: "session-local",
messages: liveMessages,
tokenBudget: 4096,
});

expect(quickPrecheck).toHaveBeenCalledTimes(1);
expect(getClient).not.toHaveBeenCalled();
expect(client.getSessionContext).not.toHaveBeenCalled();
expect(result).toEqual({
messages: liveMessages,
estimatedTokens: roughEstimate(liveMessages),
});
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining("assemble precheck failed"),
);
});

it("emits a non-error toolResult for a running tool (not a synthetic error)", async () => {
const { engine } = makeEngine({
latest_archive_overview: "",
Expand Down
Loading
Loading