From 40443d4b0800b1d15a112714d8b763702c7b27bf Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sun, 19 Apr 2020 20:36:33 -0700 Subject: [PATCH] Handle backpressure when reading incoming data (#59) --- CHANGES.md | 2 + lib/body.ml | 19 +++++++- lib/client_connection.ml | 27 +++++++++-- lib/input_state.ml | 4 ++ lib/output_state.ml | 4 ++ lib/reqd.ml | 14 +++--- lib/respd.ml | 36 +++++++------- lib/response_state.ml | 7 --- lib/server_connection.ml | 27 ++++++++++- lib_test/test_client_connection.ml | 75 +++++++++++++++++++++++++++++- lib_test/test_server_connection.ml | 56 ++++++++++++++++++++++ 11 files changed, 231 insertions(+), 40 deletions(-) create mode 100644 lib/input_state.ml create mode 100644 lib/output_state.ml diff --git a/CHANGES.md b/CHANGES.md index 48e141a..e19b6cc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,8 @@ Unreleased ([#45](https://github.com/anmonteiro/httpaf/pull/45)) - httpaf: Fix sending streaming error responses; in particular, allow sending chunk-encoded responses ([#56](https://github.com/anmonteiro/httpaf/pull/56)) +- httpaf: handle read backpressure in server and client implementations + ([#59](https://github.com/anmonteiro/httpaf/pull/59)) httpaf (upstream) 0.6.5 -------------- diff --git a/lib/body.ml b/lib/body.ml index 6d9aff9..5918b2a 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -37,6 +37,7 @@ type _ t = ; mutable write_final_if_chunked : bool ; mutable on_eof : unit -> unit ; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit + ; mutable when_ready_to_read : Optional_thunk.t ; mutable when_ready_to_write : Optional_thunk.t ; buffered_bytes : int ref } @@ -50,6 +51,7 @@ let of_faraday faraday = ; write_final_if_chunked = true ; on_eof = default_on_eof ; on_read = default_on_read + ; when_ready_to_read = Optional_thunk.none ; when_ready_to_write = Optional_thunk.none ; buffered_bytes = ref 0 } @@ -80,6 +82,11 @@ let write_bigstring t ?off ?len b = let schedule_bigstring t ?off ?len 
(b:Bigstringaf.t) = Faraday.schedule_bigstring ?off ?len t.faraday b +let ready_to_read t = + let callback = t.when_ready_to_read in + t.when_ready_to_read <- Optional_thunk.none; + Optional_thunk.call_if_some callback + let ready_to_write t = let callback = t.when_ready_to_write in t.when_ready_to_write <- Optional_thunk.none; @@ -128,9 +135,12 @@ let schedule_read t ~on_eof ~on_read = else begin t.read_scheduled <- true; t.on_eof <- on_eof; - t.on_read <- on_read + t.on_read <- on_read; + ready_to_read t; end +let is_read_scheduled t = t.read_scheduled + let has_pending_output t = (* Force another write poll to make sure that the final chunk is emitted for chunk-encoded bodies. @@ -150,6 +160,13 @@ let close_reader t = execute_read t ;; +let when_ready_to_read t callback = + if is_closed t + then callback () + else if Optional_thunk.is_some t.when_ready_to_read + then failwith "Body.when_ready_to_read: only one callback can be registered at a time" + else t.when_ready_to_read <- Optional_thunk.some callback + let when_ready_to_write t callback = if is_closed t then callback () diff --git a/lib/client_connection.ml b/lib/client_connection.ml index c63864f..63d6ada 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -69,6 +69,17 @@ let yield_reader t k = then failwith "on_wakeup_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" + else if is_active t then + let respd = current_respd_exn t in + begin match Respd.input_state respd with + | Wait -> + (* `Wait` means that the response body isn't closed yet (there may be + * more incoming bytes) but the response handler hasn't scheduled a read + * either. 
*) + Respd.on_more_input_available respd k + | Ready -> t.wakeup_reader <- Optional_thunk.some k + | Complete -> k () + end else t.wakeup_reader <- Optional_thunk.some k let wakeup_reader t = @@ -76,6 +87,13 @@ let wakeup_reader t = t.wakeup_reader <- Optional_thunk.none; Optional_thunk.call_if_some f +let transfer_reader_callback t respd = + if Optional_thunk.is_some t.wakeup_reader + then ( + let f = t.wakeup_reader in + t.wakeup_reader <- Optional_thunk.none; + Respd.on_more_input_available respd (Optional_thunk.unchecked_value f)) + let yield_writer t k = if is_closed t then failwith "yield_writer on closed conn" @@ -207,7 +225,10 @@ let rec _next_read_operation t = ) else ( let respd = current_respd_exn t in match Respd.input_state respd with - | Provide -> Reader.next t.reader + | Wait -> + transfer_reader_callback t respd; + `Yield + | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t respd ) @@ -255,7 +276,7 @@ let read_eof t bs ~off ~len = if is_active t then begin let respd = current_respd_exn t in match Respd.input_state respd with - | Provide -> unexpected_eof t + | Wait | Ready -> unexpected_eof t | Complete -> () end; bytes_read @@ -283,7 +304,7 @@ and _final_write_operation_for t respd = Writer.next t.writer; ) else ( match Respd.input_state respd with - | Provide -> + | Wait | Ready -> (* From RFC7230§6.3.2: * A client that supports persistent connections MAY "pipeline" its * requests (i.e., send multiple requests without waiting for each diff --git a/lib/input_state.ml b/lib/input_state.ml new file mode 100644 index 0000000..cc9ce4c --- /dev/null +++ b/lib/input_state.ml @@ -0,0 +1,4 @@ +type t = + | Ready + | Wait + | Complete diff --git a/lib/output_state.ml b/lib/output_state.ml new file mode 100644 index 0000000..91fceff --- /dev/null +++ b/lib/output_state.ml @@ -0,0 +1,4 @@ +type t = + | Consume + | Wait + | Complete diff --git a/lib/reqd.ml b/lib/reqd.ml index a50e42c..d59dad6 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml
@@ -34,12 +34,6 @@ type error = [ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ] -module Input_state = struct - type t = - | Ready - | Complete -end - type error_handler = ?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit @@ -229,6 +223,9 @@ let error_code t = | #error as error -> Some error | `Ok -> None +let on_more_input_available t f = + Body.when_ready_to_read t.request_body f + let on_more_output_available t f = Response_state.on_more_output_available t.response_state f @@ -238,8 +235,9 @@ let persistent_connection t = let input_state t : Input_state.t = if Body.is_closed t.request_body then Complete - else Ready -;; + else if Body.is_read_scheduled t.request_body + then Ready + else Wait let output_state t = Response_state.output_state t.response_state diff --git a/lib/respd.ml b/lib/respd.ml index 60d2a4a..6c5f990 100644 --- a/lib/respd.ml +++ b/lib/respd.ml @@ -14,19 +14,6 @@ module Request_state = struct | Closed end -module Input_state = struct - type t = - | Provide - | Complete -end - -module Output_state = struct - type t = - | Consume - | Wait - | Complete -end - type t = { request : Request.t ; request_body : [ `write ] Body.t @@ -74,6 +61,19 @@ let write_request t = Writer.flush t.writer (fun () -> t.state <- Awaiting_response) +let on_more_input_available t f = + match t.state with + | Received_response (_, response_body) -> + Body.when_ready_to_read response_body f + + (* Don't expect to be called in these states. 
*) + | Uninitialized + | Awaiting_response -> + failwith "httpaf.Respd.on_more_input_available: response hasn't started" + | Upgraded _ + | Closed -> + failwith "httpaf.Respd.on_more_input_available: response already complete" + let on_more_output_available { request_body; _ } f = Body.when_ready_to_write request_body f @@ -108,11 +108,13 @@ let close_response_body t = let input_state t : Input_state.t = match t.state with | Uninitialized - | Awaiting_response -> Provide + | Awaiting_response -> Ready | Received_response (_, response_body) -> - if not (Body.is_closed response_body) - then Provide - else Complete + if Body.is_closed response_body + then Complete + else if Body.is_read_scheduled response_body + then Ready + else Wait | Upgraded _ | Closed -> Complete diff --git a/lib/response_state.ml b/lib/response_state.ml index 02d0d4a..760ff34 100644 --- a/lib/response_state.ml +++ b/lib/response_state.ml @@ -1,10 +1,3 @@ -module Output_state = struct - type t = - | Consume - | Wait - | Complete -end - type t = | Waiting of Optional_thunk.t ref | Complete of Response.t diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 83e4b72..f5c5194 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -85,7 +85,19 @@ let yield_reader t k = then failwith "on_wakeup_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" - else t.wakeup_reader <- Optional_thunk.some k + else if is_active t then + let reqd = current_reqd_exn t in + begin match Reqd.input_state reqd with + | Wait -> + (* `Wait` means that the request body isn't closed yet (there may be more + * incoming bytes) but the request handler hasn't scheduled a read + * either. 
*) + Reqd.on_more_input_available reqd k + | Ready -> t.wakeup_reader <- Optional_thunk.some k + | Complete -> k () + end + else + t.wakeup_reader <- Optional_thunk.some k ;; let wakeup_reader t = @@ -94,6 +106,14 @@ let wakeup_reader t = Optional_thunk.call_if_some f ;; +let transfer_reader_callback t reqd = + if Optional_thunk.is_some t.wakeup_reader + then ( + let f = t.wakeup_reader in + t.wakeup_reader <- Optional_thunk.none; + Reqd.on_more_input_available reqd (Optional_thunk.unchecked_value f)) +;; + let yield_writer t k = if is_closed t then failwith "yield_writer on closed conn" @@ -252,6 +272,9 @@ let rec _next_read_operation t = ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with + | Wait -> + transfer_reader_callback t reqd; + `Yield | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd ) @@ -354,7 +377,7 @@ and _final_write_operation_for t reqd = Writer.next t.writer; ) else ( match Reqd.input_state reqd with - | Ready -> assert false + | Wait | Ready -> assert false | Complete -> advance_request_queue t; _next_write_operation t; diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index c4501d6..e806357 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -24,8 +24,8 @@ let read_response t r = read_string t response_string ;; -let reader_ready t = - Alcotest.check read_operation "Reader is ready" +let reader_ready ?(msg="Reader is ready") t = + Alcotest.check read_operation msg `Read (next_read_operation t :> [`Close | `Read | `Yield]); ;; @@ -628,6 +628,75 @@ let test_client_upgrade () = writer_closed t; ;; +let backpressure_response_handler continue_reading expected_response response body = + Alcotest.check (module Response) "expected response" expected_response response; + let rec on_read _buffer ~off:_ ~len:_ = + continue_reading := (fun () -> + Body.schedule_read body ~on_eof ~on_read); + and on_eof () = print_endline "got eof" 
in + Body.schedule_read body ~on_eof ~on_read +;; + +let test_handling_backpressure_when_read_not_scheduled () = + let reader_woken_up = ref false in + let continue_reading = ref (fun () -> ()) in + let request' = Request.create `GET "/" in + let response = + Response.create ~headers:(Headers.of_list ["content-length", "10"]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(backpressure_response_handler continue_reading response) + ~error_handler:no_error_handler + in + write_request t request'; + writer_yielded t; + Body.close_writer body; + reader_ready t; + read_response t response; + yield_writer t ignore; + read_string t "five."; + reader_yielded t; + yield_reader t (fun () -> reader_woken_up := true); + !continue_reading (); + reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t; + Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up; + writer_yielded t; +;; + +let test_handling_backpressure_when_read_not_scheduled_early_yield () = + let reader_woken_up = ref false in + let continue_reading = ref (fun () -> ()) in + let request' = Request.create `GET "/" in + let response = + Response.create ~headers:(Headers.of_list ["content-length", "10"]) `OK + in + let t = create ?config:None in + let body = + request + t + request' + ~response_handler:(backpressure_response_handler continue_reading response) + ~error_handler:no_error_handler + in + write_request t request'; + writer_yielded t; + Body.close_writer body; + reader_ready t; + read_response t response; + yield_reader t (fun () -> reader_woken_up := true); + yield_writer t ignore; + read_string t "five."; + reader_yielded t; + !continue_reading (); + reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t; + Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up; + writer_yielded t; +;; + let tests = [ "commit parse after every header line", `Quick, 
test_commit_parse_after_every_header ; "GET" , `Quick, test_get @@ -645,4 +714,6 @@ let tests = ; "Fixed body shuts down writer if connection is not persistent", `Quick, test_fixed_body ; "Fixed body doesn't shut down the writer if connection is persistent",`Quick, test_fixed_body_persistent_connection ; "Client support for upgrading a connection", `Quick, test_client_upgrade + ; "test yield when read isn't scheduled", `Quick, test_handling_backpressure_when_read_not_scheduled + ; "test yield when read isn't scheduled, reader yields early", `Quick, test_handling_backpressure_when_read_not_scheduled_early_yield ] diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index f5c2d29..73234ff 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -883,6 +883,60 @@ let test_yield_before_starting_a_response () = Alcotest.(check bool) "Reader woken up" true !reader_woken_up; ;; +let backpressure_request_handler continue_reading reqd = + let request_body = Reqd.request_body reqd in + let rec on_read _buffer ~off:_ ~len:_ = + continue_reading := (fun () -> + Body.schedule_read request_body ~on_eof ~on_read); + and on_eof () = print_endline ("got eof" ^ (string_of_bool (Body.is_closed request_body))) in + Body.schedule_read request_body ~on_eof ~on_read + +let test_handling_backpressure_when_read_not_scheduled () = + let reader_woken_up = ref false in + let continue_reading = ref (fun () -> ()) in + let t = create ~error_handler (backpressure_request_handler continue_reading) in + reader_ready t; + writer_yielded t; + let request = + Request.create + `GET + ~headers:(Headers.of_list ["content-length", "10"]) + "/" + in + read_request t request; + yield_writer t ignore; + read_string t "five."; + reader_yielded t; + yield_reader t (fun () -> reader_woken_up := true); + !continue_reading (); + reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t; + Alcotest.(check bool) "Reader 
wakes up if scheduling read" true !reader_woken_up; + writer_yielded t; +;; + +let test_handling_backpressure_when_read_not_scheduled_early_yield () = + let reader_woken_up = ref false in + let continue_reading = ref (fun () -> ()) in + let t = create ~error_handler (backpressure_request_handler continue_reading) in + reader_ready t; + writer_yielded t; + let request = + Request.create + `GET + ~headers:(Headers.of_list ["content-length", "10"]) + "/" + in + read_request t request; + yield_reader t (fun () -> reader_woken_up := true); + yield_writer t ignore; + read_string t "five."; + reader_yielded t; + !continue_reading (); + reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t; + Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up; + writer_yielded t; +;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -916,4 +970,6 @@ let tests = ; "`flush_headers_immediately` with empty body", `Quick, test_immediate_flush_empty_body ; "empty body with no immediate flush", `Quick, test_empty_body_no_immediate_flush ; "yield before starting a response", `Quick, test_yield_before_starting_a_response + ; "test yield when read isn't scheduled", `Quick, test_handling_backpressure_when_read_not_scheduled + ; "test yield when read isn't scheduled, reader yields early", `Quick, test_handling_backpressure_when_read_not_scheduled_early_yield ]