From ee423fb5233e70ab99d3751a552052d10057179b Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sun, 19 Apr 2020 13:32:33 -0700 Subject: [PATCH 1/5] Handle backpressure when reading incoming data --- lib/body.ml | 17 +++++++++++- lib/reqd.ml | 8 +++++- lib/server_connection.ml | 27 +++++++++++++++++-- lib_test/test_httpaf.ml | 57 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 4 deletions(-) diff --git a/lib/body.ml b/lib/body.ml index 7756ba0..13ba9f2 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -37,6 +37,7 @@ type _ t = ; mutable write_final_if_chunked : bool ; mutable on_eof : unit -> unit ; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit + ; mutable when_ready_to_read : Optional_thunk.t ; mutable when_ready_to_write : Optional_thunk.t ; buffered_bytes : int ref } @@ -50,6 +51,7 @@ let of_faraday faraday = ; write_final_if_chunked = true ; on_eof = default_on_eof ; on_read = default_on_read + ; when_ready_to_read = Optional_thunk.none ; when_ready_to_write = Optional_thunk.none ; buffered_bytes = ref 0 } @@ -80,6 +82,11 @@ let write_bigstring t ?off ?len b = let schedule_bigstring t ?off ?len (b:Bigstringaf.t) = Faraday.schedule_bigstring ?off ?len t.faraday b +let ready_to_read t = + let callback = t.when_ready_to_read in + t.when_ready_to_read <- Optional_thunk.none; + Optional_thunk.call_if_some callback + let ready_to_write t = let callback = t.when_ready_to_write in t.when_ready_to_write <- Optional_thunk.none; @@ -128,7 +135,8 @@ let schedule_read t ~on_eof ~on_read = else begin t.read_scheduled <- true; t.on_eof <- on_eof; - t.on_read <- on_read + t.on_read <- on_read; + ready_to_read t; end let has_pending_output t = @@ -150,6 +158,13 @@ let close_reader t = execute_read t ;; +let when_ready_to_read t callback = + if is_closed t + then callback () + else if Optional_thunk.is_some t.when_ready_to_read + then failwith "Body.when_ready_to_read: only one callback can be registered at a time" + else t.when_ready_to_read <- Optional_thunk.some callback + let when_ready_to_write t callback = if is_closed t then callback () diff --git a/lib/reqd.ml b/lib/reqd.ml index dfb12c4..0b2b8a2 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -37,6 +37,7 @@ type error = module Input_state = struct type t = | Provide + | Wait | Complete end @@ -234,6 +235,9 @@ let error_code t = | #error as error -> Some error | `Ok -> None +let on_more_input_available t f = + Body.when_ready_to_read t.request_body f + let on_more_output_available t f = Response_state.on_more_output_available t.response_state f @@ -243,7 +247,9 @@ let persistent_connection t = let input_state t : Input_state.t = if Body.is_closed t.request_body then Complete - else Provide + else if t.request_body.read_scheduled + then Provide + else Wait let output_state t = Response_state.output_state t.response_state diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 1c38df6..9ce97e5 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -85,7 +85,19 @@ let yield_reader t k = then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" - else t.wakeup_reader <- Optional_thunk.some k + else if is_active t then + let reqd = current_reqd_exn t in + begin match Reqd.input_state reqd with + | Wait -> + (* `Wait` means that the request body isn't closed yet (there might be + * more incoming bytes) but the request handler hasn't scheduled a read + * either. *) + Reqd.on_more_input_available reqd k + | Provide -> t.wakeup_reader <- Optional_thunk.some k + | Complete -> k () + end + else + t.wakeup_reader <- Optional_thunk.some k ;; let wakeup_reader t = @@ -94,6 +106,14 @@ let wakeup_reader t = Optional_thunk.call_if_some f ;; +let transfer_reader_callback t reqd = + if Optional_thunk.is_some t.wakeup_reader + then ( + let f = t.wakeup_reader in + t.wakeup_reader <- Optional_thunk.none; + Reqd.on_more_input_available reqd (Optional_thunk.unchecked_value f)) +;; + let yield_writer t k = if is_closed t then failwith "yield_writer on closed conn" @@ -252,6 +272,9 @@ let rec _next_read_operation t = ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with + | Wait -> + transfer_reader_callback t reqd; + `Yield | Provide -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd ) @@ -369,7 +392,7 @@ and _final_write_operation_for t reqd = Writer.next t.writer; ) else ( match Reqd.input_state reqd with - | Provide -> assert false + | Wait | Provide -> assert false | Complete -> advance_request_queue t; _next_write_operation t; diff --git a/lib_test/test_httpaf.ml b/lib_test/test_httpaf.ml index b7b7133..fac15f9 100644 --- a/lib_test/test_httpaf.ml +++ b/lib_test/test_httpaf.ml @@ -1151,6 +1151,61 @@ Accept-Language: en-US,en;q=0.5\r\n\r\n"; Alcotest.(check bool) "Reader woken up" true !reader_woken_up; ;; + let request_handler continue_reading reqd = + let request_body = Reqd.request_body reqd in + let rec on_read _buffer ~off:_ ~len:_ = + continue_reading := (fun () -> + Body.schedule_read request_body ~on_eof ~on_read); + and on_eof () = print_endline "got eof" in + Body.schedule_read request_body ~on_eof ~on_read + + let test_handling_backpressure_when_read_not_scheduled () = + let reader_woken_up = ref false in + let continue_reading = ref (fun () -> ()) in + let t = create ~error_handler (request_handler continue_reading) in + reader_ready t; + writer_yielded t; + let request = + Request.create + `GET + ~headers:(Headers.of_list ["content-length", "10"]) + "/" + in + read_request t request; + yield_writer t ignore; + read_string t "five."; + reader_yielded t; + yield_reader t (fun () -> reader_woken_up := true); + !continue_reading (); + reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t; + Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up; + writer_yielded t; + ;; + + let test_handling_backpressure_when_read_not_scheduled_early_yield () = + let reader_woken_up = ref false in + let continue_reading = ref (fun () -> ()) in + let t = create ~error_handler (request_handler continue_reading) in + reader_ready t; + writer_yielded t; + let request = + Request.create + `GET + ~headers:(Headers.of_list ["content-length", "10"]) + "/" + in + read_request t request; + yield_reader t (fun () -> reader_woken_up := true); + yield_writer t ignore; + read_string t "five."; + reader_yielded t; + !continue_reading (); + reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t; + Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up; + writer_yielded t; + ;; + + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -1183,6 +1238,8 @@ Accept-Language: en-US,en;q=0.5\r\n\r\n"; ; "`flush_headers_immediately` with empty body", `Quick, test_immediate_flush_empty_body ; "empty body with no immediate flush", `Quick, test_empty_body_no_immediate_flush ; "yield before starting a response", `Quick, test_yield_before_starting_a_response + ; "test yield when read isn't scheduled", `Quick, test_handling_backpressure_when_read_not_scheduled + ; "test yield when read isn't scheduled, reader yields early", `Quick, test_handling_backpressure_when_read_not_scheduled_early_yield ] end From 3dfe5b1c3436ce5fba8e22ea6f28ccde92db61e0 Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sun, 19 Apr 2020 16:41:46 -0700 Subject: [PATCH 2/5] Handle backpressure in the client --- lib/body.ml | 2 ++ lib/client_connection.ml | 25 ++++++++++++-- lib/input_state.ml | 4 +++ lib/output_state.ml | 4 +++ lib/reqd.ml | 9 +---- lib/respd.ml | 32 +++++++++-------- lib/response_state.ml | 7 ---- lib/server_connection.ml | 4 +-- lib_test/test_httpaf.ml | 75 ++++++++++++++++++++++++++++++++++++++-- 9 files changed, 126 insertions(+), 36 deletions(-) create mode 100644 lib/input_state.ml create mode 100644 lib/output_state.ml diff --git a/lib/body.ml b/lib/body.ml index 13ba9f2..6d79a8d 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -139,6 +139,8 @@ let schedule_read t ~on_eof ~on_read = ready_to_read t; end +let has_scheduled_read t = t.read_scheduled + let has_pending_output t = (* Force another write poll to make sure that the final chunk is emitted for chunk-encoded bodies. diff --git a/lib/client_connection.ml b/lib/client_connection.ml index 3a1deef..ef76861 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -69,6 +69,17 @@ let yield_reader t k = then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" + else if is_active t then + let respd = current_respd_exn t in + begin match Respd.input_state respd with + | Wait -> + (* `Wait` means that the response body isn't closed yet (there may be + * more incoming bytes) but the response handler hasn't scheduled a read + * either. *) + Respd.on_more_input_available respd k + | Provide -> t.wakeup_reader <- Optional_thunk.some k + | Complete -> k () + end else t.wakeup_reader <- Optional_thunk.some k let wakeup_reader t = @@ -76,6 +87,13 @@ let wakeup_reader t = t.wakeup_reader <- Optional_thunk.none; Optional_thunk.call_if_some f +let transfer_reader_callback t respd = + if Optional_thunk.is_some t.wakeup_reader + then ( + let f = t.wakeup_reader in + t.wakeup_reader <- Optional_thunk.none; + Respd.on_more_input_available respd (Optional_thunk.unchecked_value f)) + let yield_writer t k = if is_closed t then failwith "yield_writer on closed conn" @@ -207,6 +225,9 @@ let rec _next_read_operation t = ) else ( let respd = current_respd_exn t in match Respd.input_state respd with + | Wait -> + transfer_reader_callback t respd; + `Yield | Provide -> Reader.next t.reader | Complete -> _final_read_operation_for t respd ) @@ -255,7 +276,7 @@ let read_eof t bs ~off ~len = if is_active t then begin let respd = current_respd_exn t in match Respd.input_state respd with - | Provide -> unexpected_eof t + | Wait | Provide -> unexpected_eof t | Complete -> () end; bytes_read @@ -283,7 +304,7 @@ and _final_write_operation_for t respd = Writer.next t.writer; ) else ( match Respd.input_state respd with - | Provide -> + | Wait | Provide -> (* From RFC7230ยง6.3.2: * A client that supports persistent connections MAY "pipeline" its * requests (i.e., send multiple requests without waiting for each diff --git a/lib/input_state.ml b/lib/input_state.ml new file mode 100644 index 0000000..ebc7996 --- /dev/null +++ b/lib/input_state.ml @@ -0,0 +1,4 @@ +type t = + | Provide + | Wait + | Complete diff --git a/lib/output_state.ml b/lib/output_state.ml new file mode 100644 index 0000000..91fceff --- /dev/null +++ b/lib/output_state.ml @@ -0,0 +1,4 @@ +type t = + | Consume + | Wait + | Complete diff --git a/lib/reqd.ml b/lib/reqd.ml index 0b2b8a2..8e8b5a0 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -34,13 +34,6 @@ type error = [ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ] -module Input_state = struct - type t = - | Provide - | Wait - | Complete -end - type error_handler = ?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit @@ -247,7 +240,7 @@ let persistent_connection t = let input_state t : Input_state.t = if Body.is_closed t.request_body then Complete - else if t.request_body.read_scheduled + else if Body.has_scheduled_read t.request_body then Provide else Wait diff --git a/lib/respd.ml b/lib/respd.ml index 80bec64..837be05 100644 --- a/lib/respd.ml +++ b/lib/respd.ml @@ -14,19 +14,6 @@ module Request_state = struct | Closed end -module Input_state = struct - type t = - | Provide - | Complete -end - -module Output_state = struct - type t = - | Consume - | Wait - | Complete -end - type t = { request : Request.t ; request_body : [ `write ] Body.t @@ -74,6 +61,19 @@ let write_request t = Writer.flush t.writer (fun () -> t.state <- Awaiting_response) +let on_more_input_available t f = + match t.state with + | Received_response (_, response_body) -> + Body.when_ready_to_read response_body f + + (* Don't expect to be called in these states. *) + | Uninitialized + | Awaiting_response -> + failwith "httpaf.Respd.on_more_input_available: response hasn't started" + | Upgraded _ + | Closed -> + failwith "httpaf.Respd.on_more_input_available: response already complete" + let on_more_output_available { request_body; _ } f = Body.when_ready_to_write request_body f @@ -112,9 +112,11 @@ let input_state t : Input_state.t = | Uninitialized | Awaiting_response -> Provide | Received_response (_, response_body) -> - if not (Body.is_closed response_body) + if Body.is_closed response_body + then Complete + else if Body.has_scheduled_read response_body then Provide - else Complete + else Wait | Upgraded _ | Closed -> Complete diff --git a/lib/response_state.ml b/lib/response_state.ml index ee994da..84a609b 100644 --- a/lib/response_state.ml +++ b/lib/response_state.ml @@ -1,10 +1,3 @@ -module Output_state = struct - type t = - | Consume - | Wait - | Complete -end - type ('handle, 'io) t = | Waiting of Optional_thunk.t ref | Complete of Response.t diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 9ce97e5..c25cfee 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -89,8 +89,8 @@ let yield_reader t k = let reqd = current_reqd_exn t in begin match Reqd.input_state reqd with | Wait -> - (* `Wait` means that the request body isn't closed yet (there might be - * more incoming bytes) but the request handler hasn't scheduled a read + (* `Wait` means that the request body isn't closed yet (there may be more + * incoming bytes) but the request handler hasn't scheduled a read * either. *) Reqd.on_more_input_available reqd k | Provide -> t.wakeup_reader <- Optional_thunk.some k diff --git a/lib_test/test_httpaf.ml b/lib_test/test_httpaf.ml index fac15f9..3d78838 100644 --- a/lib_test/test_httpaf.ml +++ b/lib_test/test_httpaf.ml @@ -1268,8 +1268,8 @@ module Client_connection = struct read_string t response_string ;; - let reader_ready t = - Alcotest.check read_operation "Reader is ready" + let reader_ready ?(msg="Reader is ready") t = + Alcotest.check read_operation msg `Read (next_read_operation t :> [`Close | `Read | `Yield | `Upgrade]); ;; @@ -1842,6 +1842,75 @@ module Client_connection = struct !error_message ;; + let backpressure_response_handler continue_reading expected_response response body = + Alcotest.check (module Response) "expected response" expected_response response; + let rec on_read _buffer ~off:_ ~len:_ = + continue_reading := (fun () -> + Body.schedule_read body ~on_eof ~on_read); + and on_eof () = print_endline "got eof" in + Body.schedule_read body ~on_eof ~on_read + ;; + + let test_handling_backpressure_when_read_not_scheduled () = + let reader_woken_up = ref false in + let continue_reading = ref (fun () -> ()) in + let request' = Request.create `GET "/" in + let response = + Response.create ~headers:(Headers.of_list ["content-length", "10"]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(backpressure_response_handler continue_reading response) + ~error_handler:no_error_handler + in + write_request t request'; + writer_yielded t; + Body.close_writer body; + reader_ready t; + read_response t response; + yield_writer t ignore; + read_string t "five."; + reader_yielded t; + yield_reader t (fun () -> reader_woken_up := true); + !continue_reading (); + reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t; + Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up; + writer_yielded t; + ;; + + let test_handling_backpressure_when_read_not_scheduled_early_yield () = + let reader_woken_up = ref false in + let continue_reading = ref (fun () -> ()) in + let request' = Request.create `GET "/" in + let response = + Response.create ~headers:(Headers.of_list ["content-length", "10"]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(backpressure_response_handler continue_reading response) + ~error_handler:no_error_handler + in + write_request t request'; + writer_yielded t; + Body.close_writer body; + reader_ready t; + read_response t response; + yield_reader t (fun () -> reader_woken_up := true); + yield_writer t ignore; + read_string t "five."; + reader_yielded t; + !continue_reading (); + reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t; + Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up; + writer_yielded t; + ;; + let tests = [ "commit parse after every header line", `Quick, test_commit_parse_after_every_header ; "GET" , `Quick, test_get @@ -1857,6 +1926,8 @@ module Client_connection = struct ; "Fixed body shuts down writer if connection is not persistent", `Quick, test_fixed_body ; "Fixed body doesn't shut down the writer if connection is persistent",`Quick, test_fixed_body_persistent_connection ; "Client support for upgrading a connection", `Quick, test_client_upgrade + ; "test yield when read isn't scheduled", `Quick, test_handling_backpressure_when_read_not_scheduled + ; "test yield when read isn't scheduled, reader yields early", `Quick, test_handling_backpressure_when_read_not_scheduled_early_yield ] end From 166a3c5df9305974ad83265c33f581bf30bfa460 Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sun, 19 Apr 2020 18:41:36 -0700 Subject: [PATCH 3/5] tests: rename backpressure request handler --- lib_test/test_httpaf.ml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib_test/test_httpaf.ml b/lib_test/test_httpaf.ml index 3d78838..bf79b75 100644 --- a/lib_test/test_httpaf.ml +++ b/lib_test/test_httpaf.ml @@ -1151,18 +1151,18 @@ Accept-Language: en-US,en;q=0.5\r\n\r\n"; Alcotest.(check bool) "Reader woken up" true !reader_woken_up; ;; - let request_handler continue_reading reqd = + let backpressure_request_handler continue_reading reqd = let request_body = Reqd.request_body reqd in let rec on_read _buffer ~off:_ ~len:_ = continue_reading := (fun () -> Body.schedule_read request_body ~on_eof ~on_read); - and on_eof () = print_endline "got eof" in + and on_eof () = print_endline ("got eof" ^ (string_of_bool (Body.is_closed request_body))) in Body.schedule_read request_body ~on_eof ~on_read let test_handling_backpressure_when_read_not_scheduled () = let reader_woken_up = ref false in let continue_reading = ref (fun () -> ()) in - let t = create ~error_handler (request_handler continue_reading) in + let t = create ~error_handler (backpressure_request_handler continue_reading) in reader_ready t; writer_yielded t; let request = @@ -1185,7 +1185,7 @@ Accept-Language: en-US,en;q=0.5\r\n\r\n"; let test_handling_backpressure_when_read_not_scheduled_early_yield () = let reader_woken_up = ref false in let continue_reading = ref (fun () -> ()) in - let t = create ~error_handler (request_handler continue_reading) in + let t = create ~error_handler (backpressure_request_handler continue_reading) in reader_ready t; writer_yielded t; let request = From ccdde2e1a0800cb75cf66fd01fba3ca471217886 Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sun, 19 Apr 2020 20:03:30 -0700 Subject: [PATCH 4/5] changelog --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 48e141a..e19b6cc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,8 @@ Unreleased ([#45](https://github.com/anmonteiro/httpaf/pull/45)) - httpaf: Fix sending streaming error responses; in particular, allow sending chunk-encoded responses ([#56](https://github.com/anmonteiro/httpaf/pull/56)) +- httpaf: handle read backpressure in server and client implementations + ([#59](https://github.com/anmonteiro/httpaf/pull/59)) httpaf (upstream) 0.6.5 -------------- From 1c58ad88898ad4750aaf32ff7ec4bf1e35911575 Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sun, 19 Apr 2020 20:28:54 -0700 Subject: [PATCH 5/5] has_scheduled_read -> is_read_scheduled --- lib/body.ml | 2 +- lib/reqd.ml | 2 +- lib/respd.ml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/body.ml b/lib/body.ml index 6d79a8d..557f021 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -139,7 +139,7 @@ let schedule_read t ~on_eof ~on_read = ready_to_read t; end -let has_scheduled_read t = t.read_scheduled +let is_read_scheduled t = t.read_scheduled let has_pending_output t = (* Force another write poll to make sure that the final chunk is emitted for diff --git a/lib/reqd.ml b/lib/reqd.ml index 8e8b5a0..acde261 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -240,7 +240,7 @@ let persistent_connection t = let input_state t : Input_state.t = if Body.is_closed t.request_body then Complete - else if Body.has_scheduled_read t.request_body + else if Body.is_read_scheduled t.request_body then Provide else Wait diff --git a/lib/respd.ml b/lib/respd.ml index 837be05..2c32f86 100644 --- a/lib/respd.ml +++ b/lib/respd.ml @@ -114,7 +114,7 @@ let input_state t : Input_state.t = | Received_response (_, response_body) -> if Body.is_closed response_body then Complete - else if Body.has_scheduled_read response_body + else if Body.is_read_scheduled response_body then Provide else Wait | Upgraded _