Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 197 additions & 151 deletions packages/opencode/src/acp/agent.ts

Large diffs are not rendered by default.

45 changes: 27 additions & 18 deletions packages/opencode/src/acp/session.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { RequestError, type McpServer } from "@agentclientprotocol/sdk"
import type { ACPSessionState } from "./types"
import { Log } from "@/util/log"
import { withTimeout } from "@/util/timeout"

const SDK_TIMEOUT_MS = 60_000
import type { OpencodeClient } from "@opencode-ai/sdk/v2"

const log = Log.create({ service: "acp-session-manager" })
Expand All @@ -18,15 +21,18 @@ export class ACPSessionManager {
}

async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise<ACPSessionState> {
const session = await this.sdk.session
.create(
{
title: `ACP Session ${crypto.randomUUID()}`,
directory: cwd,
},
{ throwOnError: true },
)
.then((x) => x.data!)
const session = await withTimeout(
this.sdk.session
.create(
{
title: `ACP Session ${crypto.randomUUID()}`,
directory: cwd,
},
{ throwOnError: true },
)
.then((x) => x.data!),
SDK_TIMEOUT_MS
)

const sessionId = session.id
const resolvedModel = model
Expand All @@ -50,15 +56,18 @@ export class ACPSessionManager {
mcpServers: McpServer[],
model?: ACPSessionState["model"],
): Promise<ACPSessionState> {
const session = await this.sdk.session
.get(
{
sessionID: sessionId,
directory: cwd,
},
{ throwOnError: true },
)
.then((x) => x.data!)
const session = await withTimeout(
this.sdk.session
.get(
{
sessionID: sessionId,
directory: cwd,
},
{ throwOnError: true },
)
.then((x) => x.data!),
SDK_TIMEOUT_MS
)

const resolvedModel = model

Expand Down
35 changes: 29 additions & 6 deletions packages/opencode/src/cli/cmd/acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,20 @@ export const AcpCommand = cmd({
})
const output = new ReadableStream<Uint8Array>({
start(controller) {
process.stdin.on("data", (chunk: Buffer) => {
const onData = (chunk: Buffer) => {
controller.enqueue(new Uint8Array(chunk))
})
process.stdin.on("end", () => controller.close())
process.stdin.on("error", (err) => controller.error(err))
}
const onEnd = () => controller.close()
const onError = (err: Error) => controller.error(err)

process.stdin.on("data", onData)
process.stdin.on("end", onEnd)
process.stdin.on("error", onError)

// Store references for cleanup
;(controller as any)._onData = onData
;(controller as any)._onEnd = onEnd
;(controller as any)._onError = onError
},
})

Expand All @@ -61,8 +70,22 @@ export const AcpCommand = cmd({
log.info("setup connection")
process.stdin.resume()
await new Promise((resolve, reject) => {
process.stdin.on("end", resolve)
process.stdin.on("error", reject)
const onEnd = () => {
cleanup()
resolve(undefined)
}
const onError = (err: Error) => {
cleanup()
reject(err)
}

const cleanup = () => {
process.stdin.removeListener("end", onEnd)
process.stdin.removeListener("error", onError)
}

process.stdin.once("end", onEnd)
process.stdin.once("error", onError)
})
})
},
Expand Down
35 changes: 19 additions & 16 deletions packages/opencode/src/cli/cmd/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import { createOpencodeClient, type OpencodeClient } from "@opencode-ai/sdk/v2"
import { Server } from "../../server/server"
import { Provider } from "../../provider/provider"
import { Agent } from "../../agent/agent"
import { withTimeout } from "../../util/timeout"

const SDK_TIMEOUT_MS = 60_000

const TOOL: Record<string, [string, string]> = {
todowrite: ["Todo", UI.Style.TEXT_WARNING_BOLD],
Expand Down Expand Up @@ -151,7 +154,7 @@ export const RunCommand = cmd({
return false
}

const events = await sdk.event.subscribe()
const events = await withTimeout(sdk.event.subscribe(), SDK_TIMEOUT_MS)
let errorMsg: string | undefined

const eventProcessor = (async () => {
Expand Down Expand Up @@ -219,11 +222,11 @@ export const RunCommand = cmd({
initialValue: "once",
}).catch(() => "reject")
const response = (result.toString().includes("cancel") ? "reject" : result) as "once" | "always" | "reject"
await sdk.permission.respond({
await withTimeout(sdk.permission.respond({
sessionID,
permissionID: permission.id,
response,
})
}), SDK_TIMEOUT_MS)
}
}
})()
Expand Down Expand Up @@ -252,23 +255,23 @@ export const RunCommand = cmd({
})()

if (args.command) {
await sdk.session.command({
await withTimeout(sdk.session.command({
sessionID,
agent: resolvedAgent,
model: args.model,
command: args.command,
arguments: message,
variant: args.variant,
})
}), SDK_TIMEOUT_MS)
} else {
const modelParam = args.model ? Provider.parseModel(args.model) : undefined
await sdk.session.prompt({
await withTimeout(sdk.session.prompt({
sessionID,
agent: resolvedAgent,
model: modelParam,
variant: args.variant,
parts: [...fileParts, { type: "text", text: message }],
})
}), SDK_TIMEOUT_MS)
}

await eventProcessor
Expand All @@ -280,7 +283,7 @@ export const RunCommand = cmd({

const sessionID = await (async () => {
if (args.continue) {
const result = await sdk.session.list()
const result = await withTimeout(sdk.session.list(), SDK_TIMEOUT_MS)
return result.data?.find((s) => !s.parentID)?.id
}
if (args.session) return args.session
Expand All @@ -292,7 +295,7 @@ export const RunCommand = cmd({
: args.title
: undefined

const result = await sdk.session.create(
const result = await withTimeout(sdk.session.create(
title
? {
title,
Expand All @@ -313,7 +316,7 @@ export const RunCommand = cmd({
},
],
},
)
), SDK_TIMEOUT_MS)
return result.data?.id
})()

Expand All @@ -322,9 +325,9 @@ export const RunCommand = cmd({
process.exit(1)
}

const cfgResult = await sdk.config.get()
const cfgResult = await withTimeout(sdk.config.get(), SDK_TIMEOUT_MS)
if (cfgResult.data && (cfgResult.data.share === "auto" || Flag.OPENCODE_AUTO_SHARE || args.share)) {
const shareResult = await sdk.session.share({ sessionID }).catch((error) => {
const shareResult = await withTimeout(sdk.session.share({ sessionID }), SDK_TIMEOUT_MS).catch((error) => {
if (error instanceof Error && error.message.includes("disabled")) {
UI.println(UI.Style.TEXT_DANGER_BOLD + "! " + error.message)
}
Expand Down Expand Up @@ -355,7 +358,7 @@ export const RunCommand = cmd({

const sessionID = await (async () => {
if (args.continue) {
const result = await sdk.session.list()
const result = await withTimeout(sdk.session.list(), SDK_TIMEOUT_MS)
return result.data?.find((s) => !s.parentID)?.id
}
if (args.session) return args.session
Expand All @@ -367,7 +370,7 @@ export const RunCommand = cmd({
: args.title
: undefined

const result = await sdk.session.create(title ? { title } : {})
const result = await withTimeout(sdk.session.create(title ? { title } : {}), SDK_TIMEOUT_MS)
return result.data?.id
})()

Expand All @@ -376,9 +379,9 @@ export const RunCommand = cmd({
process.exit(1)
}

const cfgResult = await sdk.config.get()
const cfgResult = await withTimeout(sdk.config.get(), SDK_TIMEOUT_MS)
if (cfgResult.data && (cfgResult.data.share === "auto" || Flag.OPENCODE_AUTO_SHARE || args.share)) {
const shareResult = await sdk.session.share({ sessionID }).catch((error) => {
const shareResult = await withTimeout(sdk.session.share({ sessionID }), SDK_TIMEOUT_MS).catch((error) => {
if (error instanceof Error && error.message.includes("disabled")) {
UI.println(UI.Style.TEXT_DANGER_BOLD + "! " + error.message)
}
Expand Down
27 changes: 25 additions & 2 deletions packages/opencode/src/cli/cmd/tui/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,24 @@ export const TuiThreadCommand = cmd({
Object.entries(process.env).filter((entry): entry is [string, string] => entry[1] !== undefined),
),
})

const client = Rpc.client<typeof rpc>(worker)

const handleWorkerCrash = (reason: string) => {
Log.Default.error(`Worker crashed: ${reason}`)
client.rejectAll(new Error(`Worker crashed: ${reason}`))
process.exit(1)
}

worker.onerror = (e) => {
Log.Default.error(e)
handleWorkerCrash(e.message || "unknown error")
}
const client = Rpc.client<typeof rpc>(worker)

;(worker as any).on?.("exit", (code: number) => {
if (code !== 0) {
handleWorkerCrash(`exit code ${code}`)
}
})
process.on("uncaughtException", (e) => {
Log.Default.error(e)
})
Expand All @@ -109,6 +123,15 @@ export const TuiThreadCommand = cmd({
await client.call("reload", undefined)
})

// Handle graceful shutdown on SIGINT (Ctrl+C) and SIGTERM (kill)
const handleShutdown = async () => {
await client.call("shutdown", undefined)
process.exit(0)
}

process.on("SIGINT", handleShutdown)
process.on("SIGTERM", handleShutdown)

const prompt = await iife(async () => {
const piped = !process.stdin.isTTY ? await Bun.stdin.text() : undefined
if (!args.prompt) return piped
Expand Down
45 changes: 37 additions & 8 deletions packages/opencode/src/cli/cmd/tui/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ process.on("uncaughtException", (e) => {
})

// Subscribe to global events and forward them via RPC
GlobalBus.on("event", (event) => {
const globalBusHandler = (event: any) => {
Rpc.emit("global.event", event)
})
}
GlobalBus.on("event", globalBusHandler)

let server: Bun.Server<BunWebSocketData> | undefined

Expand Down Expand Up @@ -64,6 +65,10 @@ const startEventStream = (directory: string) => {
})

;(async () => {
let consecutiveErrors = 0
const MAX_CONSECUTIVE_ERRORS = 10
const ERROR_BACKOFF_MS = 1000

while (!signal.aborted) {
const events = await Promise.resolve(
sdk.event.subscribe(
Expand All @@ -72,23 +77,45 @@ const startEventStream = (directory: string) => {
signal,
},
),
).catch(() => undefined)
).catch((error) => {
consecutiveErrors++
if (consecutiveErrors <= 3 || consecutiveErrors % 10 === 0) {
Log.Default.warn("event subscription failed", {
error: error instanceof Error ? error.message : error,
attempt: consecutiveErrors,
})
}
return undefined
})

if (!events) {
await Bun.sleep(250)
if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
Log.Default.error("event stream: too many consecutive errors, backing off")
await Bun.sleep(ERROR_BACKOFF_MS * Math.min(consecutiveErrors, 30))
} else {
await Bun.sleep(250)
}
continue
}

for await (const event of events.stream) {
Rpc.emit("event", event as Event)

consecutiveErrors = 0

try {
for await (const event of events.stream) {
Rpc.emit("event", event as Event)
}
} catch (streamError) {
Log.Default.warn("event stream iteration error", {
error: streamError instanceof Error ? streamError.message : streamError,
})
}

if (!signal.aborted) {
await Bun.sleep(250)
}
}
})().catch((error) => {
Log.Default.error("event stream error", {
Log.Default.error("event stream fatal error", {
error: error instanceof Error ? error.message : error,
})
})
Expand Down Expand Up @@ -137,6 +164,8 @@ export const rpc = {
async shutdown() {
Log.Default.info("worker shutting down")
if (eventStream.abort) eventStream.abort.abort()
GlobalBus.off("event", globalBusHandler)
Log.Default.debug("GlobalBus listener unsubscribed")
await Instance.disposeAll()
if (server) server.stop(true)
},
Expand Down
Loading