# HTTP1 stream #281

base: main
Changes from all commits:

- 4ec66f8
- 045ba72
- 4aaf557
- 0dd28a1
- 6b18ebf
- f0a2b97
- ef2b350
@@ -49,6 +49,166 @@ defmodule Finch.HTTP1.Pool do

```elixir
    )
  end

  defp spawn_holder(pool, req, name, opts) do
    metadata = %{request: req, pool: pool, name: name}
    fail_safe_timeout = Keyword.get(opts, :fail_safe_timeout, 15 * 60_000)
    stop_notify = Keyword.get(opts, :stop_notify, nil)
    pool_timeout = Keyword.get(opts, :pool_timeout, 5_000)

    owner = self()
    ref = make_ref()

    start_time = Telemetry.start(:queue, metadata)

    holder =
      spawn_link(fn ->
        try do
          NimblePool.checkout!(
```
**Contributor:** This either needs to happen when the stream starts, or you need to check later on that the process that checked out is the one that is streaming. Otherwise someone will pass the stream around to another process and it won't behave as expected.

**Author:** I've tested it and it worked as expected; I'll push the test in a sec. Take a look at it, please, perhaps I've misunderstood it.

**Contributor:** To be clear, it is probably best to do this lazily, if possible, as it is more flexible.

**Contributor:** I will be bouncing out soon, but I meant this: even if this works today, because the process doing the streaming is not the one linked to, you could run into situations where the connection is never checked back in, such as this:
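The snippet the reviewer posted did not survive this capture. A hypothetical, Finch-free sketch of the hazard (all names and timings here are illustrative, not Finch's API) could look like this: the holder links to the process that *created* the stream, so a crash in a different process consuming the stream goes unnoticed and the connection is never checked back in.

```elixir
# Hypothetical illustration of the linking hazard, not code from this PR.
creator = self()

_holder =
  spawn(fn ->
    Process.flag(:trap_exit, true)
    # The holder is linked to the process that *created* the stream...
    Process.link(creator)

    result =
      receive do
        {:EXIT, ^creator, _reason} -> :checked_in
        {:stop, _conn} -> :checked_in
      after
        200 -> :leaked
      end

    send(creator, {:holder_result, result})
  end)

# ...but the stream is handed off to a different process, which crashes
# mid-enumeration. The holder is not linked to it and never finds out.
_consumer = spawn(fn -> exit(:crashed_mid_stream) end)

holder_result =
  receive do
    {:holder_result, result} -> result
  end

IO.inspect(holder_result)  # => :leaked
```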
**Author:** Unfortunately, I don't see any way to track this without explicit links. This limitation can be reflected in the documentation, but it is generally true for any resource-oriented stream, like

Therefore, I wouldn't take any action except documentation for this one.

**Author:** Yeah, right, I haven't thought about it.

**Author:** Did it.

**Contributor:** I have reviewed it and I like the new implementation a lot, it is much simpler. Great job! The only thing remaining is making the checkout late and dealing with suspensions (which is used by

There are two ways you can do this: one is by moving the

Moving all error flow to HTTP.conn is simpler, because suspending would then just be something like this:

In other words, streams are easier to implement if they are fully tail recursive, not relying on try/catch and similar. Instead, wrap each invocation of
**Author:** I've thought about it, and now we're back to the beginning, where I had implemented it with

**Contributor:** Not really. The work is almost all done. Making it tail recursive is a matter of passing an `after` block to the existing code and adding one single clause to handle suspend. I still think it will be less code than the original PR. :)
```elixir
            pool,
            :checkout,
            fn from, {state, conn, idle_time} ->
              if fail_safe_timeout != :infinity do
                Process.send_after(self(), :fail_safe_timeout, fail_safe_timeout)
              end

              Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time})

              return =
                case Conn.connect(conn, name) do
                  {:ok, conn} ->
                    send(owner, {ref, :ok, {conn, idle_time}})

                    receive do
                      {^ref, :stop, conn} ->
                        with :closed <- transfer_if_open(conn, state, from) do
                          {:ok, :closed}
                        end

                      :fail_safe_timeout ->
                        Conn.close(conn)
                        {:ok, :closed}
                    end

                  {:error, conn, error} ->
                    {{:error, error}, transfer_if_open(conn, state, from)}
                end

              with {to, message} <- stop_notify do
                send(to, message)
              end

              return
            end,
            pool_timeout
          )
        rescue
          x ->
            IO.inspect(x)
        catch
          :exit, data ->
            Telemetry.exception(:queue, start_time, :exit, data, __STACKTRACE__, metadata)
            send(owner, {ref, :exit, {data, __STACKTRACE__}})
        end
      end)

    receive do
      {^ref, :ok, {conn, idle_time}} ->
        Process.link(holder)
        {:ok, holder, ref, conn, idle_time}

      {^ref, :error, reason} ->
        {:error, reason}

      {^ref, :exit, data_trace} ->
        {data, trace} = data_trace

        case data do
          {:timeout, {NimblePool, :checkout, _affected_pids}} ->
            # Provide helpful error messages for known errors
            reraise(
              """
              Finch was unable to provide a connection within the timeout due to excess queuing \
              for connections. Consider adjusting the pool size, count, timeout or reducing the \
              rate of requests if it is possible that the downstream service is unable to keep up \
              with the current rate.
              """,
              trace
            )

          _ ->
            exit(data)
        end
```
**Contributor:** There is some code duplication with other functions; those should be extracted out.

**Author:** I agree, but I think that DRYing should be done after everyone agrees on the implementation details.
```elixir
    after
      pool_timeout ->
        # Cleanup late messages
        receive do
          {^ref, _, _} -> :ok
        after
          0 -> :ok
        end

        raise "Has not received message from pool yet"
    end
  rescue
    x ->
      IO.inspect({x, __STACKTRACE__})
  end

  @impl Finch.Pool
  def stream(pool, req, name, opts) do
    receive_timeout = Keyword.get(opts, :receive_timeout, 15_000)
    request_timeout = Keyword.get(opts, :request_timeout, 30_000)

    stream =
      fn
        {:cont, acc}, function ->
          case spawn_holder(pool, req, name, opts) do
            {:ok, holder, ref, conn, idle_time} ->
              function = fn x, y ->
                with {:suspend, acc} <- function.(x, y) do
                  {:__finch_suspend__, acc, {holder, ref, conn}}
                end
              end

              try do
                with {:ok, conn, acc} <-
                       Conn.request(
                         conn,
                         req,
                         acc,
                         function,
                         name,
                         receive_timeout,
                         request_timeout,
                         idle_time
                       ) do
                  send(holder, {ref, :stop, conn})
                  {:done, acc}
                else
                  {:error, conn, error} ->
                    send(holder, {ref, :stop, conn})
                    raise error

                  {:suspended, _, _} = suspended ->
                    suspended
                end
              catch
                class, reason ->
                  send(holder, {ref, :stop, conn})
                  :erlang.raise(class, reason, __STACKTRACE__)
              end

            other ->
              other
          end

        {:halt, acc}, _function ->
          {:halted, acc}
      end

    {:ok, stream}
  end

  @impl Finch.Pool
  def request(pool, req, acc, fun, name, opts) do
    pool_timeout = Keyword.get(opts, :pool_timeout, 5_000)
```

@@ -300,6 +460,10 @@ defmodule Finch.HTTP1.Pool do

```elixir
  def handle_cancelled(:queued, _pool_state), do: :ok

  defp transfer_if_open(conn, state, {pid, _} = from) do
    transfer_if_open(conn, state, from, pid)
  end

  defp transfer_if_open(conn, state, from, pid) do
    if Conn.open?(conn) do
      if state == :fresh do
        NimblePool.update(from, conn)
```
**Contributor:** This is still checking out a connection and holding on to it before the streaming starts. You need to move this inside `fn tagged_acc, function ->`.

**Author:** Yeah, this is a design decision I've made on purpose.

If we perform the checkout during stream start, we may end up in a situation where the stream was successfully created but we can't iterate on it, because the pool is busy and there is no free connection. In that case, the developer might be unable to retry the stream creation, because a stream is data: it might already have been sent to another process, and the request information may already be lost.

On the other hand, the current solution may lead to situations where a connection was checked out but no request was made, because the stream was lost or enumeration never started. I've implemented the fail-safe timeout specifically for this situation, to return such connections to the pool.
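A minimal, Finch-free sketch of that fail-safe idea (all names and timings here are illustrative): the holder arms a timer at checkout, and if no `{:stop, conn}` ever arrives because the stream was never enumerated, the timer fires and the connection can be reclaimed.

```elixir
# Hypothetical sketch of the fail-safe timer, not code from this PR.
parent = self()

_holder =
  spawn(fn ->
    # Arm the fail-safe as soon as the "connection" is checked out.
    Process.send_after(self(), :fail_safe_timeout, 50)

    outcome =
      receive do
        {:stop, _conn} -> :returned_normally
        :fail_safe_timeout -> :fail_safe_fired
      end

    send(parent, {:holder, outcome})
  end)

# Simulate a stream that was created but never enumerated: nobody ever
# sends {:stop, conn}, so the timer fires and the connection is reclaimed.
outcome =
  receive do
    {:holder, outcome} -> outcome
  end

IO.inspect(outcome)  # => :fail_safe_fired
```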
**Contributor:** I see what you mean, but I think this may only make things worse. You are saying that, if the system is "overloaded" (we have more requests than resources), you want to be able to retry, which is very valid, but you are also holding on to connections for longer than you need, which will only make matters worse.

Ironically, `Finch.stream` sidesteps both of these problems, because the connection and streaming happen immediately. If the concern is retry, we could add the ability to retry inside the stream instead.
**Author:** This is both true and not. An infinite recursion bug in the `stream` callback will leave the connection checked out forever, while `actual_stream` solves this problem.

Yeah, this feels like a better solution, I agree: check the connection on start and execute a callback, or just retry with exponential backoff.
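The retry-with-exponential-backoff idea could be sketched like this (a hypothetical helper, not part of Finch; `checkout` below simulates a pool that is busy for the first two attempts):

```elixir
defmodule Backoff do
  def with_backoff(fun, attempts \\ 5, delay_ms \\ 100)

  # Last attempt: return whatever the operation yields.
  def with_backoff(fun, 1, _delay_ms), do: fun.()

  def with_backoff(fun, attempts, delay_ms) do
    case fun.() do
      {:error, :pool_busy} ->
        # Busy pool: back off, then retry with a doubled delay.
        Process.sleep(delay_ms)
        with_backoff(fun, attempts - 1, delay_ms * 2)

      other ->
        other
    end
  end
end

# Simulated checkout that succeeds on the third attempt:
{:ok, counter} = Agent.start_link(fn -> 0 end)

checkout = fn ->
  n = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
  if n < 3, do: {:error, :pool_busy}, else: {:ok, :conn}
end

result = Backoff.with_backoff(checkout, 5, 10)
IO.inspect(result)  # => {:ok, :conn}
```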
**Contributor:** How would you have an infinite bug in the stream callback? Are you saying in case our implementation has a bug? I am not sure those are valid arguments: a bug in `actual_stream` can cause connections to leak, eventually exhausting the pool and making the whole subsystem unusable. I don't think we should use it as an argument against it either; we just need to make sure to address all cases.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I mean something like
This can happen and connection will never be returned in pool. However, server will close the socket if it's unused for a long time, but I am not sure about this, since I am unaware if Mint sends empty ACK's to keep socket open
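The author's snippet was lost from this capture. A hypothetical reconstruction of the kind of code being described, using `Stream.resource` in place of a real connection, shows how a never-terminating (or never-started) consumer keeps the release callback from ever running:

```elixir
# Illustrative stand-in for a connection-backed stream, not Finch code.
stream =
  Stream.resource(
    fn -> IO.puts("connection checked out"); 0 end,
    fn n -> {[n], n + 1} end,                  # infinite producer
    fn _ -> IO.puts("connection checked back in") end
  )

# A consumer that never terminates would hold the resource forever:
#
#     Enum.each(stream, fn _ -> :ok end)  # loops forever; release never runs
#
# Bounded consumption does run the release callback:
taken = Enum.take(stream, 3)
IO.inspect(taken)  # => [0, 1, 2]
```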
**Contributor:** I see. Sure, that can happen when consuming both types of streams. However, the fact that someone can write this particular code does not justify holding idle connections until the stream is consumed. Anyway, if we add the retry to the stream, we will be fine either way. So we can close this convo once we add retries and move the checkout into the stream.