From 4ec66f8bc5575fdfd874b15c55b8b47eec8d26bc Mon Sep 17 00:00:00 2001 From: hissssst Date: Sat, 3 Aug 2024 14:36:34 +0200 Subject: [PATCH 1/6] HTTP1 stream --- lib/finch.ex | 5 ++ lib/finch/http1/conn.ex | 171 ++++++++++++++++++++++++++++++++++++++++ lib/finch/http1/pool.ex | 143 +++++++++++++++++++++++++++++++-- 3 files changed, 311 insertions(+), 8 deletions(-) diff --git a/lib/finch.ex b/lib/finch.ex index cc7115f..52d775b 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -307,6 +307,11 @@ defmodule Finch do end end + def actual_stream(request, name, opts \\ []) do + {pool, pool_mod} = get_pool(request, name) + pool_mod.stream(pool, request, name, opts) + end + @doc """ Builds an HTTP request to be sent with `request/3` or `stream/4`. diff --git a/lib/finch/http1/conn.ex b/lib/finch/http1/conn.ex index b331b91..4babf27 100644 --- a/lib/finch/http1/conn.ex +++ b/lib/finch/http1/conn.ex @@ -100,6 +100,90 @@ defmodule Finch.HTTP1.Conn do end end + def stream(%{mint: nil} = conn, _, _, _, _, _), do: {:error, conn, "Could not connect"} + + def stream(conn, req, name, ref, handler, receive_timeout, request_timeout, idle_time) do + full_path = Finch.Request.request_path(req) + + metadata = %{request: req, name: name} + + extra_measurements = %{idle_time: idle_time} + + start_time = Telemetry.start(:send, metadata, extra_measurements) + + try do + case Mint.HTTP.request( + conn.mint, + req.method, + full_path, + req.headers, + stream_or_body(req.body) + ) do + {:ok, mint, mint_ref} -> + case maybe_stream_request_body(mint, mint_ref, req.body) do + {:ok, mint} -> + Telemetry.stop(:send, start_time, metadata, extra_measurements) + start_time = Telemetry.start(:recv, metadata, extra_measurements) + resp_metadata = %{status: nil, headers: [], trailers: []} + timeouts = %{receive_timeout: receive_timeout, request_timeout: request_timeout} + + stream = + Stream.resource( + fn -> {[], mint, mint_ref, timeouts, resp_metadata} end, + &receive_stream_response/1, + fn + {mint, error, resp_metadata} -> + conn = %{conn | mint: mint} + + state = + if open?(conn) do + {:ok, conn} + else + :closed + end + + if error do + metadata = Map.merge(metadata, Map.put(resp_metadata, :error, error)) + Telemetry.stop(:recv, start_time, metadata, extra_measurements) + raise error + else + metadata = Map.merge(metadata, resp_metadata) + Telemetry.stop(:recv, start_time, metadata, extra_measurements) + end + + send(handler, {ref, :stop, state}) + + {_entries, mint, _mint_ref, _timeout, _resp_metadata} -> + # In case some exception occured, we close the connection + Mint.HTTP.close(mint) + send(handler, {ref, :stop, :closed}) + end + ) + + {:ok, stream} + + {:error, mint, error} -> + handle_request_error( + conn, + mint, + error, + metadata, + start_time, + extra_measurements + ) + end + + {:error, mint, error} -> + handle_request_error(conn, mint, error, metadata, start_time, extra_measurements) + end + catch + kind, error -> + close(conn) + Telemetry.exception(:recv, start_time, kind, error, __STACKTRACE__, metadata) + :erlang.raise(kind, error, __STACKTRACE__) + end + end + def request(%{mint: nil} = conn, _, _, _, _, _, _, _), do: {:error, conn, "Could not connect"} def request(conn, req, acc, fun, name, receive_timeout, request_timeout, idle_time) do @@ -368,4 +452,91 @@ defmodule Finch.HTTP1.Conn do {:error, mint, error, resp_metadata} end end + + defp receive_stream_response({ + [{:done, _minf_ref} | _], + mint, + _mint_ref, + _timeouts, + resp_metadata + }) do + {:halt, {mint, nil, resp_metadata}} + |> IO.inspect(label: :x1) + end + + defp receive_stream_response({ + _, + mint, + _mint_ref, + timeouts, + resp_metadata + }) + when timeouts.request_timeout < 0 do + {:ok, mint} = Mint.HTTP1.close(mint) + + {:halt, {mint, %Mint.TransportError{reason: :timeout}, resp_metadata}} + |> IO.inspect(label: :x2) + end + + defp receive_stream_response({ + [], + mint, + mint_ref, + timeouts, + resp_metadata + }) do + start_time = System.monotonic_time(:millisecond) + %{} = timeouts + + case Mint.HTTP.recv(mint, 0, timeouts.receive_timeout) do + {:ok, mint, entries} -> + timeouts = + if is_integer(timeouts.request_timeout) do + elapsed_time = System.monotonic_time(:millisecond) - start_time + update_in(timeouts.request_timeout, &(&1 - elapsed_time)) + else + timeouts + end + + {[], {entries, mint, mint_ref, timeouts, resp_metadata}} + + {:error, mint, error, _responses} -> + {:halt, {mint, error, resp_metadata}} + end + |> IO.inspect(label: :x3) + end + + defp receive_stream_response({ + [entry | entries], + mint, + mint_ref, + timeouts, + resp_metadata + }) do + case entry do + {key, ^mint_ref, value} when key in ~w[status headers data]a -> + resp_metadata = + case key do + :headers -> + Map.update(resp_metadata, :headers, value, &(&1 ++ value)) + + _ -> + resp_metadata + end + + acc = { + entries, + mint, + mint_ref, + timeouts, + %{resp_metadata | status: value} + } + + {[{key, value}], acc} + + {:error, ^mint_ref, error} -> + {:halt, {mint, error, resp_metadata}} + end + |> IO.inspect(label: :x4) + end end diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index 771a9e9..4644c62 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -38,6 +38,119 @@ defmodule Finch.HTTP1.Pool do ) end + def stream(pool, req, name, opts) do + pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) + receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) + request_timeout = Keyword.get(opts, :request_timeout, 30_000) + + metadata = %{request: req, pool: pool, name: name} + + start_time = Telemetry.start(:queue, metadata) + owner = self() + ref = make_ref() + + holder = + spawn_link(fn -> + try do + NimblePool.checkout!( + pool, + :checkout, + fn from, {state, conn, idle_time} -> + Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time}) + + case Conn.connect(conn, name) do + {:ok, conn} -> + send(owner, {ref, :ok, {conn, from, idle_time}}) + {:ok, if_open(conn, state, from)} + + {:error, _conn, reason} -> + send(owner, {ref, :error, reason}) + {:ok, if_open(conn, state, from)} + end + + receive do + {^ref, :stop, state} -> {:ok, state} + end + end, + pool_timeout + ) + catch + :exit, data -> + Telemetry.exception(:queue, start_time, :exit, data, __STACKTRACE__, metadata) + send(owner, {ref, :exit, {data, __STACKTRACE__}}) + end + end) + + receive do + {^ref, :ok, conn_from} -> + {conn, _from, idle_time} = conn_from + + case Conn.transfer(conn, self()) do + {:ok, conn} -> + case Conn.stream( + conn, + req, + name, + ref, + holder, + receive_timeout, + request_timeout, + idle_time + ) do + {:ok, stream} -> + {:ok, stream} + + {:error, conn, error} -> + state = + if Conn.open?(conn) do + {:ok, conn} + else + :closed + end + + send(holder, {ref, :stop, state}) + {:error, error} + end + + {:error, _, _} -> + send(holder, {ref, :stop, :closed}) + end + + {^ref, :error, reason} -> + {:error, reason} + + {^ref, :exit, data_trace} -> + {data, trace} = data_trace + + case data do + {:timeout, {NimblePool, :checkout, _affected_pids}} -> + # Provide helpful error messages for known errors + reraise( + """ + Finch was unable to provide a connection within the timeout due to excess queuing \ + for connections. Consider adjusting the pool size, count, timeout or reducing the \ + rate of requests if it is possible that the downstream service is unable to keep up \ + with the current rate. + """, + trace + ) + + _ -> + exit(data) + end + after + pool_timeout + request_timeout -> + # Cleanup late messages + receive do + {^ref, _, _} -> :ok + after + 0 -> :ok + end + + raise "Has not received message from pool yet" + end + end + @impl Finch.Pool def request(pool, req, acc, fun, name, opts) do pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) @@ -279,17 +392,31 @@ defmodule Finch.HTTP1.Pool do def handle_cancelled(:queued, _pool_state), do: :ok - defp transfer_if_open(conn, state, {pid, _} = from) do + defp if_open(conn, state, from) do if Conn.open?(conn) do - if state == :fresh do + with :fresh <- state do NimblePool.update(from, conn) + end - case Conn.transfer(conn, pid) do - {:ok, conn} -> {:ok, conn} - {:error, _, _} -> :closed - end - else - {:ok, conn} + {:ok, conn} + else + :closed + end + end + + defp transfer_if_open(conn, state, {pid, _} = from) do + if Conn.open?(conn) do + case state do + :fresh -> + NimblePool.update(from, conn) + + case Conn.transfer(conn, pid) do + {:ok, conn} -> {:ok, conn} + {:error, _, _} -> :closed + end + + _ -> + {:ok, conn} end else :closed From 045ba72004890d16c677214d816eea4c1cf10d5c Mon Sep 17 00:00:00 2001 From: hissssst Date: Sat, 3 Aug 2024 19:09:08 +0200 Subject: [PATCH 2/6] Test and doc --- lib/finch.ex | 17 ++++++++ lib/finch/http1/conn.ex | 5 --- lib/finch/http1/pool.ex | 87 +++++++++++++++++------------------------ lib/finch/http2/pool.ex | 5 +++ lib/finch/pool.ex | 3 ++ test/finch_test.exs | 37 ++++++++++++++++++ 6 files changed, 98 insertions(+), 56 deletions(-) diff --git a/lib/finch.ex b/lib/finch.ex index 52d775b..e29fd79 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -307,11 +307,28 @@ defmodule Finch do end end + @doc """ + Like `stream/5`, but returns valid `Stream` structure, safe for passing between processes, but working __only__ + on local node. Works only for HTTP1, HTTP2 not supported currently + """ + @spec actual_stream(Request.t(), name(), request_opts()) :: + {:ok, Enumerable.t()} | {:error, Exception.t()} def actual_stream(request, name, opts \\ []) do {pool, pool_mod} = get_pool(request, name) pool_mod.stream(pool, request, name, opts) end + @doc """ + Raising version of `actual_stream/3` + """ + @spec actual_stream!(Request.t(), name(), request_opts()) :: Enumerable.t() + def actual_stream!(request, name, opts \\ []) do + case actual_stream(request, name, opts) do + {:ok, stream} -> stream + exception -> raise exception + end + end + @doc """ Builds an HTTP request to be sent with `request/3` or `stream/4`. diff --git a/lib/finch/http1/conn.ex b/lib/finch/http1/conn.ex index 4babf27..d70d98c 100644 --- a/lib/finch/http1/conn.ex +++ b/lib/finch/http1/conn.ex @@ -461,7 +461,6 @@ defmodule Finch.HTTP1.Conn do resp_metadata }) do {:halt, {mint, nil, resp_metadata}} - |> IO.inspect(label: :x1) end defp receive_stream_response({ @@ -473,9 +472,7 @@ defmodule Finch.HTTP1.Conn do }) when timeouts.request_timeout < 0 do {:ok, mint} = Mint.HTTP1.close(mint) - {:halt, {mint, %Mint.TransportError{reason: :timeout}, resp_metadata}} - |> IO.inspect(label: :x2) end defp receive_stream_response({ @@ -503,7 +500,6 @@ defmodule Finch.HTTP1.Conn do {:error, mint, error, _responses} -> {:halt, {mint, error, resp_metadata}} end - |> IO.inspect(label: :x3) end defp receive_stream_response({ @@ -537,6 +533,5 @@ defmodule Finch.HTTP1.Conn do {:error, ^mint_ref, error} -> {:halt, {mint, error, resp_metadata}} end - |> IO.inspect(label: :x4) end end diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index 4644c62..6fa6e6a 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -38,6 +38,7 @@ defmodule Finch.HTTP1.Pool do ) end + @impl Finch.Pool def stream(pool, req, name, opts) do pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) @@ -57,16 +58,7 @@ defmodule Finch.HTTP1.Pool do :checkout, fn from, {state, conn, idle_time} -> Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time}) - - case Conn.connect(conn, name) do - {:ok, conn} -> - send(owner, {ref, :ok, {conn, from, idle_time}}) - {:ok, if_open(conn, state, from)} - - {:error, _conn, reason} -> - send(owner, {ref, :error, reason}) - {:ok, if_open(conn, state, from)} - end + send(owner, {ref, :ok, {conn, from, state, idle_time}}) receive do {^ref, :stop, state} -> {:ok, state} @@ -83,37 +75,38 @@ defmodule Finch.HTTP1.Pool do receive do {^ref, :ok, conn_from} -> - {conn, _from, idle_time} = conn_from + {conn, from, state, idle_time} = conn_from + + with {:ok, conn} <- Conn.connect(conn, name), + {:ok, conn} <- transfer_if_open(conn, state, from), + {:ok, stream} <- + Conn.stream( + conn, + req, + name, + ref, + holder, + receive_timeout, + request_timeout, + idle_time + ) do + {:ok, stream} + else + :closed -> + send(holder, {ref, :stop, state}) + # FIXME + {:error, :closed} + + {:error, conn, error} -> + state = + if Conn.open?(conn) do + {:ok, conn} + else + :closed + end - case Conn.transfer(conn, self()) do - {:ok, conn} -> - case Conn.stream( - conn, - req, - name, - ref, - holder, - receive_timeout, - request_timeout, - idle_time - ) do - {:ok, stream} -> - {:ok, stream} - - {:error, conn, error} -> - state = - if Conn.open?(conn) do - {:ok, conn} - else - :closed - end - - send(holder, {ref, :stop, state}) - {:error, error} - end - - {:error, _, _} -> - send(holder, {ref, :stop, :closed}) + send(holder, {ref, :stop, state}) + {:error, error} end {^ref, :error, reason} -> @@ -392,19 +385,11 @@ defmodule Finch.HTTP1.Pool do def handle_cancelled(:queued, _pool_state), do: :ok - defp if_open(conn, state, from) do - if Conn.open?(conn) do - with :fresh <- state do - NimblePool.update(from, conn) - end - - {:ok, conn} - else - :closed - end + defp transfer_if_open(conn, state, {pid, _} = from) do + transfer_if_open(conn, state, from, pid) end - defp transfer_if_open(conn, state, {pid, _} = from) do + defp transfer_if_open(conn, state, from, pid) do if Conn.open?(conn) do case state do :fresh -> diff --git a/lib/finch/http2/pool.ex b/lib/finch/http2/pool.ex index 604a01d..713831b 100644 --- a/lib/finch/http2/pool.ex +++ b/lib/finch/http2/pool.ex @@ -27,6 +27,11 @@ defmodule Finch.HTTP2.Pool do } end + @impl Finch.Pool + def stream(_pool, _request, _name, _opts) do + {:error, %RuntimeError{message: "Streaming is not supported for HTTP2"}} + end + # Call the pool with the request. The pool will multiplex multiple requests # and stream the result set back to the calling process using `send` @impl Finch.Pool diff --git a/lib/finch/pool.ex b/lib/finch/pool.ex index 8fbbb88..c8d953f 100644 --- a/lib/finch/pool.ex +++ b/lib/finch/pool.ex @@ -4,6 +4,9 @@ defmodule Finch.Pool do @type request_ref :: {pool_mod :: module(), cancel_ref :: term()} + @callback stream(pid(), Finch.Request.t(), finch_name :: atom(), list()) :: + {:ok, Stream.t()} | {:error, Exception.t()} + @callback request( pid(), Finch.Request.t(), diff --git a/test/finch_test.exs b/test/finch_test.exs index 12c80ef..b8c972a 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -625,6 +625,43 @@ defmodule FinchTest do end end + describe "actual_stream/3" do + test "Not supported for HTTP2", %{finch_name: finch_name} do + start_supervised!( + {Finch, name: finch_name, pools: %{default: [protocols: [:http2], size: 10, count: 1]}} + ) + + request = Finch.build("GET", "http://example.com/") + + assert {:error, %RuntimeError{message: "Streaming is not supported for HTTP2"}} = + Finch.actual_stream(request, finch_name) + end + + test "Streams response", %{bypass: bypass, finch_name: finch_name} do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = Finch.actual_stream(request, finch_name) + + assert [ + {:status, 200}, + {:headers, + [ + {"cache-control", "max-age=0, private, must-revalidate"}, + {"content-length", "2"}, + {"date", _}, + {"server", "Cowboy"} + ]}, + {:data, "OK"} + ] = Enum.to_list(stream) + end + end + describe "stream/5" do test "successful get request, with query string", %{bypass: bypass, finch_name: finch_name} do start_supervised!({Finch, name: finch_name}) From 4aaf5573753cb2afe844babf6cbc89c9513c486a Mon Sep 17 00:00:00 2001 From: hissssst Date: Sat, 3 Aug 2024 19:41:32 +0200 Subject: [PATCH 3/6] Requested changes, process transfer and sending stream test --- lib/finch/http1/conn.ex | 14 ++++---------- lib/finch/http1/pool.ex | 43 ++++++++++++++--------------------------- test/finch_test.exs | 37 +++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 38 deletions(-) diff --git a/lib/finch/http1/conn.ex b/lib/finch/http1/conn.ex index d70d98c..531dbc7 100644 --- a/lib/finch/http1/conn.ex +++ b/lib/finch/http1/conn.ex @@ -135,13 +135,6 @@ defmodule Finch.HTTP1.Conn do {mint, error, resp_metadata} -> conn = %{conn | mint: mint} - state = - if open?(conn) do - {:ok, conn} - else - :closed - end - if error do metadata = Map.merge(metadata, Map.put(resp_metadata, :error, error)) Telemetry.stop(:recv, start_time, metadata, extra_measurements) @@ -151,12 +144,13 @@ defmodule Finch.HTTP1.Conn do Telemetry.stop(:recv, start_time, metadata, extra_measurements) end - send(handler, {ref, :stop, state}) + send(handler, {ref, :stop, conn}) {_entries, mint, _mint_ref, _timeout, _resp_metadata} -> # In case some exception occured, we close the connection - Mint.HTTP.close(mint) - send(handler, {ref, :stop, :closed}) + {:ok, mint} = Mint.HTTP.close(mint) + conn = %{conn | mint: mint} + send(handler, {ref, :stop, conn}) end ) diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index 6fa6e6a..dc255ec 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -58,10 +58,11 @@ defmodule Finch.HTTP1.Pool do :checkout, fn from, {state, conn, idle_time} -> Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time}) - send(owner, {ref, :ok, {conn, from, state, idle_time}}) + send(owner, {ref, :ok, {conn, idle_time}}) receive do - {^ref, :stop, state} -> {:ok, state} + {^ref, :stop, conn} -> + transfer_if_open(conn, state, from) end end, pool_timeout @@ -74,11 +75,11 @@ defmodule Finch.HTTP1.Pool do end) receive do - {^ref, :ok, conn_from} -> - {conn, from, state, idle_time} = conn_from + {^ref, :ok, conn_idle_time} -> + {conn, idle_time} = conn_idle_time with {:ok, conn} <- Conn.connect(conn, name), - {:ok, conn} <- transfer_if_open(conn, state, from), + {:ok, conn} <- Conn.transfer(conn, self()), {:ok, stream} <- Conn.stream( conn, @@ -92,20 +93,8 @@ defmodule Finch.HTTP1.Pool do ) do {:ok, stream} else - :closed -> - send(holder, {ref, :stop, state}) - # FIXME - {:error, :closed} - {:error, conn, error} -> - state = - if Conn.open?(conn) do - {:ok, conn} - else - :closed - end - - send(holder, {ref, :stop, state}) + send(holder, {ref, :stop, conn}) {:error, error} end @@ -391,17 +380,15 @@ defmodule Finch.HTTP1.Pool do defp transfer_if_open(conn, state, from, pid) do if Conn.open?(conn) do - case state do - :fresh -> - NimblePool.update(from, conn) + if state == :fresh do + NimblePool.update(from, conn) - case Conn.transfer(conn, pid) do - {:ok, conn} -> {:ok, conn} - {:error, _, _} -> :closed - end - - _ -> - {:ok, conn} + case Conn.transfer(conn, pid) do + {:ok, conn} -> {:ok, conn} + {:error, _, _} -> :closed + end + else + {:ok, conn} end else :closed diff --git a/test/finch_test.exs b/test/finch_test.exs index b8c972a..e84950f 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -660,6 +660,43 @@ defmodule FinchTest do {:data, "OK"} ] = Enum.to_list(stream) end + + test "Streams response even when sent to another process", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = Finch.actual_stream(request, finch_name) + + owner = self() + + spawn_link(fn -> + assert [ + {:status, 200}, + {:headers, + [ + {"cache-control", "max-age=0, private, must-revalidate"}, + {"content-length", "2"}, + {"date", _}, + {"server", "Cowboy"} + ]}, + {:data, "OK"} + ] = Enum.to_list(stream) + + send(owner, :continue) + end) + + receive do + :continue -> :ok + end + end end describe "stream/5" do From 0dd28a1908a701439c3ff0328113595932555ae0 Mon Sep 17 00:00:00 2001 From: hissssst Date: Sat, 3 Aug 2024 23:51:20 +0200 Subject: [PATCH 4/6] Fail safe timeout and FN implementation --- lib/finch/http1/conn.ex | 160 ---------------------------------------- lib/finch/http1/pool.ex | 66 ++++++++++++----- 2 files changed, 47 insertions(+), 179 deletions(-) diff --git a/lib/finch/http1/conn.ex b/lib/finch/http1/conn.ex index 531dbc7..b331b91 100644 --- a/lib/finch/http1/conn.ex +++ b/lib/finch/http1/conn.ex @@ -100,84 +100,6 @@ defmodule Finch.HTTP1.Conn do end end - def stream(%{mint: nil} = conn, _, _, _, _, _), do: {:error, conn, "Could not connect"} - - def stream(conn, req, name, ref, handler, receive_timeout, request_timeout, idle_time) do - full_path = Finch.Request.request_path(req) - - metadata = %{request: req, name: name} - - extra_measurements = %{idle_time: idle_time} - - start_time = Telemetry.start(:send, metadata, extra_measurements) - - try do - case Mint.HTTP.request( - conn.mint, - req.method, - full_path, - req.headers, - stream_or_body(req.body) - ) do - {:ok, mint, mint_ref} -> - case maybe_stream_request_body(mint, mint_ref, req.body) do - {:ok, mint} -> - Telemetry.stop(:send, start_time, metadata, extra_measurements) - start_time = Telemetry.start(:recv, metadata, extra_measurements) - resp_metadata = %{status: nil, headers: [], trailers: []} - timeouts = %{receive_timeout: receive_timeout, request_timeout: request_timeout} - - stream = - Stream.resource( - fn -> {[], mint, mint_ref, timeouts, resp_metadata} end, - &receive_stream_response/1, - fn - {mint, error, resp_metadata} -> - conn = %{conn | mint: mint} - - if error do - metadata = Map.merge(metadata, Map.put(resp_metadata, :error, error)) - Telemetry.stop(:recv, start_time, metadata, extra_measurements) - raise error - else - metadata = Map.merge(metadata, resp_metadata) - Telemetry.stop(:recv, start_time, metadata, extra_measurements) - end - - send(handler, {ref, :stop, conn}) - - {_entries, mint, _mint_ref, _timeout, _resp_metadata} -> - # In case some exception occured, we close the connection - {:ok, mint} = Mint.HTTP.close(mint) - conn = %{conn | mint: mint} - send(handler, {ref, :stop, conn}) - end - ) - - {:ok, stream} - - {:error, mint, error} -> - handle_request_error( - conn, - mint, - error, - metadata, - start_time, - extra_measurements - ) - end - - {:error, mint, error} -> - handle_request_error(conn, mint, error, metadata, start_time, extra_measurements) - end - catch - kind, error -> - close(conn) - Telemetry.exception(:recv, start_time, kind, error, __STACKTRACE__, metadata) - :erlang.raise(kind, error, __STACKTRACE__) - end - end - def request(%{mint: nil} = conn, _, _, _, _, _, _, _), do: {:error, conn, "Could not connect"} def request(conn, req, acc, fun, name, receive_timeout, request_timeout, idle_time) do @@ -446,86 +368,4 @@ defmodule Finch.HTTP1.Conn do {:error, mint, error, resp_metadata} end end - - defp receive_stream_response({ - [{:done, _minf_ref} | _], - mint, - _mint_ref, - _timeouts, - resp_metadata - }) do - {:halt, {mint, nil, resp_metadata}} - end - - defp receive_stream_response({ - _, - mint, - _mint_ref, - timeouts, - resp_metadata - }) - when timeouts.request_timeout < 0 do - {:ok, mint} = Mint.HTTP1.close(mint) - {:halt, {mint, %Mint.TransportError{reason: :timeout}, resp_metadata}} - end - - defp receive_stream_response({ - [], - mint, - mint_ref, - timeouts, - resp_metadata - }) do - start_time = System.monotonic_time(:millisecond) - %{} = timeouts - - case Mint.HTTP.recv(mint, 0, timeouts.receive_timeout) do - {:ok, mint, entries} -> - timeouts = - if is_integer(timeouts.request_timeout) do - elapsed_time = System.monotonic_time(:millisecond) - start_time - update_in(timeouts.request_timeout, &(&1 - elapsed_time)) - else - timeouts - end - - {[], {entries, mint, mint_ref, timeouts, resp_metadata}} - - {:error, mint, error, _responses} -> - {:halt, {mint, error, resp_metadata}} - end - end - - defp receive_stream_response({ - [entry | entries], - mint, - mint_ref, - timeouts, - resp_metadata - }) do - case entry do - {key, ^mint_ref, value} when key in ~w[status headers data]a -> - resp_metadata = - case key do - :headers -> - Map.update(resp_metadata, :headers, value, &(&1 ++ value)) - - _ -> - resp_metadata - end - - acc = { - entries, - mint, - mint_ref, - timeouts, - %{resp_metadata | status: value} - } - - {[{key, value}], acc} - - {:error, ^mint_ref, error} -> - {:halt, {mint, error, resp_metadata}} - end - end end diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index dc255ec..076b492 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -43,6 +43,7 @@ defmodule Finch.HTTP1.Pool do pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) request_timeout = Keyword.get(opts, :request_timeout, 30_000) + fail_safe_timeout = Keyword.get(opts, :fail_safe_timeout, 15 * 60_000) metadata = %{request: req, pool: pool, name: name} @@ -63,10 +64,20 @@ defmodule Finch.HTTP1.Pool do receive do {^ref, :stop, conn} -> transfer_if_open(conn, state, from) + after + fail_safe_timeout -> + {_, state} = transfer_if_open(conn, state, from) + {:fail_safe_timeout, state} end end, pool_timeout ) + else + :fail_safe_timeout -> + raise "Fail safe timeout was hit" + + other -> + other catch :exit, data -> Telemetry.exception(:queue, start_time, :exit, data, __STACKTRACE__, metadata) @@ -78,25 +89,42 @@ defmodule Finch.HTTP1.Pool do {^ref, :ok, conn_idle_time} -> {conn, idle_time} = conn_idle_time - with {:ok, conn} <- Conn.connect(conn, name), - {:ok, conn} <- Conn.transfer(conn, self()), - {:ok, stream} <- - Conn.stream( - conn, - req, - name, - ref, - holder, - receive_timeout, - request_timeout, - idle_time - ) do - {:ok, stream} - else - {:error, conn, error} -> - send(holder, {ref, :stop, conn}) - {:error, error} - end + stream = + fn + {:cont, acc}, function -> + Process.link(holder) + + try do + with {:ok, conn} <- Conn.connect(conn, name), + {:ok, conn} <- Conn.transfer(conn, self()), + {:ok, conn, acc} <- + Conn.request( + conn, + req, + acc, + function, + name, + receive_timeout, + request_timeout, + idle_time + ) do + send(holder, {ref, :stop, conn}) + {:cont, acc} + else + {:error, conn, _error} -> + send(holder, {ref, :stop, conn}) + end + catch + class, reason -> + send(holder, {ref, :stop, conn}) + :erlang.raise(class, reason, __STACKTRACE__) + end + + {_, _acc}, _function -> + send(holder, {ref, :stop, conn}) + end + + {:ok, stream} {^ref, :error, reason} -> {:error, reason} From 6b18ebffd7316b19e899ae7b0260fc3deacfa38a Mon Sep 17 00:00:00 2001 From: hissssst Date: Sun, 4 Aug 2024 16:18:19 +0200 Subject: [PATCH 5/6] Fail safe timeout implementation and tests --- lib/finch.ex | 13 +++++++- lib/finch/http1/pool.ex | 61 +++++++++++++++++++++++++--------- test/finch_test.exs | 72 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 17 deletions(-) diff --git a/lib/finch.ex b/lib/finch.ex index e29fd79..046354b 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -309,7 +309,18 @@ defmodule Finch do @doc """ Like `stream/5`, but returns valid `Stream` structure, safe for passing between processes, but working __only__ - on local node. Works only for HTTP1, HTTP2 not supported currently + on local node. Works only for HTTP1, HTTP2 not supported currently. + + ## Options + + Same as `request/3` plus + + * `:fail_safe_timeout` (`t:timeout/0`) (optional) (default is 15 minutes) - timeout in milliseconds. + Since this function returns an Enumerable which is lazily executed, it makes sense to + have a timeout which will close the connection in case it's never read from in erroneous situations. + + * `:stop_notify` (`{t:GenServer.name/0 | t:pid/0, t:any/0}`) (optional) - destination and message which + will be notified once connection is returned to pool or closed. """ @spec actual_stream(Request.t(), name(), request_opts()) :: {:ok, Enumerable.t()} | {:error, Exception.t()} diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index 076b492..bb992b4 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -44,6 +44,7 @@ defmodule Finch.HTTP1.Pool do receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) request_timeout = Keyword.get(opts, :request_timeout, 30_000) fail_safe_timeout = Keyword.get(opts, :fail_safe_timeout, 15 * 60_000) + stop_notify = Keyword.get(opts, :stop_notify, nil) metadata = %{request: req, pool: pool, name: name} @@ -58,17 +59,37 @@ defmodule Finch.HTTP1.Pool do pool, :checkout, fn from, {state, conn, idle_time} -> + if fail_safe_timeout != :infinity do + Process.send_after(self(), :fail_safe_timeout, fail_safe_timeout) + end + Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time}) - send(owner, {ref, :ok, {conn, idle_time}}) - - receive do - {^ref, :stop, conn} -> - transfer_if_open(conn, state, from) - after - fail_safe_timeout -> - {_, state} = transfer_if_open(conn, state, from) - {:fail_safe_timeout, state} + + return = + case Conn.connect(conn, name) do + {:ok, conn} -> + send(owner, {ref, :ok, {conn, idle_time}}) + + receive do + {^ref, :stop, conn} -> + with :closed <- transfer_if_open(conn, state, from) do + {:ok, :closed} + end + + :fail_safe_timeout -> + Conn.close(conn) + {:fail_safe_timeout, :closed} + end + + {:error, conn, error} -> + {{:error, error}, transfer_if_open(conn, state, from)} + end + + with {to, message} <- stop_notify do + send(to, message) end + + return end, pool_timeout ) @@ -92,12 +113,18 @@ defmodule Finch.HTTP1.Pool do stream = fn {:cont, acc}, function -> - Process.link(holder) + case Process.alive?(holder) do + true -> + Process.link(holder) + + false -> + raise "Process holding the connection in pool died" <> + " before the stream enumeration started." <> + " Most likely fail_safe_timeout occured" + end try do - with {:ok, conn} <- Conn.connect(conn, name), - {:ok, conn} <- Conn.transfer(conn, self()), - {:ok, conn, acc} <- + with {:ok, conn, acc} <- Conn.request( conn, req, @@ -109,10 +136,11 @@ defmodule Finch.HTTP1.Pool do idle_time ) do send(holder, {ref, :stop, conn}) - {:cont, acc} + {:done, acc} else - {:error, conn, _error} -> + {:error, conn, error} -> send(holder, {ref, :stop, conn}) + raise error end catch class, reason -> @@ -120,8 +148,9 @@ defmodule Finch.HTTP1.Pool do :erlang.raise(class, reason, __STACKTRACE__) end - {_, _acc}, _function -> + {_, acc}, _function -> send(holder, {ref, :stop, conn}) + {:done, acc} end {:ok, stream} diff --git a/test/finch_test.exs b/test/finch_test.exs index e84950f..012bc91 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -697,6 +697,78 @@ defmodule FinchTest do :continue -> :ok end end + + test "Stream can be halted", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped}) + + assert [status: 200] = Enum.take_while(stream, &(not match?({:headers, _}, &1))) + + assert_receive :stopped + end + + test "Raising in stream closes the connection", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped}) + + assert_raise RuntimeError, fn -> + Enum.map(stream, fn _ -> raise "oops" end) + end + + assert_receive :stopped + end + + test "Fail safe timeout works", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Process.flag(:trap_exit, true) + + Bypass.stub(bypass, "GET", "/", fn conn -> + Process.sleep(1_000) + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, + stop_notify: {self(), :stopped}, + fail_safe_timeout: 100 + ) + + Process.sleep(200) + + assert_raise RuntimeError, fn -> + Enum.to_list(stream) + end + + assert_receive :stopped + end end describe "stream/5" do From f0a2b97d6a50913f2567683c432af6b5b9c5a852 Mon Sep 17 00:00:00 2001 From: hissssst Date: Sun, 1 Sep 2024 02:06:06 +0200 Subject: [PATCH 6/6] Reimplementation and failing tests --- lib/finch/http1/conn.ex | 123 ++++++++++++++++++------------------ lib/finch/http1/pool.ex | 134 +++++++++++++++++++++------------------- test/finch_test.exs | 56 +++++++++++++++-- 3 files changed, 185 insertions(+), 128 deletions(-) diff --git a/lib/finch/http1/conn.ex b/lib/finch/http1/conn.ex index b331b91..6f47e1d 100644 --- a/lib/finch/http1/conn.ex +++ b/lib/finch/http1/conn.ex @@ -201,6 +201,9 @@ defmodule Finch.HTTP1.Conn do Telemetry.stop(:recv, start_time, metadata, extra_measurements) {:ok, %{conn | mint: mint}, acc} + {:suspended, _, _} = suspended -> + suspended + {:error, mint, error, resp_metadata} -> metadata = Map.merge(metadata, Map.put(resp_metadata, :error, error)) Telemetry.stop(:recv, start_time, metadata, extra_measurements) @@ -297,75 +300,73 @@ defmodule Finch.HTTP1.Conn do ) do case entry do {:status, ^ref, value} -> - case fun.({:status, value}, acc) do - {:cont, acc} -> - receive_response( - entries, - acc, - fun, - mint, - ref, - timeouts, - fields, - %{resp_metadata | status: value} - ) - - {:halt, acc} -> - {:ok, mint} = Mint.HTTP1.close(mint) - {:ok, mint, acc, resp_metadata} - - other -> - raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" - end + resp_metadata = %{resp_metadata | status: value} + call_fun(:status, value, acc, entries, fun, mint, ref, timeouts, fields, resp_metadata) {:headers, ^ref, value} -> resp_metadata = update_in(resp_metadata, [fields], &(&1 ++ value)) - - case fun.({fields, value}, acc) do - {:cont, acc} -> - receive_response( - entries, - acc, - fun, - mint, - ref, - timeouts, - fields, - resp_metadata - ) - - {:halt, acc} -> - {:ok, mint} = Mint.HTTP1.close(mint) - {:ok, mint, acc, resp_metadata} - - other -> - raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" - end + call_fun(fields, value, acc, entries, fun, mint, ref, timeouts, fields, resp_metadata) {:data, ^ref, value} -> - case fun.({:data, value}, acc) do - {:cont, acc} -> - receive_response( - entries, - acc, - fun, - mint, - ref, - timeouts, - :trailers, - resp_metadata - ) - - {:halt, acc} -> - {:ok, mint} = Mint.HTTP1.close(mint) - {:ok, mint, acc, resp_metadata} - - other -> - raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" - end + call_fun(:data, value, acc, entries, fun, mint, ref, timeouts, :trailers, resp_metadata) {:error, ^ref, error} -> {:error, mint, error, resp_metadata} end end + + defp call_fun(tag, value, acc, entries, fun, mint, ref, timeouts, fields, resp_metadata) do + case fun.({tag, value}, acc) do + {:cont, acc} -> + receive_response( + entries, + acc, + fun, + mint, + ref, + timeouts, + fields, + resp_metadata + ) + + {:halt, acc} -> + {:ok, mint} = Mint.HTTP1.close(mint) + {:ok, mint, acc, resp_metadata} + + {:__finch_suspend__, acc, extra} -> + enum_acc = {entries, fun, mint, ref, timeouts, fields, resp_metadata, extra} + {:suspended, acc, suspension_function(enum_acc)} + + other -> + raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" + end + end + + def suspension_function(enum_acc) do + fn acc -> + {entries, fun, mint, ref, timeouts, fields, resp_metadata, extra} = enum_acc + case acc do + {:cont, acc} -> + receive_response( + entries, + acc, + fun, + mint, + ref, + timeouts, + fields, + resp_metadata + ) + + {:suspend, acc} -> + {:suspended, acc, &suspension_function/1} + + {:halt, acc} -> + {holder, holder_ref, conn} = extra + conn = %{conn | mint: mint} + send(holder, {holder_ref, :stop, conn}) + {:ok, mint, acc, resp_metadata} + end + end + end end diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index bb992b4..e1db925 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -38,20 +38,17 @@ defmodule Finch.HTTP1.Pool do ) end - @impl Finch.Pool - def stream(pool, req, name, opts) do - pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) - receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) - request_timeout = Keyword.get(opts, :request_timeout, 30_000) + defp spawn_holder(pool, req, name, opts) do + metadata = %{request: req, pool: pool, name: name} fail_safe_timeout = Keyword.get(opts, :fail_safe_timeout, 15 * 60_000) stop_notify = Keyword.get(opts, :stop_notify, nil) + pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) - metadata = %{request: req, pool: pool, name: name} - - start_time = Telemetry.start(:queue, metadata) owner = self() ref = make_ref() + start_time = Telemetry.start(:queue, metadata) + holder = spawn_link(fn -> try do @@ -78,7 +75,7 @@ defmodule Finch.HTTP1.Pool do :fail_safe_timeout -> Conn.close(conn) - {:fail_safe_timeout, :closed} + {:ok, :closed} end {:error, conn, error} -> @@ -93,12 +90,9 @@ defmodule Finch.HTTP1.Pool do end, pool_timeout ) - else - :fail_safe_timeout -> - raise "Fail safe timeout was hit" - - other -> - other + rescue + x -> + IO.inspect(x) catch :exit, data -> Telemetry.exception(:queue, start_time, :exit, data, __STACKTRACE__, metadata) @@ -107,53 +101,9 @@ defmodule Finch.HTTP1.Pool do end) receive do - {^ref, :ok, conn_idle_time} -> - {conn, idle_time} = conn_idle_time - - stream = - fn - {:cont, acc}, function -> - case Process.alive?(holder) do - true -> - Process.link(holder) - - false -> - raise "Process holding the connection in pool died" <> - " before the stream enumeration started." <> - " Most likely fail_safe_timeout occured" - end - - try do - with {:ok, conn, acc} <- - Conn.request( - conn, - req, - acc, - function, - name, - receive_timeout, - request_timeout, - idle_time - ) do - send(holder, {ref, :stop, conn}) - {:done, acc} - else - {:error, conn, error} -> - send(holder, {ref, :stop, conn}) - raise error - end - catch - class, reason -> - send(holder, {ref, :stop, conn}) - :erlang.raise(class, reason, __STACKTRACE__) - end - - {_, acc}, _function -> - send(holder, {ref, :stop, conn}) - {:done, acc} - end - - {:ok, stream} + {^ref, :ok, {conn, idle_time}} -> + Process.link(holder) + {:ok, holder, ref, conn, idle_time} {^ref, :error, reason} -> {:error, reason} @@ -178,7 +128,7 @@ defmodule Finch.HTTP1.Pool do exit(data) end after - pool_timeout + request_timeout -> + pool_timeout -> # Cleanup late messages receive do {^ref, _, _} -> :ok @@ -188,6 +138,64 @@ defmodule Finch.HTTP1.Pool do raise "Has not received message from pool yet" end + rescue + x -> + IO.inspect({x, __STACKTRACE__}) + end + + @impl Finch.Pool + def stream(pool, req, name, opts) do + receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) + request_timeout = Keyword.get(opts, :request_timeout, 30_000) + + stream = + fn + {:cont, acc}, function -> + case spawn_holder(pool, req, name, opts) do + {:ok, holder, ref, conn, idle_time} -> + function = fn x, y -> + with {:suspend, acc} <- function.(x, y) do + {:__finch_suspend__, acc, {holder, ref, conn}} + end + end + + try do + with {:ok, conn, acc} <- + Conn.request( + conn, + req, + acc, + function, + name, + receive_timeout, + request_timeout, + idle_time + ) do + send(holder, {ref, :stop, conn}) + {:done, acc} + else + {:error, conn, error} -> + send(holder, {ref, :stop, conn}) + raise error + + {:suspended, _, _} = suspended -> + suspended + end + catch + class, reason -> + send(holder, {ref, :stop, conn}) + :erlang.raise(class, reason, __STACKTRACE__) + end + + other -> + other + end + + {:halt, acc}, _function -> + {:halted, acc} + end + + {:ok, stream} end @impl Finch.Pool diff --git a/test/finch_test.exs b/test/finch_test.exs index 012bc91..410594e 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -626,6 +626,7 @@ defmodule FinchTest do end describe "actual_stream/3" do + @tag bypass: false test "Not supported for HTTP2", %{finch_name: finch_name} do start_supervised!( {Finch, name: finch_name, pools: %{default: [protocols: [:http2], size: 10, count: 1]}} @@ -718,6 +719,28 @@ defmodule FinchTest do assert_receive :stopped end + test "Stream can be suspended", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped}) + + stream = Stream.zip(stream, Stream.cycle([1])) + + assert [{{:status, 200}, 1}] = Enum.take_while(stream, &(not match?({{:headers, _}, _}, &1))) + + assert_receive :stopped + end + test "Raising in stream closes the connection", %{ bypass: bypass, finch_name: finch_name @@ -740,14 +763,12 @@ defmodule FinchTest do assert_receive :stopped end - test "Fail safe timeout works", %{ + test "Fail safe timeout works when occurs before enumeration", %{ bypass: bypass, finch_name: finch_name } do start_supervised!({Finch, name: finch_name}) - Process.flag(:trap_exit, true) - Bypass.stub(bypass, "GET", "/", fn conn -> Process.sleep(1_000) Plug.Conn.send_resp(conn, 200, "OK") @@ -763,7 +784,34 @@ defmodule FinchTest do Process.sleep(200) - assert_raise RuntimeError, fn -> + assert_raise Mint.TransportError, fn -> + Enum.to_list(stream) + end + + assert_receive :stopped + end + + test "Fail safe timeout works when occurs during enumeration", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + Process.flag(:trap_exit, true) + + Bypass.stub(bypass, "GET", "/", fn conn -> + Process.sleep(600) + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, + stop_notify: {self(), :stopped}, + fail_safe_timeout: 500 + ) + + assert_raise Mint.TransportError, fn -> Enum.to_list(stream) end