Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions apps/server/src/provider/Layers/AmpAdapter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import assert from "node:assert/strict";

import {
ApprovalRequestId,
EventId,
RuntimeItemId,
ThreadId,
TurnId,
type ProviderApprovalDecision,
type ProviderRuntimeEvent,
type ProviderSession,
type ProviderTurnStartResult,
type ProviderUserInputAnswers,
} from "@t3tools/contracts";
import { it, vi } from "@effect/vitest";
import { Effect, Layer, Stream } from "effect";

import { AmpServerManager } from "../../ampServerManager.ts";
import { AmpAdapter } from "../Services/AmpAdapter.ts";
import { makeAmpAdapterLive } from "./AmpAdapter.ts";
import { ServerSettingsService } from "../../serverSettings.ts";

const asThreadId = (value: string): ThreadId => ThreadId.makeUnsafe(value);
const asTurnId = (value: string): TurnId => TurnId.makeUnsafe(value);
const asEventId = (value: string): EventId => EventId.makeUnsafe(value);
const asItemId = (value: string): RuntimeItemId => RuntimeItemId.makeUnsafe(value);

class FakeAmpManager extends AmpServerManager {
public startSessionImpl = vi.fn(async (threadId: ThreadId): Promise<ProviderSession> => {
const now = new Date().toISOString();
return {
provider: "amp",
status: "ready",
runtimeMode: "full-access",
threadId,
cwd: process.cwd(),
createdAt: now,
updatedAt: now,
resumeCursor: { sessionId: `session-${threadId}` },
} as unknown as ProviderSession;
});

public sendTurnImpl = vi.fn(
async (threadId: ThreadId): Promise<ProviderTurnStartResult> => ({
threadId,
turnId: asTurnId(`turn-${threadId}`),
}),
);

public interruptTurnImpl = vi.fn(async (): Promise<void> => undefined);
public respondToRequestImpl = vi.fn(async (): Promise<void> => undefined);
public respondToUserInputImpl = vi.fn(async (): Promise<void> => undefined);
public readThreadImpl = vi.fn(async (threadId: ThreadId) => ({ threadId, turns: [] }));
public rollbackThreadImpl = vi.fn(async (threadId: ThreadId) => ({ threadId, turns: [] }));
public stopAllImpl = vi.fn(() => undefined);

override startSession(input: { threadId: ThreadId }): Promise<ProviderSession> {
return this.startSessionImpl(input.threadId);
}

override sendTurn(input: { threadId: ThreadId }): Promise<ProviderTurnStartResult> {
return this.sendTurnImpl(input.threadId);
}

override interruptTurn(_threadId: ThreadId): Promise<void> {
return this.interruptTurnImpl();
}

override respondToRequest(
_threadId: ThreadId,
_requestId: ApprovalRequestId,
_decision: ProviderApprovalDecision,
): Promise<void> {
return this.respondToRequestImpl();
}

override respondToUserInput(
_threadId: ThreadId,
_requestId: ApprovalRequestId,
_answers: ProviderUserInputAnswers,
): Promise<void> {
return this.respondToUserInputImpl();
}

override readThread(threadId: ThreadId) {
return this.readThreadImpl(threadId);
}

override rollbackThread(threadId: ThreadId) {
return this.rollbackThreadImpl(threadId);
}

override stopSession(_threadId: ThreadId): void {}

override listSessions(): ProviderSession[] {
return [];
}

override hasSession(_threadId: ThreadId): boolean {
return false;
}

override stopAll(): void {
this.stopAllImpl();
}
}

const manager = new FakeAmpManager();
const layer = it.layer(
makeAmpAdapterLive({ manager }).pipe(Layer.provideMerge(ServerSettingsService.layerTest())),
);

layer("AmpAdapterLive", (it) => {
it.effect("delegates session startup to the manager", () =>
Effect.gen(function* () {
manager.startSessionImpl.mockClear();
const adapter = yield* AmpAdapter;

const session = yield* adapter.startSession({
threadId: asThreadId("thread-1"),
runtimeMode: "full-access",
});

assert.equal(session.provider, "amp");
assert.equal(manager.startSessionImpl.mock.calls[0]?.[0], asThreadId("thread-1"));
}),
);

it.effect("rejects attachments until AMP attachment wiring exists", () =>
Effect.gen(function* () {
const adapter = yield* AmpAdapter;
const result = yield* adapter
.sendTurn({
threadId: asThreadId("thread-attachments"),
input: "hello",
attachments: [{ id: "attachment-1" }] as never,
})
.pipe(Effect.result);

assert.equal(result._tag, "Failure");
if (result._tag !== "Failure") {
return;
}
assert.equal(result.failure._tag, "ProviderAdapterValidationError");
}),
);

it.effect("forwards manager runtime events through the adapter stream", () =>
Effect.gen(function* () {
const adapter = yield* AmpAdapter;

const event = {
type: "content.delta",
eventId: asEventId("evt-amp-delta"),
provider: "amp",
createdAt: new Date().toISOString(),
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-1"),
itemId: asItemId("item-1"),
payload: {
streamKind: "assistant_text",
delta: "hello",
},
} as unknown as ProviderRuntimeEvent;

// Emit first — the event is buffered in the unbounded queue via the
// listener that was registered during layer construction.
manager.emit("event", event);

// Now consume the head. Since the queue already has an item, this
// resolves immediately without a race condition.
const received = yield* Stream.runHead(adapter.streamEvents);

assert.equal(received._tag, "Some");
if (received._tag !== "Some") {
return;
}
assert.equal(received.value.type, "content.delta");
if (received.value.type !== "content.delta") {
return;
}
assert.equal(received.value.payload.delta, "hello");
}),
);
});
80 changes: 31 additions & 49 deletions apps/server/src/provider/Layers/AmpAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,27 @@ import { type ProviderRuntimeEvent } from "@t3tools/contracts";
import { Effect, Layer, Queue, Stream } from "effect";

import { AmpServerManager } from "../../ampServerManager.ts";
import {
ProviderAdapterRequestError,
ProviderAdapterSessionClosedError,
ProviderAdapterSessionNotFoundError,
ProviderAdapterValidationError,
type ProviderAdapterError,
} from "../Errors.ts";
import { ProviderAdapterProcessError, ProviderAdapterValidationError } from "../Errors.ts";
import { getProviderCapabilities } from "../Services/ProviderAdapter.ts";
import { AmpAdapter, type AmpAdapterShape } from "../Services/AmpAdapter.ts";
import { makeErrorHelpers } from "./ProviderAdapterUtils.ts";
import { ServerSettingsService } from "../../serverSettings.ts";

const PROVIDER = "amp" as const;
const { toRequestError } = makeErrorHelpers(PROVIDER);

export interface AmpAdapterLiveOptions {
readonly manager?: AmpServerManager;
readonly makeManager?: () => AmpServerManager;
}

function toMessage(cause: unknown, fallback: string): string {
if (cause instanceof Error && cause.message.length > 0) {
return cause.message;
}
return fallback;
}

function toSessionError(threadId: string, cause: unknown) {
const normalized = toMessage(cause, "").toLowerCase();
if (normalized.includes("unknown amp session") || normalized.includes("unknown session")) {
return new ProviderAdapterSessionNotFoundError({
provider: PROVIDER,
threadId,
cause,
});
}
if (normalized.includes("closed")) {
return new ProviderAdapterSessionClosedError({
provider: PROVIDER,
threadId,
cause,
});
}
return undefined;
}

function toRequestError(threadId: string, method: string, cause: unknown): ProviderAdapterError {
const sessionError = toSessionError(threadId, cause);
if (sessionError) {
return sessionError;
}
return new ProviderAdapterRequestError({
provider: PROVIDER,
method,
detail: toMessage(cause, `${method} failed`),
cause,
});
}

export function makeAmpAdapterLive(options: AmpAdapterLiveOptions = {}) {
return Layer.effect(
AmpAdapter,
Effect.gen(function* () {
const manager = options.manager ?? options.makeManager?.() ?? new AmpServerManager();
const runtimeEventQueue = yield* Queue.unbounded<ProviderRuntimeEvent>();
const serverSettingsService = yield* ServerSettingsService;

yield* Effect.acquireRelease(
Effect.sync(() => {
Expand All @@ -85,9 +44,32 @@ export function makeAmpAdapterLive(options: AmpAdapterLiveOptions = {}) {
provider: PROVIDER,
capabilities: getProviderCapabilities(PROVIDER),
startSession: (input) =>
Effect.tryPromise({
try: () => manager.startSession(input),
catch: (cause) => toRequestError(input.threadId, "session/start", cause),
Effect.gen(function* () {
const providerSettings = yield* serverSettingsService.getSettings.pipe(
Effect.map((s) => s.providers.amp),
Effect.mapError(
(error) =>
new ProviderAdapterProcessError({
provider: PROVIDER,
threadId: input.threadId,
detail: error.message,
cause: error,
}),
),
);
if (!providerSettings.enabled) {
return yield* new ProviderAdapterValidationError({
provider: PROVIDER,
operation: "startSession",
issue: "AMP provider is disabled in server settings.",
});
}
const _binaryPath = providerSettings.binaryPath.trim() || "amp";
void _binaryPath;
return yield* Effect.tryPromise({
try: () => manager.startSession(input),
catch: (cause) => toRequestError(input.threadId, "session/start", cause),
});
Comment on lines +47 to +71
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Configured AMP path is read and then dropped.

Line 67 computes the AMP binary from server settings, but Line 70 still starts the session with the original input. That makes providers.amp.binaryPath ineffective, so users who configure a non-default AMP executable will still launch the default path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/provider/Layers/AmpAdapter.ts` around lines 47 - 72, The AMP
binary path from server settings is read into _binaryPath but never used, so
providerSettings.binaryPath is ignored; update the call to manager.startSession
inside the Effect.tryPromise to pass the resolved binary path (use
providerSettings.binaryPath or the trimmed _binaryPath) into the session start
(e.g., include it on the input object or as the explicit parameter expected by
manager.startSession) so manager.startSession receives the configured binary;
locate the _binaryPath/ providerSettings usage and the manager.startSession(...)
call and wire the path through, keeping the existing error handling
(toRequestError) and ProviderAdapter errors intact.

}),
sendTurn: (input) => {
if ((input.attachments?.length ?? 0) > 0) {
Expand Down
69 changes: 12 additions & 57 deletions apps/server/src/provider/Layers/CodexAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ import { Effect, FileSystem, Layer, Queue, Schema, ServiceMap, Stream } from "ef
import {
ProviderAdapterProcessError,
ProviderAdapterRequestError,
ProviderAdapterSessionClosedError,
ProviderAdapterSessionNotFoundError,
ProviderAdapterValidationError,
type ProviderAdapterError,
} from "../Errors.ts";
import { getProviderCapabilities } from "../Services/ProviderAdapter.ts";
import { CodexAdapter, type CodexAdapterShape } from "../Services/CodexAdapter.ts";
Expand All @@ -41,7 +38,14 @@ import { resolveAttachmentPath } from "../../attachmentStore.ts";
import { ServerConfig } from "../../config.ts";
import { ServerSettingsService } from "../../serverSettings.ts";
import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts";
import { toMessage } from "../toMessage.ts";
import {
asArray,
asNumber,
asObject,
asString,
makeErrorHelpers,
toMessage,
} from "./ProviderAdapterUtils.ts";

const PROVIDER = "codex" as const;

Expand All @@ -55,59 +59,10 @@ export interface CodexAdapterLiveOptions {
readonly nativeEventLogger?: EventNdjsonLogger;
}

function toSessionError(
threadId: ThreadId,
cause: unknown,
): ProviderAdapterSessionNotFoundError | ProviderAdapterSessionClosedError | undefined {
const normalized = toMessage(cause, "").toLowerCase();
if (normalized.includes("unknown session") || normalized.includes("unknown provider session")) {
return new ProviderAdapterSessionNotFoundError({
provider: PROVIDER,
threadId,
cause,
});
}
if (normalized.includes("session is closed")) {
return new ProviderAdapterSessionClosedError({
provider: PROVIDER,
threadId,
cause,
});
}
return undefined;
}

function toRequestError(threadId: ThreadId, method: string, cause: unknown): ProviderAdapterError {
const sessionError = toSessionError(threadId, cause);
if (sessionError) {
return sessionError;
}
return new ProviderAdapterRequestError({
provider: PROVIDER,
method,
detail: toMessage(cause, `${method} failed`),
cause,
});
}

function asObject(value: unknown): Record<string, unknown> | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
return value as Record<string, unknown>;
}

function asString(value: unknown): string | undefined {
return typeof value === "string" ? value : undefined;
}

function asArray(value: unknown): unknown[] | undefined {
return Array.isArray(value) ? value : undefined;
}

function asNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
const { toRequestError } = makeErrorHelpers(PROVIDER, {
sessionNotFoundHints: ["unknown session", "unknown provider session"],
sessionClosedHint: "session is closed",
});

function normalizeCodexTokenUsage(value: unknown): ThreadTokenUsageSnapshot | undefined {
const usage = asObject(value);
Expand Down
Loading
Loading