Skip to content

Commit

Permalink
Handle backpressure when reading incoming data (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed May 11, 2020
1 parent df633d0 commit 40443d4
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 40 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ Unreleased
([#45](https://github.com/anmonteiro/httpaf/pull/45))
- httpaf: Fix sending streaming error responses; in particular, allow sending
chunk-encoded responses ([#56](https://github.com/anmonteiro/httpaf/pull/56))
- httpaf: handle read backpressure in server and client implementations
([#59](https://github.com/anmonteiro/httpaf/pull/59))

httpaf (upstream) 0.6.5
--------------
Expand Down
19 changes: 18 additions & 1 deletion lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type _ t =
; mutable write_final_if_chunked : bool
; mutable on_eof : unit -> unit
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
; mutable when_ready_to_read : Optional_thunk.t
; mutable when_ready_to_write : Optional_thunk.t
; buffered_bytes : int ref
}
Expand All @@ -50,6 +51,7 @@ let of_faraday faraday =
; write_final_if_chunked = true
; on_eof = default_on_eof
; on_read = default_on_read
; when_ready_to_read = Optional_thunk.none
; when_ready_to_write = Optional_thunk.none
; buffered_bytes = ref 0
}
Expand Down Expand Up @@ -80,6 +82,11 @@ let write_bigstring t ?off ?len b =
let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b

let ready_to_read t =
let callback = t.when_ready_to_read in
t.when_ready_to_read <- Optional_thunk.none;
Optional_thunk.call_if_some callback

let ready_to_write t =
let callback = t.when_ready_to_write in
t.when_ready_to_write <- Optional_thunk.none;
Expand Down Expand Up @@ -128,9 +135,12 @@ let schedule_read t ~on_eof ~on_read =
else begin
t.read_scheduled <- true;
t.on_eof <- on_eof;
t.on_read <- on_read
t.on_read <- on_read;
ready_to_read t;
end

let is_read_scheduled t = t.read_scheduled

let has_pending_output t =
(* Force another write poll to make sure that the final chunk is emitted for
chunk-encoded bodies.
Expand All @@ -150,6 +160,13 @@ let close_reader t =
execute_read t
;;

let when_ready_to_read t callback =
if is_closed t
then callback ()
else if Optional_thunk.is_some t.when_ready_to_read
then failwith "Body.when_ready_to_read: only one callback can be registered at a time"
else t.when_ready_to_read <- Optional_thunk.some callback

let when_ready_to_write t callback =
if is_closed t
then callback ()
Expand Down
27 changes: 24 additions & 3 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,31 @@ let yield_reader t k =
then failwith "on_wakeup_reader on closed conn"
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else if is_active t then
let respd = current_respd_exn t in
begin match Respd.input_state respd with
| Wait ->
(* `Wait` means that the response body isn't closed yet (there may be
* more incoming bytes) but the response handler hasn't scheduled a read
* either. *)
Respd.on_more_input_available respd k
| Ready -> t.wakeup_reader <- Optional_thunk.some k
| Complete -> k ()
end
else t.wakeup_reader <- Optional_thunk.some k

let wakeup_reader t =
let f = t.wakeup_reader in
t.wakeup_reader <- Optional_thunk.none;
Optional_thunk.call_if_some f

let transfer_reader_callback t respd =
if Optional_thunk.is_some t.wakeup_reader
then (
let f = t.wakeup_reader in
t.wakeup_reader <- Optional_thunk.none;
Respd.on_more_input_available respd (Optional_thunk.unchecked_value f))

let yield_writer t k =
if is_closed t
then failwith "yield_writer on closed conn"
Expand Down Expand Up @@ -207,7 +225,10 @@ let rec _next_read_operation t =
) else (
let respd = current_respd_exn t in
match Respd.input_state respd with
| Provide -> Reader.next t.reader
| Wait ->
transfer_reader_callback t respd;
`Yield
| Ready -> Reader.next t.reader
| Complete -> _final_read_operation_for t respd
)

Expand Down Expand Up @@ -255,7 +276,7 @@ let read_eof t bs ~off ~len =
if is_active t then begin
let respd = current_respd_exn t in
match Respd.input_state respd with
| Provide -> unexpected_eof t
| Wait | Ready -> unexpected_eof t
| Complete -> ()
end;
bytes_read
Expand Down Expand Up @@ -283,7 +304,7 @@ and _final_write_operation_for t respd =
Writer.next t.writer;
) else (
match Respd.input_state respd with
| Provide ->
| Wait | Ready ->
(* From RFC7230§6.3.2:
* A client that supports persistent connections MAY "pipeline" its
* requests (i.e., send multiple requests without waiting for each
Expand Down
4 changes: 4 additions & 0 deletions lib/input_state.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type t =
| Ready
| Wait
| Complete
4 changes: 4 additions & 0 deletions lib/output_state.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type t =
| Consume
| Wait
| Complete
14 changes: 6 additions & 8 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@
type error =
[ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ]

module Input_state = struct
type t =
| Ready
| Complete
end

type error_handler =
?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit

Expand Down Expand Up @@ -229,6 +223,9 @@ let error_code t =
| #error as error -> Some error
| `Ok -> None

let on_more_input_available t f =
Body.when_ready_to_read t.request_body f

let on_more_output_available t f =
Response_state.on_more_output_available t.response_state f

Expand All @@ -238,8 +235,9 @@ let persistent_connection t =
let input_state t : Input_state.t =
if Body.is_closed t.request_body
then Complete
else Ready
;;
else if Body.is_read_scheduled t.request_body
then Ready
else Wait

let output_state t = Response_state.output_state t.response_state

Expand Down
36 changes: 19 additions & 17 deletions lib/respd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ module Request_state = struct
| Closed
end

module Input_state = struct
type t =
| Provide
| Complete
end

module Output_state = struct
type t =
| Consume
| Wait
| Complete
end

type t =
{ request : Request.t
; request_body : [ `write ] Body.t
Expand Down Expand Up @@ -74,6 +61,19 @@ let write_request t =
Writer.flush t.writer (fun () ->
t.state <- Awaiting_response)

let on_more_input_available t f =
match t.state with
| Received_response (_, response_body) ->
Body.when_ready_to_read response_body f

(* Don't expect to be called in these states. *)
| Uninitialized
| Awaiting_response ->
failwith "httpaf.Respd.on_more_input_available: response hasn't started"
| Upgraded _
| Closed ->
failwith "httpaf.Respd.on_more_input_available: response already complete"

let on_more_output_available { request_body; _ } f =
Body.when_ready_to_write request_body f

Expand Down Expand Up @@ -108,11 +108,13 @@ let close_response_body t =
let input_state t : Input_state.t =
match t.state with
| Uninitialized
| Awaiting_response -> Provide
| Awaiting_response -> Ready
| Received_response (_, response_body) ->
if not (Body.is_closed response_body)
then Provide
else Complete
if Body.is_closed response_body
then Complete
else if Body.is_read_scheduled response_body
then Ready
else Wait
| Upgraded _
| Closed -> Complete

Expand Down
7 changes: 0 additions & 7 deletions lib/response_state.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
module Output_state = struct
type t =
| Consume
| Wait
| Complete
end

type t =
| Waiting of Optional_thunk.t ref
| Complete of Response.t
Expand Down
27 changes: 25 additions & 2 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,19 @@ let yield_reader t k =
then failwith "on_wakeup_reader on closed conn"
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- Optional_thunk.some k
else if is_active t then
let reqd = current_reqd_exn t in
begin match Reqd.input_state reqd with
| Wait ->
(* `Wait` means that the request body isn't closed yet (there may be more
* incoming bytes) but the request handler hasn't scheduled a read
* either. *)
Reqd.on_more_input_available reqd k
| Ready -> t.wakeup_reader <- Optional_thunk.some k
| Complete -> k ()
end
else
t.wakeup_reader <- Optional_thunk.some k
;;

let wakeup_reader t =
Expand All @@ -94,6 +106,14 @@ let wakeup_reader t =
Optional_thunk.call_if_some f
;;

let transfer_reader_callback t reqd =
if Optional_thunk.is_some t.wakeup_reader
then (
let f = t.wakeup_reader in
t.wakeup_reader <- Optional_thunk.none;
Reqd.on_more_input_available reqd (Optional_thunk.unchecked_value f))
;;

let yield_writer t k =
if is_closed t
then failwith "yield_writer on closed conn"
Expand Down Expand Up @@ -252,6 +272,9 @@ let rec _next_read_operation t =
) else (
let reqd = current_reqd_exn t in
match Reqd.input_state reqd with
| Wait ->
transfer_reader_callback t reqd;
`Yield
| Ready -> Reader.next t.reader
| Complete -> _final_read_operation_for t reqd
)
Expand Down Expand Up @@ -354,7 +377,7 @@ and _final_write_operation_for t reqd =
Writer.next t.writer;
) else (
match Reqd.input_state reqd with
| Ready -> assert false
| Wait | Ready -> assert false
| Complete ->
advance_request_queue t;
_next_write_operation t;
Expand Down
75 changes: 73 additions & 2 deletions lib_test/test_client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ let read_response t r =
read_string t response_string
;;

let reader_ready t =
Alcotest.check read_operation "Reader is ready"
let reader_ready ?(msg="Reader is ready") t =
Alcotest.check read_operation msg
`Read (next_read_operation t :> [`Close | `Read | `Yield]);
;;

Expand Down Expand Up @@ -628,6 +628,75 @@ let test_client_upgrade () =
writer_closed t;
;;

let backpressure_response_handler continue_reading expected_response response body =
Alcotest.check (module Response) "expected response" expected_response response;
let rec on_read _buffer ~off:_ ~len:_ =
continue_reading := (fun () ->
Body.schedule_read body ~on_eof ~on_read);
and on_eof () = print_endline "got eof" in
Body.schedule_read body ~on_eof ~on_read
;;

let test_handling_backpressure_when_read_not_scheduled () =
let reader_woken_up = ref false in
let continue_reading = ref (fun () -> ()) in
let request' = Request.create `GET "/" in
let response =
Response.create ~headers:(Headers.of_list ["content-length", "10"]) `OK
in
let t = create ?config:None in
let body =
request
t
request'
~response_handler:(backpressure_response_handler continue_reading response)
~error_handler:no_error_handler
in
write_request t request';
writer_yielded t;
Body.close_writer body;
reader_ready t;
read_response t response;
yield_writer t ignore;
read_string t "five.";
reader_yielded t;
yield_reader t (fun () -> reader_woken_up := true);
!continue_reading ();
reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t;
Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up;
writer_yielded t;
;;

let test_handling_backpressure_when_read_not_scheduled_early_yield () =
let reader_woken_up = ref false in
let continue_reading = ref (fun () -> ()) in
let request' = Request.create `GET "/" in
let response =
Response.create ~headers:(Headers.of_list ["content-length", "10"]) `OK
in
let t = create ?config:None in
let body =
request
t
request'
~response_handler:(backpressure_response_handler continue_reading response)
~error_handler:no_error_handler
in
write_request t request';
writer_yielded t;
Body.close_writer body;
reader_ready t;
read_response t response;
yield_reader t (fun () -> reader_woken_up := true);
yield_writer t ignore;
read_string t "five.";
reader_yielded t;
!continue_reading ();
reader_ready ~msg:"Reader wants to read if there's a read scheduled in the body" t;
Alcotest.(check bool) "Reader wakes up if scheduling read" true !reader_woken_up;
writer_yielded t;
;;

let tests =
[ "commit parse after every header line", `Quick, test_commit_parse_after_every_header
; "GET" , `Quick, test_get
Expand All @@ -645,4 +714,6 @@ let tests =
; "Fixed body shuts down writer if connection is not persistent", `Quick, test_fixed_body
; "Fixed body doesn't shut down the writer if connection is persistent",`Quick, test_fixed_body_persistent_connection
; "Client support for upgrading a connection", `Quick, test_client_upgrade
; "test yield when read isn't scheduled", `Quick, test_handling_backpressure_when_read_not_scheduled
; "test yield when read isn't scheduled, reader yields early", `Quick, test_handling_backpressure_when_read_not_scheduled_early_yield
]
Loading

0 comments on commit 40443d4

Please sign in to comment.