diff --git a/lib/finch.ex b/lib/finch.ex index cc7115f..046354b 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -307,6 +307,39 @@ defmodule Finch do end end + @doc """ + Like `stream/5`, but returns valid `Stream` structure, safe for passing between processes, but working __only__ + on local node. Works only for HTTP1, HTTP2 not supported currently. + + ## Options + + Same as `request/3` plus + + * `:fail_safe_timeout` (`t:timeout/0`) (optional) (default is 15 minutes) - timeout in milliseconds. + Since this function returns an Enumerable which is lazily executed, it makes sense to + have a timeout which will close the connection in case it's never read from in erroneous situations. + + * `:stop_notify` (`{t:GenServer.name/0 | t:pid/0, t:any/0}`) (optional) - destination and message which + will be notified once connection is returned to pool or closed. + """ + @spec actual_stream(Request.t(), name(), request_opts()) :: + {:ok, Enumerable.t()} | {:error, Exception.t()} + def actual_stream(request, name, opts \\ []) do + {pool, pool_mod} = get_pool(request, name) + pool_mod.stream(pool, request, name, opts) + end + + @doc """ + Raising version of `actual_stream/3` + """ + @spec actual_stream!(Request.t(), name(), request_opts()) :: Enumerable.t() + def actual_stream!(request, name, opts \\ []) do + case actual_stream(request, name, opts) do + {:ok, stream} -> stream + exception -> raise exception + end + end + @doc """ Builds an HTTP request to be sent with `request/3` or `stream/4`. diff --git a/lib/finch/http1/conn.ex b/lib/finch/http1/conn.ex index b331b91..6f47e1d 100644 --- a/lib/finch/http1/conn.ex +++ b/lib/finch/http1/conn.ex @@ -201,6 +201,9 @@ defmodule Finch.HTTP1.Conn do Telemetry.stop(:recv, start_time, metadata, extra_measurements) {:ok, %{conn | mint: mint}, acc} + {:suspended, _, _} = suspended -> + suspended + {:error, mint, error, resp_metadata} -> metadata = Map.merge(metadata, Map.put(resp_metadata, :error, error)) Telemetry.stop(:recv, start_time, metadata, extra_measurements) @@ -297,75 +300,73 @@ defmodule Finch.HTTP1.Conn do ) do case entry do {:status, ^ref, value} -> - case fun.({:status, value}, acc) do - {:cont, acc} -> - receive_response( - entries, - acc, - fun, - mint, - ref, - timeouts, - fields, - %{resp_metadata | status: value} - ) - - {:halt, acc} -> - {:ok, mint} = Mint.HTTP1.close(mint) - {:ok, mint, acc, resp_metadata} - - other -> - raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" - end + resp_metadata = %{resp_metadata | status: value} + call_fun(:status, value, acc, entries, fun, mint, ref, timeouts, fields, resp_metadata) {:headers, ^ref, value} -> resp_metadata = update_in(resp_metadata, [fields], &(&1 ++ value)) - - case fun.({fields, value}, acc) do - {:cont, acc} -> - receive_response( - entries, - acc, - fun, - mint, - ref, - timeouts, - fields, - resp_metadata - ) - - {:halt, acc} -> - {:ok, mint} = Mint.HTTP1.close(mint) - {:ok, mint, acc, resp_metadata} - - other -> - raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" - end + call_fun(fields, value, acc, entries, fun, mint, ref, timeouts, fields, resp_metadata) {:data, ^ref, value} -> - case fun.({:data, value}, acc) do - {:cont, acc} -> - receive_response( - entries, - acc, - fun, - mint, - ref, - timeouts, - :trailers, - resp_metadata - ) - - {:halt, acc} -> - {:ok, mint} = Mint.HTTP1.close(mint) - {:ok, mint, acc, resp_metadata} - - other -> - raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" - end + call_fun(:data, value, acc, entries, fun, mint, ref, timeouts, :trailers, resp_metadata) {:error, ^ref, error} -> {:error, mint, error, resp_metadata} end end + + defp call_fun(tag, value, acc, entries, fun, mint, ref, timeouts, fields, resp_metadata) do + case fun.({tag, value}, acc) do + {:cont, acc} -> + receive_response( + entries, + acc, + fun, + mint, + ref, + timeouts, + fields, + resp_metadata + ) + + {:halt, acc} -> + {:ok, mint} = Mint.HTTP1.close(mint) + {:ok, mint, acc, resp_metadata} + + {:__finch_suspend__, acc, extra} -> + enum_acc = {entries, fun, mint, ref, timeouts, fields, resp_metadata, extra} + {:suspended, acc, suspension_function(enum_acc)} + + other -> + raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" + end + end + + def suspension_function(enum_acc) do + fn acc -> + {entries, fun, mint, ref, timeouts, fields, resp_metadata, extra} = enum_acc + case acc do + {:cont, acc} -> + receive_response( + entries, + acc, + fun, + mint, + ref, + timeouts, + fields, + resp_metadata + ) + + {:suspend, acc} -> + {:suspended, acc, &suspension_function/1} + + {:halt, acc} -> + {holder, holder_ref, conn} = extra + conn = %{conn | mint: mint} + send(holder, {holder_ref, :stop, conn}) + {:ok, mint, acc, resp_metadata} + end + end + end end diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index 771a9e9..e1db925 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -38,6 +38,166 @@ defmodule Finch.HTTP1.Pool do ) end + defp spawn_holder(pool, req, name, opts) do + metadata = %{request: req, pool: pool, name: name} + fail_safe_timeout = Keyword.get(opts, :fail_safe_timeout, 15 * 60_000) + stop_notify = Keyword.get(opts, :stop_notify, nil) + pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) + + owner = self() + ref = make_ref() + + start_time = Telemetry.start(:queue, metadata) + + holder = + spawn_link(fn -> + try do + NimblePool.checkout!( + pool, + :checkout, + fn from, {state, conn, idle_time} -> + if fail_safe_timeout != :infinity do + Process.send_after(self(), :fail_safe_timeout, fail_safe_timeout) + end + + Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time}) + + return = + case Conn.connect(conn, name) do + {:ok, conn} -> + send(owner, {ref, :ok, {conn, idle_time}}) + + receive do + {^ref, :stop, conn} -> + with :closed <- transfer_if_open(conn, state, from) do + {:ok, :closed} + end + + :fail_safe_timeout -> + Conn.close(conn) + {:ok, :closed} + end + + {:error, conn, error} -> + {{:error, error}, transfer_if_open(conn, state, from)} + end + + with {to, message} <- stop_notify do + send(to, message) + end + + return + end, + pool_timeout + ) + rescue + x -> + IO.inspect(x) + catch + :exit, data -> + Telemetry.exception(:queue, start_time, :exit, data, __STACKTRACE__, metadata) + send(owner, {ref, :exit, {data, __STACKTRACE__}}) + end + end) + + receive do + {^ref, :ok, {conn, idle_time}} -> + Process.link(holder) + {:ok, holder, ref, conn, idle_time} + + {^ref, :error, reason} -> + {:error, reason} + + {^ref, :exit, data_trace} -> + {data, trace} = data_trace + + case data do + {:timeout, {NimblePool, :checkout, _affected_pids}} -> + # Provide helpful error messages for known errors + reraise( + """ + Finch was unable to provide a connection within the timeout due to excess queuing \ + for connections. Consider adjusting the pool size, count, timeout or reducing the \ + rate of requests if it is possible that the downstream service is unable to keep up \ + with the current rate. + """, + trace + ) + + _ -> + exit(data) + end + after + pool_timeout -> + # Cleanup late messages + receive do + {^ref, _, _} -> :ok + after + 0 -> :ok + end + + raise "Has not received message from pool yet" + end + rescue + x -> + IO.inspect({x, __STACKTRACE__}) + end + + @impl Finch.Pool + def stream(pool, req, name, opts) do + receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) + request_timeout = Keyword.get(opts, :request_timeout, 30_000) + + stream = + fn + {:cont, acc}, function -> + case spawn_holder(pool, req, name, opts) do + {:ok, holder, ref, conn, idle_time} -> + function = fn x, y -> + with {:suspend, acc} <- function.(x, y) do + {:__finch_suspend__, acc, {holder, ref, conn}} + end + end + + try do + with {:ok, conn, acc} <- + Conn.request( + conn, + req, + acc, + function, + name, + receive_timeout, + request_timeout, + idle_time + ) do + send(holder, {ref, :stop, conn}) + {:done, acc} + else + {:error, conn, error} -> + send(holder, {ref, :stop, conn}) + raise error + + {:suspended, _, _} = suspended -> + suspended + end + catch + class, reason -> + send(holder, {ref, :stop, conn}) + :erlang.raise(class, reason, __STACKTRACE__) + end + + other -> + other + end + + {:halt, acc}, _function -> + {:halted, acc} + end + + {:ok, stream} + end + @impl Finch.Pool def request(pool, req, acc, fun, name, opts) do pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) @@ -280,6 +440,10 @@ defmodule Finch.HTTP1.Pool do def handle_cancelled(:queued, _pool_state), do: :ok defp transfer_if_open(conn, state, {pid, _} = from) do + transfer_if_open(conn, state, from, pid) + end + + defp transfer_if_open(conn, state, from, pid) do if Conn.open?(conn) do if state == :fresh do NimblePool.update(from, conn) diff --git a/lib/finch/http2/pool.ex b/lib/finch/http2/pool.ex index 604a01d..713831b 100644 --- a/lib/finch/http2/pool.ex +++ b/lib/finch/http2/pool.ex @@ -27,6 +27,11 @@ defmodule Finch.HTTP2.Pool do } end + @impl Finch.Pool + def stream(_pool, _request, _name, _opts) do + {:error, %RuntimeError{message: "Streaming is not supported for HTTP2"}} + end + # Call the pool with the request. The pool will multiplex multiple requests # and stream the result set back to the calling process using `send` @impl Finch.Pool diff --git a/lib/finch/pool.ex b/lib/finch/pool.ex index 8fbbb88..c8d953f 100644 --- a/lib/finch/pool.ex +++ b/lib/finch/pool.ex @@ -4,6 +4,9 @@ defmodule Finch.Pool do @type request_ref :: {pool_mod :: module(), cancel_ref :: term()} + @callback stream(pid(), Finch.Request.t(), finch_name :: atom(), list()) :: + {:ok, Stream.t()} | {:error, Exception.t()} + @callback request( pid(), Finch.Request.t(), diff --git a/test/finch_test.exs b/test/finch_test.exs index 12c80ef..410594e 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -625,6 +625,200 @@ defmodule FinchTest do end end + describe "actual_stream/3" do + @tag bypass: false + test "Not supported for HTTP2", %{finch_name: finch_name} do + start_supervised!( + {Finch, name: finch_name, pools: %{default: [protocols: [:http2], size: 10, count: 1]}} + ) + + request = Finch.build("GET", "http://example.com/") + + assert {:error, %RuntimeError{message: "Streaming is not supported for HTTP2"}} = + Finch.actual_stream(request, finch_name) + end + + test "Streams response", %{bypass: bypass, finch_name: finch_name} do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = Finch.actual_stream(request, finch_name) + + assert [ + {:status, 200}, + {:headers, + [ + {"cache-control", "max-age=0, private, must-revalidate"}, + {"content-length", "2"}, + {"date", _}, + {"server", "Cowboy"} + ]}, + {:data, "OK"} + ] = Enum.to_list(stream) + end + + test "Streams response even when sent to another process", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = Finch.actual_stream(request, finch_name) + + owner = self() + + spawn_link(fn -> + assert [ + {:status, 200}, + {:headers, + [ + {"cache-control", "max-age=0, private, must-revalidate"}, + {"content-length", "2"}, + {"date", _}, + {"server", "Cowboy"} + ]}, + {:data, "OK"} + ] = Enum.to_list(stream) + + send(owner, :continue) + end) + + receive do + :continue -> :ok + end + end + + test "Stream can be halted", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped}) + + assert [status: 200] = Enum.take_while(stream, &(not match?({:headers, _}, &1))) + + assert_receive :stopped + end + + test "Stream can be suspended", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped}) + + stream = Stream.zip(stream, Stream.cycle([1])) + + assert [{{:status, 200}, 1}] = Enum.take_while(stream, &(not match?({{:headers, _}, _}, &1))) + + assert_receive :stopped + end + + test "Raising in stream closes the connection", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped}) + + assert_raise RuntimeError, fn -> + Enum.map(stream, fn _ -> raise "oops" end) + end + + assert_receive :stopped + end + + test "Fail safe timeout works when occurs before enumeration", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.stub(bypass, "GET", "/", fn conn -> + Process.sleep(1_000) + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, + stop_notify: {self(), :stopped}, + fail_safe_timeout: 100 + ) + + Process.sleep(200) + + assert_raise Mint.TransportError, fn -> + Enum.to_list(stream) + end + + assert_receive :stopped + end + + test "Fail safe timeout works when occurs during enumeration", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + Process.flag(:trap_exit, true) + + Bypass.stub(bypass, "GET", "/", fn conn -> + Process.sleep(600) + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request = Finch.build("GET", endpoint(bypass)) + + assert {:ok, stream} = + Finch.actual_stream(request, finch_name, + stop_notify: {self(), :stopped}, + fail_safe_timeout: 500 + ) + + assert_raise Mint.TransportError, fn -> + Enum.to_list(stream) + end + + assert_receive :stopped + end + end + describe "stream/5" do test "successful get request, with query string", %{bypass: bypass, finch_name: finch_name} do start_supervised!({Finch, name: finch_name})