diff --git a/lib/elsa/consumer/worker.ex b/lib/elsa/consumer/worker.ex
index 504b986..8890e58 100644
--- a/lib/elsa/consumer/worker.ex
+++ b/lib/elsa/consumer/worker.ex
@@ -6,7 +6,7 @@ defmodule Elsa.Consumer.Worker do
   passed in from the manager before calling the ack function to
   notify the cluster the messages have been successfully processed.
   """
-  use GenServer, restart: :temporary, shutdown: 10_000
+  use GenServer
   require Logger
 
   import Elsa.Supervisor, only: [registry: 1]
@@ -55,6 +55,35 @@ defmodule Elsa.Consumer.Worker do
     GenServer.start_link(__MODULE__, init_args)
   end
 
+  @doc """
+  Builds the child spec for a worker, choosing the restart strategy from the
+  required `:worker_type` option (`:group` or `:non_group`).
+
+  The `:worker_type` entry is removed from the arguments before they are
+  passed on to `start_link/1`. Raises `KeyError` when it is missing.
+  """
+  def child_spec(arg) do
+    {worker_type, init_arg} = Keyword.pop!(arg, :worker_type)
+
+    # Group workers are managed via the `Elsa.Group.Supervisor` supervision tree
+    # processes. If a process needs restarting, it gets handled there.
+    #
+    # Non-group consumers are on their own though and need their OTP supervisor
+    # to restart them if they crash.
+    restart =
+      case worker_type do
+        :group -> :temporary
+        :non_group -> :transient
+      end
+
+    %{
+      id: __MODULE__,
+      start: {__MODULE__, :start_link, [init_arg]},
+      restart: restart,
+      shutdown: 10_000
+    }
+  end
+
   def init(init_args) do
     Process.flag(:trap_exit, true)
 
diff --git a/lib/elsa/group/manager/worker_manager.ex b/lib/elsa/group/manager/worker_manager.ex
index 01e7b85..222c63f 100644
--- a/lib/elsa/group/manager/worker_manager.ex
+++ b/lib/elsa/group/manager/worker_manager.ex
@@ -75,6 +75,7 @@ defmodule Elsa.Group.Manager.WorkerManager do
     assignment = Enum.into(brod_received_assignment(assignment), %{})
 
     init_args = [
+      worker_type: :group,
       topic: assignment.topic,
       partition: assignment.partition,
       generation_id: generation_id,
diff --git a/lib/elsa/supervisor.ex b/lib/elsa/supervisor.ex
index ffbc690..45de2ae 100644
--- a/lib/elsa/supervisor.ex
+++ b/lib/elsa/supervisor.ex
@@ -196,6 +196,7 @@ defmodule Elsa.Supervisor do
 
     consumer_args =
       args
+      |> Keyword.put(:worker_type, :non_group)
       |> Keyword.put(:registry, registry)
       |> Keyword.put(:connection, connection)
       |> Keyword.put(:topics, topics)
diff --git a/test/integration/elsa/consumer_test.exs b/test/integration/elsa/consumer_test.exs
index e77a9aa..83464bc 100644
--- a/test/integration/elsa/consumer_test.exs
+++ b/test/integration/elsa/consumer_test.exs
@@ -58,6 +58,38 @@ defmodule Elsa.ConsumerTest do
     Supervisor.stop(pid)
   end
 
+  test "restarts a crashed worker that isn't in a group" do
+    topic = "consumer-test3"
+    Elsa.create_topic(@brokers, topic)
+
+    start_supervised!(
+      {Elsa.Supervisor,
+       connection: :name1,
+       endpoints: @brokers,
+       consumer: [
+         topic: topic,
+         handler: Testing.ExampleMessageHandlerWithState,
+         handler_init_args: %{pid: self()},
+         begin_offset: :earliest
+       ]}
+    )
+
+    send_messages(topic, ["message1"])
+    send_messages(topic, ["message2"])
+
+    assert_receive {:message, %{topic: ^topic, value: "message1"}}, 5_000
+    assert_receive {:message, %{topic: ^topic, value: "message2"}}, 5_000
+
+    kill_worker(topic)
+
+    send_messages(topic, ["message3"])
+    send_messages(topic, ["message4"])
+
+    # Regression check: these assertions failed when the killed worker was not restarted.
+    assert_receive {:message, %{topic: ^topic, value: "message3"}}, 5_000
+    assert_receive {:message, %{topic: ^topic, value: "message4"}}, 5_000
+  end
+
   defp send_messages(topic, messages) do
     :brod.start_link_client(@brokers, :test_client)
     :brod.start_producer(:test_client, topic, [])
@@ -69,6 +101,15 @@ defmodule Elsa.ConsumerTest do
       :brod.produce_sync(:test_client, topic, partition, "", msg)
     end)
   end
+
+  defp kill_worker(topic) do
+    partition = 0
+
+    worker_pid = Elsa.Registry.whereis_name({:elsa_registry_name1, :"worker_#{topic}_#{partition}"})
+    Process.exit(worker_pid, :kill)
+
+    refute Process.alive?(worker_pid)
+  end
 end
 
 defmodule Testing.ExampleMessageHandlerWithState do