Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 9 additions & 34 deletions elixir/lib/symphony_elixir/agent_runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ defmodule SymphonyElixir.AgentRunner do

@spec run(map(), pid() | nil, keyword()) :: :ok | no_return()
def run(issue, codex_update_recipient \\ nil, opts \\ []) do
worker_hosts =
candidate_worker_hosts(Keyword.get(opts, :worker_host), Config.settings!().worker.ssh_hosts)
# The orchestrator owns host retries so one worker lifetime never hops machines.
worker_host = selected_worker_host(Keyword.get(opts, :worker_host), Config.settings!().worker.ssh_hosts)

Logger.info("Starting agent run for #{issue_context(issue)} worker_hosts=#{inspect(worker_hosts_for_log(worker_hosts))}")
Logger.info("Starting agent run for #{issue_context(issue)} worker_host=#{worker_host_for_log(worker_host)}")

case run_on_worker_hosts(issue, codex_update_recipient, opts, worker_hosts) do
case run_on_worker_host(issue, codex_update_recipient, opts, worker_host) do
:ok ->
:ok

Expand All @@ -26,22 +26,6 @@ defmodule SymphonyElixir.AgentRunner do
end
end

defp run_on_worker_hosts(issue, codex_update_recipient, opts, [worker_host | rest]) do
case run_on_worker_host(issue, codex_update_recipient, opts, worker_host) do
:ok ->
:ok

{:error, reason} when rest != [] ->
Logger.warning("Agent run failed for #{issue_context(issue)} worker_host=#{worker_host_for_log(worker_host)} reason=#{inspect(reason)}; trying next worker host")
run_on_worker_hosts(issue, codex_update_recipient, opts, rest)

{:error, reason} ->
{:error, reason}
end
end

defp run_on_worker_hosts(_issue, _codex_update_recipient, _opts, []), do: {:error, :no_worker_hosts_available}

defp run_on_worker_host(issue, codex_update_recipient, opts, worker_host) do
Logger.info("Starting worker attempt for #{issue_context(issue)} worker_host=#{worker_host_for_log(worker_host)}")

Expand Down Expand Up @@ -188,31 +172,22 @@ defmodule SymphonyElixir.AgentRunner do

defp active_issue_state?(_state_name), do: false

defp candidate_worker_hosts(nil, []), do: [nil]
defp selected_worker_host(nil, []), do: nil

defp candidate_worker_hosts(preferred_host, configured_hosts) when is_list(configured_hosts) do
defp selected_worker_host(preferred_host, configured_hosts) when is_list(configured_hosts) do
hosts =
configured_hosts
|> Enum.map(&String.trim/1)
|> Enum.reject(&(&1 == ""))
|> Enum.uniq()

case preferred_host do
host when is_binary(host) and host != "" ->
[host | Enum.reject(hosts, &(&1 == host))]

_ when hosts == [] ->
[nil]

_ ->
hosts
host when is_binary(host) and host != "" -> host
_ when hosts == [] -> nil
_ -> List.first(hosts)
end
end

defp worker_hosts_for_log(worker_hosts) do
Enum.map(worker_hosts, &worker_host_for_log/1)
end

defp worker_host_for_log(nil), do: "local"
defp worker_host_for_log(worker_host), do: worker_host

Expand Down
70 changes: 70 additions & 0 deletions elixir/test/symphony_elixir/core_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,76 @@ defmodule SymphonyElixir.CoreTest do
end
end

test "agent runner surfaces ssh startup failures instead of silently hopping hosts" do
test_root =
Path.join(
System.tmp_dir!(),
"symphony-elixir-agent-runner-single-host-#{System.unique_integer([:positive])}"
)

previous_path = System.get_env("PATH")
previous_trace = System.get_env("SYMP_TEST_SSH_TRACE")

on_exit(fn ->
restore_env("PATH", previous_path)
restore_env("SYMP_TEST_SSH_TRACE", previous_trace)
end)

try do
trace_file = Path.join(test_root, "ssh.trace")
fake_ssh = Path.join(test_root, "ssh")

File.mkdir_p!(test_root)
System.put_env("SYMP_TEST_SSH_TRACE", trace_file)
System.put_env("PATH", test_root <> ":" <> (previous_path || ""))

File.write!(fake_ssh, """
#!/bin/sh
trace_file="${SYMP_TEST_SSH_TRACE:-/tmp/symphony-fake-ssh.trace}"
printf 'ARGV:%s\\n' "$*" >> "$trace_file"

case "$*" in
*worker-a*"__SYMPHONY_WORKSPACE__"*)
printf '%s\\n' 'worker-a prepare failed' >&2
exit 75
;;
*worker-b*"__SYMPHONY_WORKSPACE__"*)
printf '%s\\t%s\\t%s\\n' '__SYMPHONY_WORKSPACE__' '1' '/remote/home/.symphony-remote-workspaces/MT-SSH-FAILOVER'
exit 0
;;
*)
exit 0
;;
esac
""")

File.chmod!(fake_ssh, 0o755)

write_workflow_file!(Workflow.workflow_file_path(),
workspace_root: "~/.symphony-remote-workspaces",
worker_ssh_hosts: ["worker-a", "worker-b"]
)

issue = %Issue{
id: "issue-ssh-failover",
identifier: "MT-SSH-FAILOVER",
title: "Do not fail over within a single worker run",
description: "Surface the startup failure to the orchestrator",
state: "In Progress"
}

assert_raise RuntimeError, ~r/workspace_prepare_failed/, fn ->
AgentRunner.run(issue, nil, worker_host: "worker-a")
end

trace = File.read!(trace_file)
assert trace =~ "worker-a bash -lc"
refute trace =~ "worker-b bash -lc"
after
File.rm_rf(test_root)
end
end

test "agent runner continues with a follow-up turn while the issue remains active" do
test_root =
Path.join(
Expand Down
Loading