Skip to content

Commit

Permalink
Refactor client_connection response descriptor queue (inhabitedtype#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed May 11, 2020
1 parent e6fb66f commit d4f0b57
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 136 deletions.
213 changes: 104 additions & 109 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ let wakeup_reader t =
t.wakeup_reader <- Optional_thunk.none;
Optional_thunk.call_if_some f

let on_wakeup_writer t k =
let yield_writer t k =
if is_closed t
then failwith "on_wakeup_writer on closed conn"
then failwith "yield_writer on closed conn"
else if Optional_thunk.is_some t.wakeup_writer
then failwith "yield_writer: only one callback can be registered at a time"
else t.wakeup_writer <- Optional_thunk.some k
Expand Down Expand Up @@ -117,33 +117,13 @@ let request t request ~error_handler ~response_handler =
request_body
;;

let flush_request_body t =
if is_active t then begin
let respd = current_respd_exn t in
Respd.flush_request_body respd
end

let set_error_and_handle_without_shutdown t error =
if is_active t then begin
let respd = current_respd_exn t in
Respd.report_error respd error
end
(* If the connection is not active, we could have requested a shutdown +
* closed the communication channel. Nothing to do here. *)
;;

let unexpected_eof t =
set_error_and_handle_without_shutdown t (`Malformed_response "unexpected eof");
;;

let shutdown_reader t =
Reader.force_close t.reader;
if is_active t
then Respd.close_response_body (current_respd_exn t)
else wakeup_reader t

let shutdown_writer t =
flush_request_body t;
Writer.close t.writer;
if is_active t then begin
let respd = current_respd_exn t in
Expand All @@ -152,6 +132,8 @@ let shutdown_writer t =
;;

let shutdown t =
Queue.iter Respd.close_response_body t.request_queue;
Queue.clear t.request_queue;
shutdown_reader t;
shutdown_writer t;
wakeup_reader t;
Expand All @@ -161,8 +143,15 @@ let shutdown t =
(* TODO: Need to check in the RFC if reporting an error, e.g. in a malformed
* response causes the whole connection to shutdown. *)
let set_error_and_handle t error =
shutdown t;
set_error_and_handle_without_shutdown t error;
if is_active t then begin
let respd = current_respd_exn t in
shutdown t;
Respd.report_error respd error
end
;;

let unexpected_eof t =
set_error_and_handle t (`Malformed_response "unexpected eof");
;;

let report_exn t exn =
Expand All @@ -174,70 +163,63 @@ exception Local
let maybe_pipeline_queued_requests t =
(* Don't bother trying to pipeline if there aren't multiple requests in the
* queue. *)
if Queue.length t.request_queue > 1 then begin
match Queue.fold (fun prev respd ->
begin match prev with
| None -> ()
| Some prev ->
if respd.Respd.state = Uninitialized && not (Respd.requires_output prev)
then Respd.write_request respd
else
(* bail early. If we can't pipeline this request, we can't write
* next ones either. *)
raise Local
end;
Some respd)
None
t.request_queue
if Queue.length t.request_queue > 1 then
try
let _ = Queue.fold (fun prev respd ->
begin match prev with
| None -> ()
| Some prev ->
match respd.Respd.state, Respd.output_state prev with
| Uninitialized, Complete ->
Respd.write_request respd
| _ ->
(* bail early. If we can't pipeline this request, we can't write
* next ones either. *)
raise Local
end;
Some respd)
None
t.request_queue
in ()
with
| _ -> ()
| exception Local -> ()

let advance_request_queue t =
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue) then begin
(* write request to the wire *)
Respd.write_request (current_respd_exn t);
wakeup_writer t;
end

let advance_request_queue_if_necessary t =
if is_active t then begin
let respd = current_respd_exn t in
if Respd.persistent_connection respd then begin
if Respd.is_complete respd then begin
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue) then begin
(* write request to the wire *)
let respd = current_respd_exn t in
Respd.write_request respd;
end;
wakeup_writer t;
end else if not (Respd.requires_output respd) then
(* From RFC7230§6.3.2:
* A client that supports persistent connections MAY "pipeline" its
* requests (i.e., send multiple requests without waiting for each
* response). *)
maybe_pipeline_queued_requests t
end else begin
ignore (Queue.take t.request_queue);
Queue.iter Respd.close_response_body t.request_queue;
Queue.clear t.request_queue;
Queue.push respd t.request_queue;
wakeup_writer t;
if Respd.is_complete respd
then shutdown t
else if not (Respd.requires_output respd)
then shutdown_writer t
end
end else if Reader.is_closed t.reader
then shutdown t

let _next_read_operation t =
advance_request_queue_if_necessary t;
if is_active t then begin
let rec _next_read_operation t =
if not (is_active t) then (
if Reader.is_closed t.reader
then shutdown t;
Reader.next t.reader
) else (
let respd = current_respd_exn t in
if Respd.requires_input respd then Reader.next t.reader
else if Respd.persistent_connection respd then `Yield
else begin
match Respd.input_state respd with
| Provide -> Reader.next t.reader
| Complete -> _final_read_operation_for t respd
)

and _final_read_operation_for t respd =
let next =
if not (Respd.persistent_connection respd) then (
shutdown_reader t;
Reader.next t.reader
end
end else
Reader.next t.reader
Reader.next t.reader;
) else (
match Respd.output_state respd with
| Wait | Consume -> `Yield
| Complete ->
advance_request_queue t;
_next_read_operation t;
)
in
wakeup_writer t;
next
;;

let next_read_operation t =
match _next_read_operation t with
Expand Down Expand Up @@ -265,39 +247,52 @@ let read_eof t bs ~off ~len =
let bytes_read = read_with_more t bs ~off ~len Complete in
if is_active t then begin
let respd = current_respd_exn t in
(* TODO: could just check for `Respd.requires_input`? *)
match respd.state with
| Uninitialized -> assert false
| Received_response _ | Closed | Upgraded _ -> ()
| Awaiting_response ->
(* TODO: review this. It makes sense to tear down the connection if an
* unexpected EOF is received. *)
shutdown t;
unexpected_eof t
match Respd.input_state respd with
| Provide -> unexpected_eof t
| Complete -> ()
end;
bytes_read
;;

let next_write_operation t =
advance_request_queue_if_necessary t;
flush_request_body t;
Writer.next t.writer
let rec _next_write_operation t =
if not (is_active t) then (
if Reader.is_closed t.reader
then shutdown t;
Writer.next t.writer
) else (
let respd = current_respd_exn t in
match Respd.output_state respd with
| Wait -> `Yield
| Consume ->
Respd.flush_request_body respd;
Writer.next t.writer
| Complete -> _final_write_operation_for t respd
)

and _final_write_operation_for t respd =
let next =
if not (Respd.persistent_connection respd) then (
shutdown_writer t;
Writer.next t.writer;
) else (
match Respd.input_state respd with
| Provide ->
(* From RFC7230§6.3.2:
* A client that supports persistent connections MAY "pipeline" its
* requests (i.e., send multiple requests without waiting for each
* response). *)
maybe_pipeline_queued_requests t;
Writer.next t.writer;
| Complete ->
advance_request_queue t;
_next_write_operation t;
)
in
wakeup_reader t;
next
;;

let yield_writer t k =
if is_active t then begin
let respd = current_respd_exn t in
if Respd.requires_output respd then
Respd.on_more_output_available respd k
else if Respd.persistent_connection respd then
on_wakeup_writer t k
else begin
(* TODO: call shutdown? *)
Writer.close t.writer;
k ()
end
end else
on_wakeup_writer t k
let next_write_operation t = _next_write_operation t

let report_write_result t result =
Writer.report_result t.writer result
69 changes: 43 additions & 26 deletions lib/respd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,27 @@ type error =
| `Invalid_response_body_length of Response.t
| `Exn of exn ]

type state =
| Uninitialized
| Awaiting_response
| Received_response of Response.t * [`read] Body.t
| Upgraded of Response.t
| Closed
module Request_state = struct
type t =
| Uninitialized
| Awaiting_response
| Received_response of Response.t * [`read] Body.t
| Upgraded of Response.t
| Closed
end

module Input_state = struct
type t =
| Provide
| Complete
end

module Output_state = struct
type t =
| Consume
| Wait
| Complete
end

type t =
{ request : Request.t
Expand All @@ -19,16 +34,16 @@ type t =
; error_handler : (error -> unit)
; mutable error_code : [ `Ok | error ]
; writer : Writer.t
; mutable state : state
; mutable persistent : bool
; mutable state : Request_state.t
; mutable persistent : bool
}

let create error_handler request request_body writer response_handler =
let rec handler response body =
let t = Lazy.force t in
if t.persistent then
t.persistent <- Response.persistent_connection response;
let next_state = match response.status with
let next_state : Request_state.t = match response.status with
| `Switching_protocols ->
Upgraded response
| _ ->
Expand Down Expand Up @@ -56,7 +71,8 @@ let request_body { request_body; _ } = request_body

let write_request t =
Writer.write_request t.writer t.request;
t.state <- Awaiting_response
Writer.flush t.writer (fun () ->
t.state <- Awaiting_response)

let on_more_output_available { request_body; _ } f =
Body.when_ready_to_write request_body f
Expand Down Expand Up @@ -89,30 +105,31 @@ let close_response_body t =
Body.close_reader response_body
| Upgraded _ -> t.state <- Closed

let requires_input t =
let input_state t : Input_state.t =
match t.state with
| Uninitialized -> true
| Awaiting_response -> true
| Upgraded _ -> false
| Uninitialized
| Awaiting_response -> Provide
| Received_response (_, response_body) ->
not (Body.is_closed response_body)
| Closed -> false
if not (Body.is_closed response_body)
then Provide
else Complete
| Upgraded _
| Closed -> Complete

let requires_output { request_body; state; _ } =
let output_state { request_body; state; _ } : Output_state.t =
match state with
| Upgraded _ ->
(* XXX(anmonteiro): Connections that have been upgraded "require output"
* forever, but outside the HTTP layer, meaning they're permanently
* "yielding". For now they need to be explicitly shutdown in order to
* transition the response descriptor to the `Closed` state. *)
true
Consume
| state ->
state = Uninitialized ||
not (Body.is_closed request_body) ||
Body.has_pending_output request_body

let is_complete t =
not (requires_input t || requires_output t)
if state = Uninitialized ||
not (Body.is_closed request_body) ||
Body.has_pending_output request_body
then Consume
else Complete

let flush_request_body { request; request_body; writer; _ } =
if Body.has_pending_output request_body then begin
Expand All @@ -130,6 +147,6 @@ let flush_response_body t =
| Received_response(_, response_body) ->
try Body.execute_read response_body
(* TODO: report_exn *)
with _exn ->
Format.eprintf "EXN@."
with exn ->
Format.eprintf "EXN %S@." (Printexc.to_string exn)
(* report_exn t exn *)
Loading

0 comments on commit d4f0b57

Please sign in to comment.