
Allow For Multiple Parallel Messages #41

Open
mmmries opened this issue Mar 26, 2022 · 7 comments

Comments

@mmmries
Owner

mmmries commented Mar 26, 2022

The current implementation is essentially "single-threaded": it requests a single message, waits for it to arrive, and handles it before sending back the ACK or the next_message request.

My use-case at work would certainly benefit from the ability to specify a limit on how many messages to handle in parallel. Something like:

@impl Jetstream.PullConsumer
def init(nil) do
  consumer = [
    connection_name: :gnat,
    stream_name: "my_stream",
    consumer_name: "my_stream",
    max_concurrency: 10
  ]
  {:ok, nil, consumer}
end
  end

This matches the option name used by Task.async_stream in the standard library.
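
For reference, this is the standard-library behavior the option name comes from (do_work/1 here is a placeholder for whatever you do per message):

# At most 10 invocations of do_work/1 are in flight at any moment.
1..100
|> Task.async_stream(fn i -> do_work(i) end, max_concurrency: 10)
|> Stream.run()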

@mkaput
Collaborator

mkaput commented Mar 29, 2022

This is a use-case that I don't want to support in PullConsumer, as it is meant to be a super simple API serving, say, 60-70% of use-cases. See #8. Your use case would benefit much more from batching messages, which you could then pass to async_stream or whatever else suits you. For this, I planned to recommend Broadway and to provide Broadway Producer and Consumer modules in this library. What do you think?
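
For the record, here is roughly what such a pipeline could look like. This is only a sketch: the producer module name OffBroadway.Jetstream.Producer is purely illustrative (no such module exists in this library yet); the rest is standard Broadway API.

defmodule MyApp.Pipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      # Hypothetical producer module that would pull batches from JetStream.
      producer: [module: {OffBroadway.Jetstream.Producer, connection_name: :gnat}],
      # Broadway owns the concurrency knob.
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Do the work on message.data; returning the message acks it.
    message
  end
end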

@mmmries
Owner Author

mmmries commented Apr 9, 2022

Picking up the thread of conversation from #43 (comment)

I did a handful of benchmarks to check on the performance and overhead of starting multiple PullConsumer processes vs starting a single PullConsumer that delegates the work to separate processes.

Benchmark Process and Results

Basic Assumptions

The purpose of these tests is to check for library overhead or bottlenecks. We aren't trying to simulate a realistic workload; we are setting up a simplistic scenario where nats won't be under heavy load, so that we can stress-test our own library in specific ways.

All of these benchmarks were run on my MacBook Pro (13-inch, M1, 2020) with nats-server v2.6.6 installed via Homebrew. I shut down and restarted nats between each benchmark run, then created the stream + consumer with memory-only retention, to get comparable nats performance for each run. I also disabled all of the debug logging in our library by adding a config/config.exs file with the following content.

import Config

config :logger,
  backends: [:console],
  compile_time_purge_matching: [
    [level_lower_than: :error]
  ]

For each run, I would start the nats server with nats-server -js, then start the jetstream project with iex -S mix and copy-paste in the test scripts below.

Single-Threaded Scenario (i.e. the current master branch)

{:ok, _} = Gnat.ConnectionSupervisor.start_link(%{name: :gnat, connection_settings: [%{}]})
{:ok, stream} = Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "TEST", storage: :memory, subjects: ["test"]})

Enum.each(1..100_000, fn(i) -> 
  Gnat.pub(:gnat, "test", Jason.encode!(%{num: i, body: "foobar"}))
end)

{:ok, consumer} = Jetstream.API.Consumer.create(:gnat, %Jetstream.API.Consumer{stream_name: "TEST", durable_name: "TEST"})

defmodule ConsumeTest do
  use Jetstream.PullConsumer
  
  def start_link(arg) do
    Jetstream.PullConsumer.start_link(__MODULE__, arg)
  end

  @impl true
  def init(_arg) do
    {:ok, {nil, nil},
      connection_name: :gnat,
      stream_name: "TEST",
      consumer_name: "TEST"}
  end

  @impl true
  def handle_message(_message, {nil, nil}) do
    {:ack, {:erlang.monotonic_time(:millisecond), nil}}
  end

  def handle_message(_message, {first, _last}) do
    last = :erlang.monotonic_time(:millisecond)
    {:ack, {first, last}}
  end
end

{:ok, pid} = ConsumeTest.start_link(nil)

# Wait for all 100k messages to be consumed

%{mod_state: %{state: {from, to}}} = :sys.get_state(pid)
100_000.0 / ((to - from) / 1000.0)
# this will output a number of messages per second

Results:

10:34:22.956 to 10:35:35.546 => 22.956 to 95.546 => 72.59 sec
100k messages / 72.59 sec => 1,377.6 msg/sec

After changing the log level and removing extra IO.puts calls:
-576460748985 to -576460695478 (milliseconds) => 53,507 ms
100k messages / 53.507 sec => 1,868.9 msg/sec

Multiple PullConsumer Processes

defmodule MessageTimer do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def init(nil) do
    state = {0, nil, nil}
    {:ok, state}
  end

  def handle_info({:message, milliseconds}, {0, nil, nil}) do
    {:noreply, {1, milliseconds, milliseconds}}
  end

  def handle_info({:message, milliseconds}, state) do
    state
    |> update_state(milliseconds)
    |> check_for_terminal()
  end

  defp check_for_terminal({100_000, first, last}) do
    messages_per_second = 100_000.0 / ((last - first) / 1000.0)
    IO.puts("Received 100k messages at a rate of #{messages_per_second} messages per second")
    {:stop, :normal, nil}
  end
  defp check_for_terminal(state), do: {:noreply, state}

  defp update_state({num_received, first, last}, milliseconds) do
    cond do
      milliseconds < first ->
        {num_received + 1, milliseconds, last}

      milliseconds > last ->
        {num_received + 1, first, milliseconds}

      true ->
        {num_received + 1, first, last}
    end
  end
end

{:ok, _} = MessageTimer.start_link()
{:ok, _} = Gnat.ConnectionSupervisor.start_link(%{name: :gnat, connection_settings: [%{}]})
{:ok, stream} = Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "TEST", storage: :memory, subjects: ["test"]})

Enum.each(1..100_000, fn(i) -> 
  Gnat.pub(:gnat, "test", Jason.encode!(%{num: i, body: "foobar"}))
end)

{:ok, consumer} = Jetstream.API.Consumer.create(:gnat, %Jetstream.API.Consumer{stream_name: "TEST", durable_name: "TEST"})

defmodule ConsumeTest do
  use Jetstream.PullConsumer
  
  def start_link(arg) do
    Jetstream.PullConsumer.start_link(__MODULE__, arg)
  end

  @impl true
  def init(_arg) do
    {:ok, {nil, nil},
      connection_name: :gnat,
      stream_name: "TEST",
      consumer_name: "TEST"}
  end

  @impl true
  def handle_message(_message, state) do
    send(MessageTimer, {:message, :erlang.monotonic_time(:millisecond)})
    {:ack, state}
  end
end

Enum.each(1..10, fn(_) ->
  {:ok, _pid} = ConsumeTest.start_link(nil)
end)
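
(In a real application those ten consumers would live under a supervisor rather than being started ad hoc. A minimal sketch, assuming use Jetstream.PullConsumer injects a default child_spec/1 the way use GenServer does:)

children =
  for i <- 1..10 do
    # Each child needs a distinct id to coexist under one supervisor.
    Supervisor.child_spec({ConsumeTest, nil}, id: {ConsumeTest, i})
  end

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)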

See the full list of parallel test results at the bottom of this section.

Batched Message Pull w/ Parallel Processing

defmodule MessageTimer do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def init(nil) do
    state = {0, nil, nil}
    {:ok, state}
  end

  def handle_info({:message, milliseconds}, {0, nil, nil}) do
    {:noreply, {1, milliseconds, milliseconds}}
  end

  def handle_info({:message, milliseconds}, state) do
    state
    |> update_state(milliseconds)
    |> check_for_terminal()
  end

  defp check_for_terminal({100_000, first, last}) do
    messages_per_second = 100_000.0 / ((last - first) / 1000.0)
    IO.puts("Received 100k messages at a rate of #{messages_per_second} messages per second")
    {:stop, :normal, nil}
  end
  defp check_for_terminal(state), do: {:noreply, state}

  defp update_state({num_received, first, last}, milliseconds) do
    cond do
      milliseconds < first ->
        {num_received + 1, milliseconds, last}

      milliseconds > last ->
        {num_received + 1, first, milliseconds}

      true ->
        {num_received + 1, first, last}
    end
  end
end

{:ok, _} = MessageTimer.start_link()
{:ok, _} = Gnat.ConnectionSupervisor.start_link(%{name: :gnat, connection_settings: [%{}]})
{:ok, stream} = Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "TEST", storage: :memory, subjects: ["test"]})

Enum.each(1..100_000, fn(i) -> 
  Gnat.pub(:gnat, "test", Jason.encode!(%{num: i, body: "foobar"}))
end)

{:ok, consumer} = Jetstream.API.Consumer.create(:gnat, %Jetstream.API.Consumer{stream_name: "TEST", durable_name: "TEST"})

defmodule ConsumeTest do
  use Jetstream.PullConsumer
  
  def start_link(arg) do
    Jetstream.PullConsumer.start_link(__MODULE__, arg)
  end

  @impl true
  def init(_arg) do
    {:ok, {nil, nil},
      connection_name: :gnat,
      stream_name: "TEST",
      consumer_name: "TEST"}
  end

  @impl true
  def handle_message(_message, state) do
    send(MessageTimer, {:message, :erlang.monotonic_time(:millisecond)})
    {:ack, state}
  end
end

{:ok, pid} = ConsumeTest.start_link(nil)
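
The parallel-dispatch modification itself isn't shown above; it lives on the branch used for this benchmark. The core idea is roughly the following sketch (not the benchmarked code). It assumes a Task.Supervisor registered as MyApp.TaskSupervisor is already running, that handle_message may return {:noreply, state} to suppress the automatic reply, and that Jetstream.ack/1 can be called from the spawned task.

defmodule ParallelConsumeTest do
  use Jetstream.PullConsumer

  def start_link(arg) do
    Jetstream.PullConsumer.start_link(__MODULE__, arg)
  end

  @impl true
  def init(_arg) do
    {:ok, nil,
      connection_name: :gnat,
      stream_name: "TEST",
      consumer_name: "TEST"}
  end

  @impl true
  def handle_message(message, state) do
    # Hand the message off so the consumer loop can request the next one
    # immediately; the task acks when it finishes.
    Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
      send(MessageTimer, {:message, :erlang.monotonic_time(:millisecond)})
      Jetstream.ack(message)
    end)

    {:noreply, state}
  end
end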

Parallel Test Results:

Parallel | PullConsumers (msg/sec) | Tasks (msg/sec)
       1 |             29559.56252 |      28768.6997
       2 |             48567.26566 |      47370.9143
       4 |             73046.01899 |      70422.5352
       8 |             96525.09653 |      93023.2558
      10 |             88028.16901 |      101419.878
      16 |             104166.6667 |      106609.808
      20 |             114155.2511 |      120336.943
      24 |             120918.9843 |      130548.303
      32 |             127713.9208 |      141843.972
      64 |             140845.0704 |      149700.599
      96 |              134589.502 |       150602.41
     128 |             131406.0447 |      149476.831

The results are summarized in this graph.

[Graph: messages per second vs. number of parallel processes for both approaches]

The blue line shows the messages acknowledged per second when starting multiple PullConsumer processes, and the orange line shows the messages per second when running a slightly modified version with a single PullConsumer that starts each job in a separate process.

There's not a huge gap in performance between the two methods, so starting multiple PullConsumers is certainly a viable option.

@mmmries
Owner Author

mmmries commented Apr 9, 2022

This is a use-case that I don't want to support in PullConsumer, as it is meant to be a super simple API serving, say, 60-70% of use-cases

@mkaput I think this would be a good topic for us to discuss on a call, but if you have any thoughts before we get schedules lined up, I would be happy to read and try to understand asynchronously.

In my working experience, I have mostly worked at companies that were setting up event architectures between multiple backend services. In every case we had multiple copies of the app running in production (for redundancy/resiliency). So this would mean having a single PullConsumer running per instance of the BEAM, and we would still need to deal with the fact that each instance would get only part of the message history.

And in each of my professional projects, we have wanted to avoid having a queue fall behind because of a single slow message (maybe one waiting on an IO call), so we have always allowed some number (like 20-200) of messages to be processed in parallel per instance of the application. This helps keep each service up-to-date with the stream.

So for me, parallelism is the common case, and I would think of running only a single PullConsumer as niche. It sounds like you have the opposite experience? I would love to better understand that use-case.

@mmmries
Owner Author

mmmries commented Apr 19, 2022

@mkaput I've continued thinking about the issue of tracking state in the processes receiving messages, vs tracking it elsewhere. There are a few other minor reasons to prefer separate processes:

  • better throughput, as seen above
  • a simpler supervision tree (1 or 2 supervised processes instead of 100)
  • memory performance (handling each message in its own process means its memory can be reclaimed very efficiently after processing; see the sketch below)
  • fault isolation (lots of options for returning nack on a single message, or even letting a process crash, without blocking other messages from being processed)
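
Two of those points in miniature: when each message is handled in a throwaway process, its whole heap is reclaimed at once when the process exits, and a crash inside it never touches the consumer (do_something/1 is a placeholder for real message handling):

Task.start(fn ->
  # hypothetical large intermediate built while handling one message
  big = for i <- 1..1_000_000, do: {i, :rand.uniform()}
  do_something(big)
end)
# when the task exits, all of big's memory is freed in one step; if it
# crashes, only the task dies and the consumer keeps pulling messages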

But I think all of those issues are relatively minor. The big question is whether we are primarily writing a library that lets individual processes receive small numbers of messages and track state in memory very efficiently (i.e. the GenServer model), or one that enables deployments where you have multiple copies of the app running and need to keep up with a large volume of messages without blocking the stream.

@byu I would also love to get your take on which of these use-cases best fits your problem space.

@marmor157
Collaborator

marmor157 commented Apr 22, 2022

Hi @mmmries, I've run some more benchmarks based on yours, but averaging 10 runs per batch size so it's less likely that weird spikes skew the numbers. I also added https://github.com/membraneframework/beamchmark so we can see what is holding us back, and ran everything on two different machines. With a small message size I got these results:

[Graph: benchmark results for small messages on two machines]

Looking at the Beamchmark data, we've come to the conclusion that the request to Jetstream itself is a chokepoint here, which might explain why at larger batch sizes a single PullConsumer is faster: at the start it sends one request instead of, say, 128.

To also check the behavior with larger messages, I mocked a larger message (~6kB):

[Graph: benchmark results for ~6kB messages]

Then with an example message from our system (~1kB):

[Graph: benchmark results for ~1kB messages]

So for me it feels like it really depends on the use case, and I think we should somehow support both.

Please let me know if I've missed something that would make these measurements incorrect.

If you would like to check out the code, I've forked the repo: marmor157#1

@brandynbennett
Collaborator

It looks like JetStream has batching built into its system, and it makes sense to me to lean into the built-in batching mechanism, as it will likely be the most optimized. It sounds cumbersome to require developers to create a separate Jetstream.PullConsumer for each parallel message they want to consume. Best I can tell, the Broadway PR takes care of the complex case with concurrency and batching, and the existing Jetstream.PullConsumer is good for the simpler case of processing one message at a time. I'm not super familiar with Broadway, but it makes sense to leverage it for the complex concurrency case because it already knows how to do that, and we won't have to reinvent it ourselves.
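
For context, the batching JetStream exposes is a batch parameter on the pull request itself. At the protocol level a batched pull looks roughly like this sketch (the subject follows the JetStream API convention $JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>; the stream and consumer names are the TEST ones from the benchmarks above):

# Ask for up to 10 messages, delivered to a reply-to inbox we subscribe to.
inbox = "_INBOX." <> Base.encode16(:crypto.strong_rand_bytes(8))
{:ok, _subscription} = Gnat.sub(:gnat, self(), inbox)

:ok =
  Gnat.pub(:gnat, "$JS.API.CONSUMER.MSG.NEXT.TEST.TEST",
    Jason.encode!(%{batch: 10}),
    reply_to: inbox)

# Up to 10 {:msg, %{body: ..., reply_to: ...}} tuples now arrive in this
# process; each is acked by publishing to its reply_to subject.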

@marmor157
Collaborator

@mmmries What are your thoughts on the Broadway approach to handling parallel messages? We would like to finally make a release of this library, and this discussion seems to be the only thing blocking it.
