Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle backpressure when reading incoming data #59

Merged
merged 5 commits into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ Unreleased
([#45](https://github.com/anmonteiro/httpaf/pull/45))
- httpaf: Fix sending streaming error responses; in particular, allow sending
chunk-encoded responses ([#56](https://github.com/anmonteiro/httpaf/pull/56))
- httpaf: handle read backpressure in server and client implementations
([#59](https://github.com/anmonteiro/httpaf/pull/59))

httpaf (upstream) 0.6.5
--------------
Expand Down
19 changes: 18 additions & 1 deletion lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type _ t =
; mutable write_final_if_chunked : bool
; mutable on_eof : unit -> unit
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
; mutable when_ready_to_read : Optional_thunk.t
; mutable when_ready_to_write : Optional_thunk.t
; buffered_bytes : int ref
}
Expand All @@ -50,6 +51,7 @@ let of_faraday faraday =
; write_final_if_chunked = true
; on_eof = default_on_eof
; on_read = default_on_read
; when_ready_to_read = Optional_thunk.none
; when_ready_to_write = Optional_thunk.none
; buffered_bytes = ref 0
}
Expand Down Expand Up @@ -80,6 +82,11 @@ let write_bigstring t ?off ?len b =
let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b

let ready_to_read t =
let callback = t.when_ready_to_read in
t.when_ready_to_read <- Optional_thunk.none;
Optional_thunk.call_if_some callback

let ready_to_write t =
let callback = t.when_ready_to_write in
t.when_ready_to_write <- Optional_thunk.none;
Expand Down Expand Up @@ -128,9 +135,12 @@ let schedule_read t ~on_eof ~on_read =
else begin
t.read_scheduled <- true;
t.on_eof <- on_eof;
t.on_read <- on_read
t.on_read <- on_read;
ready_to_read t;
end

let is_read_scheduled t = t.read_scheduled

let has_pending_output t =
(* Force another write poll to make sure that the final chunk is emitted for
chunk-encoded bodies.
Expand All @@ -150,6 +160,13 @@ let close_reader t =
execute_read t
;;

let when_ready_to_read t callback =
if is_closed t
then callback ()
else if Optional_thunk.is_some t.when_ready_to_read
then failwith "Body.when_ready_to_read: only one callback can be registered at a time"
else t.when_ready_to_read <- Optional_thunk.some callback

let when_ready_to_write t callback =
if is_closed t
then callback ()
Expand Down
25 changes: 23 additions & 2 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,31 @@ let yield_reader t k =
then failwith "yield_reader on closed conn"
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else if is_active t then
let respd = current_respd_exn t in
begin match Respd.input_state respd with
| Wait ->
(* `Wait` means that the response body isn't closed yet (there may be
* more incoming bytes) but the response handler hasn't scheduled a read
* either. *)
Respd.on_more_input_available respd k
| Provide -> t.wakeup_reader <- Optional_thunk.some k
| Complete -> k ()
end
else t.wakeup_reader <- Optional_thunk.some k

let wakeup_reader t =
let f = t.wakeup_reader in
t.wakeup_reader <- Optional_thunk.none;
Optional_thunk.call_if_some f

let transfer_reader_callback t respd =
if Optional_thunk.is_some t.wakeup_reader
then (
let f = t.wakeup_reader in
t.wakeup_reader <- Optional_thunk.none;
Respd.on_more_input_available respd (Optional_thunk.unchecked_value f))

let yield_writer t k =
if is_closed t
then failwith "yield_writer on closed conn"
Expand Down Expand Up @@ -207,6 +225,9 @@ let rec _next_read_operation t =
) else (
let respd = current_respd_exn t in
match Respd.input_state respd with
| Wait ->
transfer_reader_callback t respd;
`Yield
| Provide -> Reader.next t.reader
| Complete -> _final_read_operation_for t respd
)
Expand Down Expand Up @@ -255,7 +276,7 @@ let read_eof t bs ~off ~len =
if is_active t then begin
let respd = current_respd_exn t in
match Respd.input_state respd with
| Provide -> unexpected_eof t
| Wait | Provide -> unexpected_eof t
| Complete -> ()
end;
bytes_read
Expand Down Expand Up @@ -283,7 +304,7 @@ and _final_write_operation_for t respd =
Writer.next t.writer;
) else (
match Respd.input_state respd with
| Provide ->
| Wait | Provide ->
(* From RFC7230§6.3.2:
* A client that supports persistent connections MAY "pipeline" its
* requests (i.e., send multiple requests without waiting for each
Expand Down
4 changes: 4 additions & 0 deletions lib/input_state.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type t =
| Provide
| Wait
| Complete
4 changes: 4 additions & 0 deletions lib/output_state.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type t =
| Consume
| Wait
| Complete
13 changes: 6 additions & 7 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@
type error =
[ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ]

module Input_state = struct
type t =
| Provide
| Complete
end

type error_handler =
?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit

Expand Down Expand Up @@ -234,6 +228,9 @@ let error_code t =
| #error as error -> Some error
| `Ok -> None

let on_more_input_available t f =
Body.when_ready_to_read t.request_body f

let on_more_output_available t f =
Response_state.on_more_output_available t.response_state f

Expand All @@ -243,7 +240,9 @@ let persistent_connection t =
let input_state t : Input_state.t =
if Body.is_closed t.request_body
then Complete
else Provide
else if Body.is_read_scheduled t.request_body
then Provide
else Wait

let output_state t = Response_state.output_state t.response_state

Expand Down
32 changes: 17 additions & 15 deletions lib/respd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ module Request_state = struct
| Closed
end

module Input_state = struct
type t =
| Provide
| Complete
end

module Output_state = struct
type t =
| Consume
| Wait
| Complete
end

type t =
{ request : Request.t
; request_body : [ `write ] Body.t
Expand Down Expand Up @@ -74,6 +61,19 @@ let write_request t =
Writer.flush t.writer (fun () ->
t.state <- Awaiting_response)

let on_more_input_available t f =
match t.state with
| Received_response (_, response_body) ->
Body.when_ready_to_read response_body f

(* Don't expect to be called in these states. *)
| Uninitialized
| Awaiting_response ->
failwith "httpaf.Respd.on_more_input_available: response hasn't started"
| Upgraded _
| Closed ->
failwith "httpaf.Respd.on_more_input_available: response already complete"

let on_more_output_available { request_body; _ } f =
Body.when_ready_to_write request_body f

Expand Down Expand Up @@ -112,9 +112,11 @@ let input_state t : Input_state.t =
| Uninitialized
| Awaiting_response -> Provide
| Received_response (_, response_body) ->
if not (Body.is_closed response_body)
if Body.is_closed response_body
then Complete
else if Body.is_read_scheduled response_body
then Provide
else Complete
else Wait
| Upgraded _
| Closed -> Complete

Expand Down
7 changes: 0 additions & 7 deletions lib/response_state.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
module Output_state = struct
type t =
| Consume
| Wait
| Complete
end

type ('handle, 'io) t =
| Waiting of Optional_thunk.t ref
| Complete of Response.t
Expand Down
27 changes: 25 additions & 2 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,19 @@ let yield_reader t k =
then failwith "yield_reader on closed conn"
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- Optional_thunk.some k
else if is_active t then
let reqd = current_reqd_exn t in
begin match Reqd.input_state reqd with
| Wait ->
(* `Wait` means that the request body isn't closed yet (there may be more
* incoming bytes) but the request handler hasn't scheduled a read
* either. *)
Reqd.on_more_input_available reqd k
| Provide -> t.wakeup_reader <- Optional_thunk.some k
| Complete -> k ()
end
else
t.wakeup_reader <- Optional_thunk.some k
;;

let wakeup_reader t =
Expand All @@ -94,6 +106,14 @@ let wakeup_reader t =
Optional_thunk.call_if_some f
;;

let transfer_reader_callback t reqd =
if Optional_thunk.is_some t.wakeup_reader
then (
let f = t.wakeup_reader in
t.wakeup_reader <- Optional_thunk.none;
Reqd.on_more_input_available reqd (Optional_thunk.unchecked_value f))
;;

let yield_writer t k =
if is_closed t
then failwith "yield_writer on closed conn"
Expand Down Expand Up @@ -252,6 +272,9 @@ let rec _next_read_operation t =
) else (
let reqd = current_reqd_exn t in
match Reqd.input_state reqd with
| Wait ->
transfer_reader_callback t reqd;
`Yield
| Provide -> Reader.next t.reader
| Complete -> _final_read_operation_for t reqd
)
Expand Down Expand Up @@ -369,7 +392,7 @@ and _final_write_operation_for t reqd =
Writer.next t.writer;
) else (
match Reqd.input_state reqd with
| Provide -> assert false
| Wait | Provide -> assert false
| Complete ->
advance_request_queue t;
_next_write_operation t;
Expand Down
Loading