HTTP1 stream #281

Open: wants to merge 6 commits into base `main`
Changes from 5 commits
33 changes: 33 additions & 0 deletions lib/finch.ex
@@ -307,6 +307,39 @@ defmodule Finch do
end
end

@doc """
Like `stream/5`, but returns a valid `Stream` structure that is safe to pass between processes, although it works __only__
on the local node. Only HTTP1 is supported; HTTP2 is not currently supported.

## Options

Same as `request/3` plus

* `:fail_safe_timeout` (`t:timeout/0`) (optional) (defaults to 15 minutes) - timeout in milliseconds.
Since this function returns an `Enumerable` which is executed lazily, it makes sense to
have a timeout that closes the connection in case the stream is never read from in erroneous situations.

* `:stop_notify` (`{t:GenServer.name/0 | t:pid/0, t:any/0}`) (optional) - the destination and message
to notify once the connection is returned to the pool or closed.
"""
@spec actual_stream(Request.t(), name(), request_opts()) ::
{:ok, Enumerable.t()} | {:error, Exception.t()}
def actual_stream(request, name, opts \\ []) do
{pool, pool_mod} = get_pool(request, name)
pool_mod.stream(pool, request, name, opts)
end

@doc """
Raising version of `actual_stream/3`
"""
@spec actual_stream!(Request.t(), name(), request_opts()) :: Enumerable.t()
def actual_stream!(request, name, opts \\ []) do
case actual_stream(request, name, opts) do
{:ok, stream} -> stream
{:error, exception} -> raise exception
end
end
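
A hedged usage sketch may be useful here. It assumes a Finch instance named `MyFinch` is already started under a supervisor; the URL and option values are illustrative only, and the yielded tuple shapes follow the tests in this PR:

```elixir
# Hypothetical example: MyFinch must already be started, e.g. with
# {Finch, name: MyFinch} in a supervision tree. Note that in the current
# design the connection is checked out when the stream is created, not
# when it is first consumed.
request = Finch.build("GET", "https://example.com/")

case Finch.actual_stream(request, MyFinch,
       fail_safe_timeout: 60_000,
       stop_notify: {self(), :finch_stream_done}
     ) do
  {:ok, stream} ->
    # The stream yields {:status, _}, {:headers, _}, and {:data, _} tuples.
    Enum.each(stream, fn
      {:status, status} -> IO.puts("status: #{status}")
      {:headers, _headers} -> :ok
      {:data, chunk} -> IO.write(chunk)
    end)

  {:error, exception} ->
    raise exception
end
```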

@doc """
Builds an HTTP request to be sent with `request/3` or `stream/4`.

156 changes: 156 additions & 0 deletions lib/finch/http1/pool.ex
@@ -38,6 +38,158 @@ defmodule Finch.HTTP1.Pool do
)
end

@impl Finch.Pool
def stream(pool, req, name, opts) do
pool_timeout = Keyword.get(opts, :pool_timeout, 5_000)
receive_timeout = Keyword.get(opts, :receive_timeout, 15_000)
request_timeout = Keyword.get(opts, :request_timeout, 30_000)
fail_safe_timeout = Keyword.get(opts, :fail_safe_timeout, 15 * 60_000)
stop_notify = Keyword.get(opts, :stop_notify, nil)

metadata = %{request: req, pool: pool, name: name}

start_time = Telemetry.start(:queue, metadata)
owner = self()
ref = make_ref()

holder =
spawn_link(fn ->
**josevalim** (Contributor) commented on Aug 5, 2024:

This is still checking out a connection and holding to it, before the streaming starts. You need to move this inside fn tagged_acc, function ->.

**hissssst** (Author) replied:

> This is still checking out a connection and holding to it, before the streaming starts

Yeah, this is a design decision I made on purpose.

If we perform the checkout during stream start, we may end up in a situation where the stream was successfully created but we can't iterate on it, because the pool is busy and there's no free connection. In that case, the developer might be unable to retry the stream creation, because the stream is data: it might already have been sent to another process, and the request information may already be lost.

On the other hand, the current solution may lead to situations where a connection was checked out but no request was made, because the Stream was lost or enumeration never started. I've implemented the fail-safe timeout for this situation specifically, to return such connections to the pool.

**josevalim** (Contributor) replied:

I see what you mean, but I think this may only make things worse. You are saying that, if the system is "overloaded" (we have more requests than resources), you want to be able to retry, which is very valid, but you are also holding on to connections for longer than you need, which will only make matters worse.

Ironically, `Finch.stream` sidesteps both of these problems, because the connection and streaming happen immediately.

If the concern is retry, we could add the ability to retry inside the stream instead.

**hissssst** (Author) replied:

> sidesteps both of these problems, because the connection and streaming happen immediately.

This is both true and not. An infinite recursion bug in the stream callback will leave the connection checked out forever, while `actual_stream` solves this problem.

> If the concern is retry, we could add the ability to retry inside the stream instead.

Yeah, this feels like a better solution, I agree. Check the connection on start and execute a callback, or just retry with exponential backoff.

**josevalim** (Contributor) commented on Aug 5, 2024:

> Infinite recursion bug in stream callback will leave the connection checked-out forever, while actual_stream solves this problem.

How would you have an infinite bug in the stream callback? Are you saying in case our implementation has a bug? I am not sure those are valid arguments: a bug in `actual_stream` can cause connections to leak, eventually exhausting the pool and making the whole subsystem unusable. I don't think we should use it as an argument against it either; we just need to make sure to address all cases.

**hissssst** (Author) replied:

No, I mean something like

```elixir
{:ok, stream} = Finch.actual_stream(...)
Enum.each(stream, fn _ -> infinite_loop() end)
```

This can happen, and the connection will never be returned to the pool. However, the server will close the socket if it's unused for a long time, though I am not sure about this, since I don't know whether Mint sends empty ACKs to keep the socket open.

**josevalim** (Contributor) replied:

I see. Sure, that can happen when consuming both types of streams. However, the fact someone can write this particular code does not justify us holding idle connections until the stream is consumed. Anyway, if we add the retry to the stream, we will be fine either way. So we can close this convo once we add retries and move checkout to the stream.
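
For illustration, retrying inside the stream might look roughly like the following. This is a hypothetical sketch, not code from this PR: the function name, backoff constants, and the shape of the checkout call are all invented, and a real version would reuse the pool's existing checkout logic:

```elixir
# Hypothetical sketch: retry the pool checkout with exponential backoff once
# the checkout happens lazily inside the stream. All names and defaults here
# are invented for illustration only.
defp checkout_with_retry(pool, pool_timeout, attempt \\ 0, max_attempts \\ 5) do
  NimblePool.checkout!(pool, :checkout, fn _from, conn -> {conn, conn} end, pool_timeout)
catch
  :exit, {:timeout, {NimblePool, :checkout, _pids}} when attempt < max_attempts ->
    # Back off 100ms, 200ms, 400ms, ... before trying again.
    Process.sleep(100 * Integer.pow(2, attempt))
    checkout_with_retry(pool, pool_timeout, attempt + 1, max_attempts)
end
```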

try do
NimblePool.checkout!(
**josevalim** (Contributor) commented on Aug 3, 2024:

This either needs to happen when the stream starts, or you need to check later on that the process that checked out is the one that is streaming. Otherwise someone will pass the stream around to another process and it won't behave as expected.

**hissssst** (Author) replied:

> Otherwise someone will pass the stream around to another process and it won't behave as expected

I've tested it and it worked as expected; I'll push the test in a sec. Take a look at it please, perhaps I've misunderstood it.

**josevalim** (Contributor) replied:

To be clear, it is probably best to do this lazily, if possible. As it is more flexible.

**josevalim** (Contributor) replied:

I will be bouncing out soon, but I meant this:

```elixir
stream = Finch.actual_stream(fn -> ... end)
spawn(fn -> Enum.to_list(stream) end)
```

Even if this works today, because the process doing the streaming is not the one linked to, you could run into situations where the connection is never checked back in, such as this:

```elixir
stream = Finch.actual_stream(fn -> ... end)
pid = spawn(fn -> Enum.each(stream, fn _ -> Process.sleep(:infinity) end) end)
Process.sleep(1000)
Process.exit(pid, :shutdown)
```

**hissssst** (Author) replied:

> you could run into situations where the connection is never checked back in, such as this:

Unfortunately, I don't see any way to track this without explicit links. This limitation can be reflected in the documentation. But it is generally true for any possible resource-oriented stream, like `File.stream!/3`, so:

  1. No possible implicit solution
  2. True for any resource-stream

Therefore, I wouldn't take any action except documentation for this one.

**hissssst** (Author) replied:

> so it is done against the process actually consuming the streaming.

Yeah, right, I hadn't thought about it.

**hissssst** (Author) replied:

Did it

**josevalim** (Contributor) commented on Aug 5, 2024:

I have reviewed it and I like the new implementation a lot, it is much simpler. Great job! The only thing remaining is making the checkout late and dealing with suspensions (which is used by zip). You have to be careful because a stream may emit no items, then be suspended, and halted.

There are two ways you can do this: one is by moving the after callback, aka `send(holder, {ref, :stop, conn})`, to HTTP.conn. The flow of the code would be something like:

```elixir
def stream(...) do
  # ...
  fn tagged_acc, fun ->
    conn = NimblePool.checkout!(...)

    HTTP1.request(tagged, fun, fn -> send(holder, {ref, :stop, conn}) end)
  end
end
```

Moving all error flow to HTTP.conn is simpler, because suspending would then just be something like this:

```elixir
def request(..., {:suspend, acc}, function, after_fun) do
  {:suspended, acc, &request(..., &1, function, after_fun)}
end
```

In other words, streams are easier to implement if they are fully tail recursive, not relying on try/catch and similar. Instead, wrap each invocation of `fun` in try/catch.

**hissssst** (Author) commented on Aug 11, 2024:

I've thought about it, and now we're back to the beginning, where I had implemented it with `Stream.resource`, but now I also have to implement all of this suspend/halt logic plus change the request implementation.

**josevalim** (Contributor) replied:

Not really. The work is almost all done. Making it tail recursive is a matter of passing an after block to the existing code and adding one single clause to handle suspend. I still think it will be less code than the original PR. :)

pool,
:checkout,
fn from, {state, conn, idle_time} ->
if fail_safe_timeout != :infinity do
Process.send_after(self(), :fail_safe_timeout, fail_safe_timeout)
end

Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time})

return =
case Conn.connect(conn, name) do
{:ok, conn} ->
send(owner, {ref, :ok, {conn, idle_time}})

receive do
{^ref, :stop, conn} ->
with :closed <- transfer_if_open(conn, state, from) do
{:ok, :closed}
end

:fail_safe_timeout ->
Conn.close(conn)
{:fail_safe_timeout, :closed}
end

{:error, conn, error} ->
{{:error, error}, transfer_if_open(conn, state, from)}
end

with {to, message} <- stop_notify do
send(to, message)
end

return
end,
pool_timeout
)
else
:fail_safe_timeout ->
raise "Fail safe timeout was hit"

other ->
other
catch
:exit, data ->
Telemetry.exception(:queue, start_time, :exit, data, __STACKTRACE__, metadata)
send(owner, {ref, :exit, {data, __STACKTRACE__}})
end
end)

receive do
{^ref, :ok, conn_idle_time} ->
{conn, idle_time} = conn_idle_time

stream =
fn
{:cont, acc}, function ->
case Process.alive?(holder) do
true ->
Process.link(holder)

false ->
raise "Process holding the connection in pool died" <>
" before the stream enumeration started." <>
" Most likely fail_safe_timeout occurred"
end

try do
**josevalim** (Contributor) replied:

The current code is not dealing with stream suspensions. Try implementing `Stream.zip(stream, stream)`. It needs to spawn two separate connections (instead of using the same one) and be able to yield out of this loop and come back in.

**hissssst** (Author) replied:

> The current code is not dealing with stream suspensions

Yeah, I don't understand what suspension means; that's why I decided to use `Stream.resource` in the first place. I've read the `Enum` and `Enumerable` documentation and haven't found any explanation of what halt and suspend mean and how to treat them.

**hissssst** (Author) replied:

Oh, I've found it, it's in the type docs. I'll try to use it, thanks.
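
For readers following along, the suspend mechanics from the `Enumerable` type docs can be shown in a self-contained snippet; nothing here is specific to this PR. A reducer returns `{:suspend, acc}` to pause enumeration, and `Enumerable.reduce/3` answers with `{:suspended, acc, continuation}` so the producer can be resumed later, which is what `Stream.zip/2` relies on to interleave two sources:

```elixir
# Suspend after every element: the reducer asks to pause, and the enumerable
# hands back a continuation that resumes where it left off.
{:suspended, acc1, cont} =
  Enumerable.reduce(1..3, {:cont, []}, fn x, acc -> {:suspend, [x | acc]} end)

# acc1 holds the first element only.
[1] = acc1

# Resuming the continuation pulls exactly one more element.
{:suspended, acc2, _cont2} = cont.({:cont, acc1})
[2, 1] = acc2
```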

with {:ok, conn, acc} <-
Conn.request(
conn,
req,
acc,
function,
name,
receive_timeout,
request_timeout,
idle_time
) do
send(holder, {ref, :stop, conn})
{:done, acc}
else
{:error, conn, error} ->
send(holder, {ref, :stop, conn})
raise error
end
catch
class, reason ->
send(holder, {ref, :stop, conn})
:erlang.raise(class, reason, __STACKTRACE__)
end

{_, acc}, _function ->
send(holder, {ref, :stop, conn})
{:done, acc}
end

{:ok, stream}

{^ref, :error, reason} ->
{:error, reason}

{^ref, :exit, data_trace} ->
{data, trace} = data_trace

case data do
{:timeout, {NimblePool, :checkout, _affected_pids}} ->
# Provide helpful error messages for known errors
reraise(
"""
Finch was unable to provide a connection within the timeout due to excess queuing \
for connections. Consider adjusting the pool size, count, timeout or reducing the \
rate of requests if it is possible that the downstream service is unable to keep up \
with the current rate.
""",
trace
)

_ ->
exit(data)
end
**josevalim** (Contributor) replied:

There is some code duplication with other functions; that should be extracted out.

**hissssst** (Author) commented on Aug 3, 2024:

I agree, but I think that DRYing should be done after everyone agrees on the implementation details.

after
pool_timeout + request_timeout ->
# Cleanup late messages
receive do
{^ref, _, _} -> :ok
after
0 -> :ok
end

raise "Did not receive a message from the pool within the timeout"
end
end

@impl Finch.Pool
def request(pool, req, acc, fun, name, opts) do
pool_timeout = Keyword.get(opts, :pool_timeout, 5_000)
@@ -280,6 +432,10 @@ defmodule Finch.HTTP1.Pool do
def handle_cancelled(:queued, _pool_state), do: :ok

defp transfer_if_open(conn, state, {pid, _} = from) do
transfer_if_open(conn, state, from, pid)
end

defp transfer_if_open(conn, state, from, pid) do
if Conn.open?(conn) do
if state == :fresh do
NimblePool.update(from, conn)
5 changes: 5 additions & 0 deletions lib/finch/http2/pool.ex
@@ -27,6 +27,11 @@ defmodule Finch.HTTP2.Pool do
}
end

@impl Finch.Pool
def stream(_pool, _request, _name, _opts) do
{:error, %RuntimeError{message: "Streaming is not supported for HTTP2"}}
end

# Call the pool with the request. The pool will multiplex multiple requests
# and stream the result set back to the calling process using `send`
@impl Finch.Pool
3 changes: 3 additions & 0 deletions lib/finch/pool.ex
@@ -4,6 +4,9 @@ defmodule Finch.Pool do

@type request_ref :: {pool_mod :: module(), cancel_ref :: term()}

@callback stream(pid(), Finch.Request.t(), finch_name :: atom(), list()) ::
{:ok, Stream.t()} | {:error, Exception.t()}

@callback request(
pid(),
Finch.Request.t(),
146 changes: 146 additions & 0 deletions test/finch_test.exs
@@ -625,6 +625,152 @@ defmodule FinchTest do
end
end

describe "actual_stream/3" do
test "Not supported for HTTP2", %{finch_name: finch_name} do
**josevalim** (Contributor) replied:

Other test names start in lowercase; we should follow the convention.

start_supervised!(
{Finch, name: finch_name, pools: %{default: [protocols: [:http2], size: 10, count: 1]}}
)

request = Finch.build("GET", "http://example.com/")

assert {:error, %RuntimeError{message: "Streaming is not supported for HTTP2"}} =
Finch.actual_stream(request, finch_name)
end

test "Streams response", %{bypass: bypass, finch_name: finch_name} do
start_supervised!({Finch, name: finch_name})

Bypass.expect_once(bypass, "GET", "/", fn conn ->
Plug.Conn.send_resp(conn, 200, "OK")
end)

request = Finch.build("GET", endpoint(bypass))

assert {:ok, stream} = Finch.actual_stream(request, finch_name)

assert [
{:status, 200},
{:headers,
[
{"cache-control", "max-age=0, private, must-revalidate"},
{"content-length", "2"},
{"date", _},
{"server", "Cowboy"}
]},
{:data, "OK"}
] = Enum.to_list(stream)
**josevalim** (Contributor) commented on Aug 3, 2024:

We need to test several different scenarios:

  1. What happens if the stream is halted before the response finishes?
  2. What happens if the stream raises?

And we need to make sure the connection and the pool are still "functional" after that. The biggest concern with the implementation is that we check in a connection with a bad state.

end

test "Streams response even when sent to another process", %{
bypass: bypass,
finch_name: finch_name
} do
start_supervised!({Finch, name: finch_name})

Bypass.expect_once(bypass, "GET", "/", fn conn ->
Plug.Conn.send_resp(conn, 200, "OK")
end)

request = Finch.build("GET", endpoint(bypass))

assert {:ok, stream} = Finch.actual_stream(request, finch_name)

owner = self()

spawn_link(fn ->
assert [
{:status, 200},
{:headers,
[
{"cache-control", "max-age=0, private, must-revalidate"},
{"content-length", "2"},
{"date", _},
{"server", "Cowboy"}
]},
{:data, "OK"}
] = Enum.to_list(stream)

send(owner, :continue)
end)

receive do
:continue -> :ok
end
end

test "Stream can be halted", %{
bypass: bypass,
finch_name: finch_name
} do
start_supervised!({Finch, name: finch_name})

Bypass.expect_once(bypass, "GET", "/", fn conn ->
Plug.Conn.send_resp(conn, 200, "OK")
end)

request = Finch.build("GET", endpoint(bypass))

assert {:ok, stream} =
Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped})

assert [status: 200] = Enum.take_while(stream, &(not match?({:headers, _}, &1)))

assert_receive :stopped
end

test "Raising in stream closes the connection", %{
bypass: bypass,
finch_name: finch_name
} do
start_supervised!({Finch, name: finch_name})

Bypass.expect_once(bypass, "GET", "/", fn conn ->
Plug.Conn.send_resp(conn, 200, "OK")
end)

request = Finch.build("GET", endpoint(bypass))

assert {:ok, stream} =
Finch.actual_stream(request, finch_name, stop_notify: {self(), :stopped})

assert_raise RuntimeError, fn ->
Enum.map(stream, fn _ -> raise "oops" end)
end

assert_receive :stopped
end

test "Fail safe timeout works", %{
bypass: bypass,
finch_name: finch_name
} do
start_supervised!({Finch, name: finch_name})

Process.flag(:trap_exit, true)

Bypass.stub(bypass, "GET", "/", fn conn ->
Process.sleep(1_000)
Plug.Conn.send_resp(conn, 200, "OK")
end)

request = Finch.build("GET", endpoint(bypass))

assert {:ok, stream} =
Finch.actual_stream(request, finch_name,
stop_notify: {self(), :stopped},
fail_safe_timeout: 100
)

Process.sleep(200)
**josevalim** (Contributor) replied:

Ideally we should avoid sleeping. Can we?


assert_raise RuntimeError, fn ->
Enum.to_list(stream)
end

assert_receive :stopped
end
end

describe "stream/5" do
test "successful get request, with query string", %{bypass: bypass, finch_name: finch_name} do
start_supervised!({Finch, name: finch_name})