Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 22 additions & 75 deletions internal/cli/cmd_agent_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,19 @@ func cmdAgentJobStatus(w, wErr io.Writer, gf GlobalFlags, args []string, version
return returnUsageError(w, wErr, gf, usage, version, nil)
}

ctx := &cmdCtx{w: w, wErr: wErr, gf: gf, version: version, cmd: "agent.job.status"}

store, err := newSendJobStore()
if err != nil {
if gf.JSON {
ReturnError(w, "job_store_failed", err.Error(), nil, version)
} else {
Errorf(wErr, "failed to initialize send job store: %v", err)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_store_failed", err.Error(), nil)
}

job, ok, err := store.get(jobID)
if err != nil {
if gf.JSON {
ReturnError(w, "job_status_failed", err.Error(), map[string]any{"job_id": jobID}, version)
} else {
Errorf(wErr, "failed to read job %s: %v", jobID, err)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_status_failed", err.Error(), map[string]any{"job_id": jobID})
}
if !ok {
if gf.JSON {
ReturnError(w, "not_found", "job not found", map[string]any{"job_id": jobID}, version)
} else {
Errorf(wErr, "job %s not found", jobID)
}
return ExitNotFound
return ctx.errResult(ExitNotFound, "not_found", "job not found", map[string]any{"job_id": jobID})
}

writeJobStatusResult(w, gf, version, job)
Expand All @@ -89,44 +76,24 @@ func cmdAgentJobCancel(w, wErr io.Writer, gf GlobalFlags, args []string, version
if jobID == "" {
return returnUsageError(w, wErr, gf, usage, version, nil)
}
if handled, code := maybeReplayIdempotentResponse(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey,
); handled {

ctx := &cmdCtx{w: w, wErr: wErr, gf: gf, version: version, cmd: "agent.job.cancel", idemKey: *idempotencyKey}

if handled, code := ctx.maybeReplay(); handled {
return code
}

store, err := newSendJobStore()
if err != nil {
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey,
ExitInternalError, "job_store_failed", err.Error(), nil,
)
}
Errorf(wErr, "failed to initialize send job store: %v", err)
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_store_failed", err.Error(), nil)
}

job, ok, canceled, err := store.cancel(jobID)
if err != nil {
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey,
ExitInternalError, "job_cancel_failed", err.Error(), map[string]any{"job_id": jobID},
)
}
Errorf(wErr, "failed to cancel job %s: %v", jobID, err)
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_cancel_failed", err.Error(), map[string]any{"job_id": jobID})
}
if !ok {
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey,
ExitNotFound, "not_found", "job not found", map[string]any{"job_id": jobID},
)
}
Errorf(wErr, "job %s not found", jobID)
return ExitNotFound
return ctx.errResult(ExitNotFound, "not_found", "job not found", map[string]any{"job_id": jobID})
}

result := agentJobCancelResult{
Expand All @@ -135,9 +102,7 @@ func cmdAgentJobCancel(w, wErr io.Writer, gf GlobalFlags, args []string, version
Canceled: canceled,
}
if gf.JSON {
return returnJSONSuccessWithIdempotency(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey, result,
)
return ctx.successResult(result)
}

PrintHuman(w, func(w io.Writer) {
Expand Down Expand Up @@ -166,34 +131,21 @@ func cmdAgentJobWait(w, wErr io.Writer, gf GlobalFlags, args []string, version s
return returnUsageError(w, wErr, gf, usage, version, nil)
}

ctx := &cmdCtx{w: w, wErr: wErr, gf: gf, version: version, cmd: "agent.job.wait"}

store, err := newSendJobStore()
if err != nil {
if gf.JSON {
ReturnError(w, "job_store_failed", err.Error(), nil, version)
} else {
Errorf(wErr, "failed to initialize send job store: %v", err)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_store_failed", err.Error(), nil)
}

deadline := time.Now().Add(*timeout)
for {
job, ok, getErr := store.get(jobID)
if getErr != nil {
if gf.JSON {
ReturnError(w, "job_status_failed", getErr.Error(), map[string]any{"job_id": jobID}, version)
} else {
Errorf(wErr, "failed to read job %s: %v", jobID, getErr)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_status_failed", getErr.Error(), map[string]any{"job_id": jobID})
}
if !ok {
if gf.JSON {
ReturnError(w, "not_found", "job not found", map[string]any{"job_id": jobID}, version)
} else {
Errorf(wErr, "job %s not found", jobID)
}
return ExitNotFound
return ctx.errResult(ExitNotFound, "not_found", "job not found", map[string]any{"job_id": jobID})
}
if isTerminalSendJobStatus(job.Status) {
writeJobStatusResult(w, gf, version, job)
Expand All @@ -204,15 +156,10 @@ func cmdAgentJobWait(w, wErr io.Writer, gf GlobalFlags, args []string, version s
}

if time.Now().After(deadline) {
if gf.JSON {
ReturnError(w, "timeout", "timed out waiting for job completion", map[string]any{
"job_id": job.ID,
"status": string(job.Status),
}, version)
} else {
Errorf(wErr, "timed out waiting for job %s completion (status: %s)", job.ID, job.Status)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "timeout", "timed out waiting for job completion", map[string]any{
"job_id": job.ID,
"status": string(job.Status),
})
}
time.Sleep(*interval)
}
Expand Down
65 changes: 17 additions & 48 deletions internal/cli/cmd_agent_run_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cli

import (
"fmt"
"io"
"log/slog"
"strings"
"time"
Expand All @@ -11,26 +10,18 @@ import (
)

func verifyStartedAgentSession(
w, wErr io.Writer,
gf GlobalFlags,
version, idempotencyKey, sessionName string,
ctx *cmdCtx,
sessionName string,
tmuxOpts tmux.Options,
) int {
state, err := tmuxSessionStateFor(sessionName, tmuxOpts)
if err != nil {
if killErr := tmuxKillSession(sessionName, tmuxOpts); killErr != nil {
slog.Debug("best-effort session kill failed", "session", sessionName, "error", killErr)
}
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.run", idempotencyKey,
ExitInternalError, "session_lookup_failed", err.Error(), map[string]any{
"session_name": sessionName,
},
)
}
Errorf(wErr, "failed to verify session %s: %v", sessionName, err)
return ExitInternalError
return ctx.errResult(ExitInternalError, "session_lookup_failed", err.Error(), map[string]any{
"session_name": sessionName,
})
}
if state.Exists && state.HasLivePane {
return ExitOK
Expand All @@ -40,16 +31,9 @@ func verifyStartedAgentSession(
slog.Debug("best-effort session kill failed", "session", sessionName, "error", err)
}
msg := fmt.Sprintf("assistant session %s exited before startup completed", sessionName)
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.run", idempotencyKey,
ExitInternalError, "session_exited", msg, map[string]any{
"session_name": sessionName,
},
)
}
Errorf(wErr, msg)
return ExitInternalError
return ctx.errResult(ExitInternalError, "session_exited", msg, map[string]any{
"session_name": sessionName,
})
}

var (
Expand All @@ -73,19 +57,15 @@ var (
)

func sendAgentRunPromptIfRequested(
w, wErr io.Writer,
gf GlobalFlags,
version, idempotencyKey, sessionName, assistantName, prompt string,
ctx *cmdCtx,
sessionName, assistantName, prompt string,
tmuxOpts tmux.Options,
beforeSend func(),
) int {
if prompt == "" {
return ExitOK
}

// Wait for the agent TUI to render before sending. Agents like Codex can
// take several seconds to initialize; a fixed short sleep causes the Enter
// keystroke to arrive before the input handler is ready.
waitForPaneOutput(sessionName, assistantName, tmuxOpts)
if beforeSend != nil {
beforeSend()
Expand All @@ -95,42 +75,31 @@ func sendAgentRunPromptIfRequested(
preSendHash := tmux.ContentHash(preSendContent)

if err := tmuxSendKeys(sessionName, prompt, true, tmuxOpts); err != nil {
return handlePromptSendError(w, wErr, gf, version, idempotencyKey, sessionName, tmuxOpts, err, "send")
return handlePromptSendError(ctx, sessionName, tmuxOpts, err)
}

// Codex startup can still occasionally drop the very first prompt even when
// a cursor is visible. If pane output does not change after send, retry once.
if strings.EqualFold(strings.TrimSpace(assistantName), "codex") &&
!waitForPromptDelivery(sessionName, preSendHash, tmuxOpts) {
waitForPaneOutput(sessionName, assistantName, tmuxOpts)
if err := tmuxSendKeys(sessionName, prompt, true, tmuxOpts); err != nil {
return handlePromptSendError(w, wErr, gf, version, idempotencyKey, sessionName, tmuxOpts, err, "retry")
return handlePromptSendError(ctx, sessionName, tmuxOpts, err)
}
}
return ExitOK
}

func handlePromptSendError(
w, wErr io.Writer,
gf GlobalFlags,
version, idempotencyKey, sessionName string,
ctx *cmdCtx,
sessionName string,
tmuxOpts tmux.Options,
err error,
action string,
) int {
if killErr := tmuxKillSession(sessionName, tmuxOpts); killErr != nil {
slog.Debug("best-effort session kill failed", "session", sessionName, "error", killErr)
}
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.run", idempotencyKey,
ExitInternalError, "prompt_send_failed", err.Error(), map[string]any{
"session_name": sessionName,
},
)
}
Errorf(wErr, "failed to %s initial prompt to %s: %v", action, sessionName, err)
return ExitInternalError
return ctx.errResult(ExitInternalError, "prompt_send_failed", err.Error(), map[string]any{
"session_name": sessionName,
})
}

// waitForPaneOutput polls the tmux pane until the output stabilizes (stops
Expand Down
15 changes: 3 additions & 12 deletions internal/cli/cmd_agent_run_prompt_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,7 @@ func TestSendAgentRunPromptIfRequested_CodexRetriesWhenPromptNotDelivered(t *tes
}

code := sendAgentRunPromptIfRequested(
nil, nil,
GlobalFlags{JSON: true},
"test-v1",
"",
&cmdCtx{gf: GlobalFlags{JSON: true}, version: "test-v1", cmd: "agent.run"},
"session-a",
"codex",
"Reply with READY only.",
Expand Down Expand Up @@ -163,10 +160,7 @@ func TestSendAgentRunPromptIfRequested_NonCodexDoesNotRetry(t *testing.T) {
}

code := sendAgentRunPromptIfRequested(
nil, nil,
GlobalFlags{JSON: true},
"test-v1",
"",
&cmdCtx{gf: GlobalFlags{JSON: true}, version: "test-v1", cmd: "agent.run"},
"session-b",
"claude",
"Reply with READY only.",
Expand Down Expand Up @@ -212,10 +206,7 @@ func TestSendAgentRunPromptIfRequested_BeforeSendHookRunsBeforeSend(t *testing.T
}

code := sendAgentRunPromptIfRequested(
nil, nil,
GlobalFlags{JSON: true},
"test-v1",
"",
&cmdCtx{gf: GlobalFlags{JSON: true}, version: "test-v1", cmd: "agent.run"},
"session-c",
"claude",
"Reply with READY only.",
Expand Down
Loading
Loading