Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backpressure when client code does not schedule reads #40

Closed
Lupus opened this issue Feb 15, 2020 · 1 comment · Fixed by #59
Closed

Backpressure when client code does not schedule reads #40

Lupus opened this issue Feb 15, 2020 · 1 comment · Fixed by #59

Comments

@Lupus
Copy link
Contributor

Lupus commented Feb 15, 2020

One of the reasons why we run forked Lwt adapter for original httpaf is an ugly workaround for read backpressure - we add an Lwt_mutex per connection, which gets locked from above the http server when next buffer is pushed to Lwt_stream. If adding to stream blocks, mutex remains locked. Also this mutex gets locked from below http server - in Lwt adapter when trying to push data to state machine. This approach works, but slows things down. Since we're working on storage systems, we expect a lot of data to be ingested into our HTTP endpoints, and we need to throttle it in case we can't consume it that fast, instead of running out of memory :)

Original httpaf issue for this matter has repro snippet: inhabitedtype/httpaf#140

@Lupus
Copy link
Contributor Author

Lupus commented Apr 1, 2020

Easy repro case with the following modification to lwt_echo_post example:

diff --git a/examples/lwt/lwt_echo_post.ml b/examples/lwt/lwt_echo_post.ml
index 1830710..4f554ca 100644
--- a/examples/lwt/lwt_echo_post.ml
+++ b/examples/lwt/lwt_echo_post.ml
@@ -4,7 +4,30 @@ module Arg = Caml.Arg
 
 open Httpaf_lwt_unix
 
-let request_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.echo_post
+let request_handler (_ : Unix.sockaddr) reqd =
+    let open Httpaf in
+    match Reqd.request reqd  with
+    | { Request.meth = `POST; headers; _ } ->
+      let response =
+        let content_type =
+          match Headers.get headers "content-type" with
+          | None   -> "application/octet-stream"
+          | Some x -> x
+        in
+        Response.create ~headers:(Headers.of_list ["content-type", content_type; "connection", "close"]) `OK
+      in
+      let _request_body  = Reqd.request_body reqd in
+      let response_body = Reqd.respond_with_streaming reqd response in
+      let on_read _buffer ~off:_  ~len:_ = () in
+      let on_eof () =
+        Body.close_writer response_body
+      in
+      Body.schedule_read (Reqd.request_body reqd) ~on_eof ~on_read
+    | _ ->
+      let headers = Headers.of_list [ "connection", "close" ] in
+      Reqd.respond_with_string reqd (Response.create ~headers `Method_not_allowed) ""
+  ;;
+
 let error_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.error_handler
 
 let main port =

And the following curl command:

curl -v -XPOST -H 'Expect:' -H 'Transfer-encoding: chunked' http://127.0.0.1:8080/echo -T /dev/zero -o /dev/null

Observed behavior is unbounded memory growth.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant