Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 26 additions & 73 deletions internal/cli/cmd_agent_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,19 @@ func cmdAgentJobStatus(w, wErr io.Writer, gf GlobalFlags, args []string, version
return returnUsageError(w, wErr, gf, usage, version, nil)
}

ctx := &cmdCtx{w: w, wErr: wErr, gf: gf, version: version, cmd: "agent.job.status"}

store, err := newSendJobStore()
if err != nil {
if gf.JSON {
ReturnError(w, "job_store_failed", err.Error(), nil, version)
} else {
Errorf(wErr, "failed to initialize send job store: %v", err)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_store_failed", err.Error(), nil, fmt.Sprintf("failed to initialize send job store: %v", err))
}

job, ok, err := store.get(jobID)
if err != nil {
if gf.JSON {
ReturnError(w, "job_status_failed", err.Error(), map[string]any{"job_id": jobID}, version)
} else {
Errorf(wErr, "failed to read job %s: %v", jobID, err)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_status_failed", err.Error(), map[string]any{"job_id": jobID}, fmt.Sprintf("failed to read job %s: %v", jobID, err))
}
if !ok {
if gf.JSON {
ReturnError(w, "not_found", "job not found", map[string]any{"job_id": jobID}, version)
} else {
Errorf(wErr, "job %s not found", jobID)
}
return ExitNotFound
return ctx.errResult(ExitNotFound, "not_found", "job not found", map[string]any{"job_id": jobID}, fmt.Sprintf("job %s not found", jobID))
}

writeJobStatusResult(w, gf, version, job)
Expand All @@ -89,44 +76,24 @@ func cmdAgentJobCancel(w, wErr io.Writer, gf GlobalFlags, args []string, version
if jobID == "" {
return returnUsageError(w, wErr, gf, usage, version, nil)
}
if handled, code := maybeReplayIdempotentResponse(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey,
); handled {

ctx := &cmdCtx{w: w, wErr: wErr, gf: gf, version: version, cmd: "agent.job.cancel", idemKey: *idempotencyKey}

if handled, code := ctx.maybeReplay(); handled {
return code
}

store, err := newSendJobStore()
if err != nil {
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey,
ExitInternalError, "job_store_failed", err.Error(), nil,
)
}
Errorf(wErr, "failed to initialize send job store: %v", err)
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_store_failed", err.Error(), nil, fmt.Sprintf("failed to initialize send job store: %v", err))
}

job, ok, canceled, err := store.cancel(jobID)
if err != nil {
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey,
ExitInternalError, "job_cancel_failed", err.Error(), map[string]any{"job_id": jobID},
)
}
Errorf(wErr, "failed to cancel job %s: %v", jobID, err)
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_cancel_failed", err.Error(), map[string]any{"job_id": jobID}, fmt.Sprintf("failed to cancel job %s: %v", jobID, err))
}
if !ok {
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey,
ExitNotFound, "not_found", "job not found", map[string]any{"job_id": jobID},
)
}
Errorf(wErr, "job %s not found", jobID)
return ExitNotFound
return ctx.errResult(ExitNotFound, "not_found", "job not found", map[string]any{"job_id": jobID}, fmt.Sprintf("job %s not found", jobID))
}

result := agentJobCancelResult{
Expand All @@ -135,9 +102,7 @@ func cmdAgentJobCancel(w, wErr io.Writer, gf GlobalFlags, args []string, version
Canceled: canceled,
}
if gf.JSON {
return returnJSONSuccessWithIdempotency(
w, wErr, gf, version, "agent.job.cancel", *idempotencyKey, result,
)
return ctx.successResult(result)
}

PrintHuman(w, func(w io.Writer) {
Expand Down Expand Up @@ -166,34 +131,21 @@ func cmdAgentJobWait(w, wErr io.Writer, gf GlobalFlags, args []string, version s
return returnUsageError(w, wErr, gf, usage, version, nil)
}

ctx := &cmdCtx{w: w, wErr: wErr, gf: gf, version: version, cmd: "agent.job.wait"}

store, err := newSendJobStore()
if err != nil {
if gf.JSON {
ReturnError(w, "job_store_failed", err.Error(), nil, version)
} else {
Errorf(wErr, "failed to initialize send job store: %v", err)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_store_failed", err.Error(), nil, fmt.Sprintf("failed to initialize send job store: %v", err))
}

deadline := time.Now().Add(*timeout)
for {
job, ok, getErr := store.get(jobID)
if getErr != nil {
if gf.JSON {
ReturnError(w, "job_status_failed", getErr.Error(), map[string]any{"job_id": jobID}, version)
} else {
Errorf(wErr, "failed to read job %s: %v", jobID, getErr)
}
return ExitInternalError
return ctx.errResult(ExitInternalError, "job_status_failed", getErr.Error(), map[string]any{"job_id": jobID}, fmt.Sprintf("failed to read job %s: %v", jobID, getErr))
}
if !ok {
if gf.JSON {
ReturnError(w, "not_found", "job not found", map[string]any{"job_id": jobID}, version)
} else {
Errorf(wErr, "job %s not found", jobID)
}
return ExitNotFound
return ctx.errResult(ExitNotFound, "not_found", "job not found", map[string]any{"job_id": jobID}, fmt.Sprintf("job %s not found", jobID))
}
if isTerminalSendJobStatus(job.Status) {
writeJobStatusResult(w, gf, version, job)
Expand All @@ -204,15 +156,16 @@ func cmdAgentJobWait(w, wErr io.Writer, gf GlobalFlags, args []string, version s
}

if time.Now().After(deadline) {
if gf.JSON {
ReturnError(w, "timeout", "timed out waiting for job completion", map[string]any{
return ctx.errResult(
ExitInternalError,
"timeout",
"timed out waiting for job completion",
map[string]any{
"job_id": job.ID,
"status": string(job.Status),
}, version)
} else {
Errorf(wErr, "timed out waiting for job %s completion (status: %s)", job.ID, job.Status)
}
return ExitInternalError
},
fmt.Sprintf("timed out waiting for job %s completion (status: %s)", job.ID, job.Status),
)
}
time.Sleep(*interval)
}
Expand Down
72 changes: 25 additions & 47 deletions internal/cli/cmd_agent_run_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cli

import (
"fmt"
"io"
"log/slog"
"strings"
"time"
Expand All @@ -11,26 +10,22 @@ import (
)

func verifyStartedAgentSession(
w, wErr io.Writer,
gf GlobalFlags,
version, idempotencyKey, sessionName string,
ctx *cmdCtx,
sessionName string,
tmuxOpts tmux.Options,
) int {
state, err := tmuxSessionStateFor(sessionName, tmuxOpts)
if err != nil {
if killErr := tmuxKillSession(sessionName, tmuxOpts); killErr != nil {
slog.Debug("best-effort session kill failed", "session", sessionName, "error", killErr)
}
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.run", idempotencyKey,
ExitInternalError, "session_lookup_failed", err.Error(), map[string]any{
"session_name": sessionName,
},
)
}
Errorf(wErr, "failed to verify session %s: %v", sessionName, err)
return ExitInternalError
return ctx.errResult(
ExitInternalError,
"session_lookup_failed",
err.Error(),
map[string]any{"session_name": sessionName},
fmt.Sprintf("failed to verify session %s: %v", sessionName, err),
)
}
if state.Exists && state.HasLivePane {
return ExitOK
Expand All @@ -40,16 +35,9 @@ func verifyStartedAgentSession(
slog.Debug("best-effort session kill failed", "session", sessionName, "error", err)
}
msg := fmt.Sprintf("assistant session %s exited before startup completed", sessionName)
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.run", idempotencyKey,
ExitInternalError, "session_exited", msg, map[string]any{
"session_name": sessionName,
},
)
}
Errorf(wErr, msg)
return ExitInternalError
return ctx.errResult(ExitInternalError, "session_exited", msg, map[string]any{
"session_name": sessionName,
})
}

var (
Expand All @@ -73,19 +61,15 @@ var (
)

func sendAgentRunPromptIfRequested(
w, wErr io.Writer,
gf GlobalFlags,
version, idempotencyKey, sessionName, assistantName, prompt string,
ctx *cmdCtx,
sessionName, assistantName, prompt string,
tmuxOpts tmux.Options,
beforeSend func(),
) int {
if prompt == "" {
return ExitOK
}

// Wait for the agent TUI to render before sending. Agents like Codex can
// take several seconds to initialize; a fixed short sleep causes the Enter
// keystroke to arrive before the input handler is ready.
waitForPaneOutput(sessionName, assistantName, tmuxOpts)
if beforeSend != nil {
beforeSend()
Expand All @@ -95,42 +79,36 @@ func sendAgentRunPromptIfRequested(
preSendHash := tmux.ContentHash(preSendContent)

if err := tmuxSendKeys(sessionName, prompt, true, tmuxOpts); err != nil {
return handlePromptSendError(w, wErr, gf, version, idempotencyKey, sessionName, tmuxOpts, err, "send")
return handlePromptSendError(ctx, sessionName, tmuxOpts, err, "send")
}

// Codex startup can still occasionally drop the very first prompt even when
// a cursor is visible. If pane output does not change after send, retry once.
if strings.EqualFold(strings.TrimSpace(assistantName), "codex") &&
!waitForPromptDelivery(sessionName, preSendHash, tmuxOpts) {
waitForPaneOutput(sessionName, assistantName, tmuxOpts)
if err := tmuxSendKeys(sessionName, prompt, true, tmuxOpts); err != nil {
return handlePromptSendError(w, wErr, gf, version, idempotencyKey, sessionName, tmuxOpts, err, "retry")
return handlePromptSendError(ctx, sessionName, tmuxOpts, err, "retry")
}
}
return ExitOK
}

func handlePromptSendError(
w, wErr io.Writer,
gf GlobalFlags,
version, idempotencyKey, sessionName string,
ctx *cmdCtx,
sessionName string,
tmuxOpts tmux.Options,
err error,
action string,
) int {
if killErr := tmuxKillSession(sessionName, tmuxOpts); killErr != nil {
slog.Debug("best-effort session kill failed", "session", sessionName, "error", killErr)
}
if gf.JSON {
return returnJSONErrorMaybeIdempotent(
w, wErr, gf, version, "agent.run", idempotencyKey,
ExitInternalError, "prompt_send_failed", err.Error(), map[string]any{
"session_name": sessionName,
},
)
}
Errorf(wErr, "failed to %s initial prompt to %s: %v", action, sessionName, err)
return ExitInternalError
return ctx.errResult(
ExitInternalError,
"prompt_send_failed",
err.Error(),
map[string]any{"session_name": sessionName},
fmt.Sprintf("failed to %s initial prompt to %s: %v", action, sessionName, err),
)
}

// waitForPaneOutput polls the tmux pane until the output stabilizes (stops
Expand Down
41 changes: 41 additions & 0 deletions internal/cli/cmd_agent_run_liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,47 @@ import (
"github.com/andyrewlee/amux/internal/tmux"
)

func TestHandlePromptSendErrorHumanMessageIncludesAction(t *testing.T) {
origKillSession := tmuxKillSession
defer func() {
tmuxKillSession = origKillSession
}()

tmuxKillSession = func(_ string, _ tmux.Options) error { return nil }

tests := []struct {
action string
want string
}{
{action: "send", want: "Error: failed to send initial prompt to session-a: send failed\n"},
{action: "retry", want: "Error: failed to retry initial prompt to session-a: send failed\n"},
}

for _, tt := range tests {
t.Run(tt.action, func(t *testing.T) {
var out, errOut bytes.Buffer
ctx := &cmdCtx{
w: &out,
wErr: &errOut,
gf: GlobalFlags{},
version: "test-v1",
cmd: "agent.run",
}

code := handlePromptSendError(ctx, "session-a", tmux.Options{}, errors.New("send failed"), tt.action)
if code != ExitInternalError {
t.Fatalf("handlePromptSendError() code = %d, want %d", code, ExitInternalError)
}
if out.Len() != 0 {
t.Fatalf("expected no stdout output in text mode, got %q", out.String())
}
if got := errOut.String(); got != tt.want {
t.Fatalf("stderr = %q, want %q", got, tt.want)
}
})
}
}

func TestCmdAgentRunSessionExitsBeforeStartupReturnsInternalErrorAndDoesNotPersistTab(t *testing.T) {
t.Setenv("HOME", t.TempDir())

Expand Down
15 changes: 3 additions & 12 deletions internal/cli/cmd_agent_run_prompt_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,7 @@ func TestSendAgentRunPromptIfRequested_CodexRetriesWhenPromptNotDelivered(t *tes
}

code := sendAgentRunPromptIfRequested(
nil, nil,
GlobalFlags{JSON: true},
"test-v1",
"",
&cmdCtx{gf: GlobalFlags{JSON: true}, version: "test-v1", cmd: "agent.run"},
"session-a",
"codex",
"Reply with READY only.",
Expand Down Expand Up @@ -163,10 +160,7 @@ func TestSendAgentRunPromptIfRequested_NonCodexDoesNotRetry(t *testing.T) {
}

code := sendAgentRunPromptIfRequested(
nil, nil,
GlobalFlags{JSON: true},
"test-v1",
"",
&cmdCtx{gf: GlobalFlags{JSON: true}, version: "test-v1", cmd: "agent.run"},
"session-b",
"claude",
"Reply with READY only.",
Expand Down Expand Up @@ -212,10 +206,7 @@ func TestSendAgentRunPromptIfRequested_BeforeSendHookRunsBeforeSend(t *testing.T
}

code := sendAgentRunPromptIfRequested(
nil, nil,
GlobalFlags{JSON: true},
"test-v1",
"",
&cmdCtx{gf: GlobalFlags{JSON: true}, version: "test-v1", cmd: "agent.run"},
"session-c",
"claude",
"Reply with READY only.",
Expand Down
Loading
Loading