From d4f0b5744bd2f574c90f9ac74e398b4c65cbf1d3 Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sat, 18 Apr 2020 12:59:07 -0700 Subject: [PATCH] Refactor client_connection response descriptor queue (#52) --- lib/client_connection.ml | 213 ++++++++++++++--------------- lib/respd.ml | 69 ++++++---- lib_test/test_client_connection.ml | 2 +- 3 files changed, 148 insertions(+), 136 deletions(-) diff --git a/lib/client_connection.ml b/lib/client_connection.ml index 47d66ab1..154c645b 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -76,9 +76,9 @@ let wakeup_reader t = t.wakeup_reader <- Optional_thunk.none; Optional_thunk.call_if_some f -let on_wakeup_writer t k = +let yield_writer t k = if is_closed t - then failwith "on_wakeup_writer on closed conn" + then failwith "yield_writer on closed conn" else if Optional_thunk.is_some t.wakeup_writer then failwith "yield_writer: only one callback can be registered at a time" else t.wakeup_writer <- Optional_thunk.some k @@ -117,25 +117,6 @@ let request t request ~error_handler ~response_handler = request_body ;; -let flush_request_body t = - if is_active t then begin - let respd = current_respd_exn t in - Respd.flush_request_body respd - end - -let set_error_and_handle_without_shutdown t error = - if is_active t then begin - let respd = current_respd_exn t in - Respd.report_error respd error - end - (* If the connection is not active, we could have requested a shutdown + - * closed the communication channel. Nothing to do here. *) -;; - -let unexpected_eof t = - set_error_and_handle_without_shutdown t (`Malformed_response "unexpected eof"); -;; - let shutdown_reader t = Reader.force_close t.reader; if is_active t @@ -143,7 +124,6 @@ let shutdown_reader t = else wakeup_reader t let shutdown_writer t = - flush_request_body t; Writer.close t.writer; if is_active t then begin let respd = current_respd_exn t in @@ -152,6 +132,8 @@ let shutdown_writer t = ;; let shutdown t = + Queue.iter Respd.close_response_body t.request_queue; + Queue.clear t.request_queue; shutdown_reader t; shutdown_writer t; wakeup_reader t; @@ -161,8 +143,15 @@ let shutdown t = (* TODO: Need to check in the RFC if reporting an error, e.g. in a malformed * response causes the whole connection to shutdown. *) let set_error_and_handle t error = - shutdown t; - set_error_and_handle_without_shutdown t error; + if is_active t then begin + let respd = current_respd_exn t in + shutdown t; + Respd.report_error respd error + end +;; + +let unexpected_eof t = + set_error_and_handle t (`Malformed_response "unexpected eof"); ;; let report_exn t exn = @@ -174,70 +163,63 @@ exception Local let maybe_pipeline_queued_requests t = (* Don't bother trying to pipeline if there aren't multiple requests in the * queue. *) - if Queue.length t.request_queue > 1 then begin - match Queue.fold (fun prev respd -> - begin match prev with - | None -> () - | Some prev -> - if respd.Respd.state = Uninitialized && not (Respd.requires_output prev) - then Respd.write_request respd - else - (* bail early. If we can't pipeline this request, we can't write - * next ones either. *) - raise Local - end; - Some respd) - None - t.request_queue + if Queue.length t.request_queue > 1 then + try + let _ = Queue.fold (fun prev respd -> + begin match prev with + | None -> () + | Some prev -> + match respd.Respd.state, Respd.output_state prev with + | Uninitialized, Complete -> + Respd.write_request respd + | _ -> + (* bail early. If we can't pipeline this request, we can't write + * next ones either. *) + raise Local + end; + Some respd) + None + t.request_queue + in () with | _ -> () - | exception Local -> () + +let advance_request_queue t = + ignore (Queue.take t.request_queue); + if not (Queue.is_empty t.request_queue) then begin + (* write request to the wire *) + Respd.write_request (current_respd_exn t); + wakeup_writer t; end -let advance_request_queue_if_necessary t = - if is_active t then begin - let respd = current_respd_exn t in - if Respd.persistent_connection respd then begin - if Respd.is_complete respd then begin - ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) then begin - (* write request to the wire *) - let respd = current_respd_exn t in - Respd.write_request respd; - end; - wakeup_writer t; - end else if not (Respd.requires_output respd) then - (* From RFC7230§6.3.2: - * A client that supports persistent connections MAY "pipeline" its - * requests (i.e., send multiple requests without waiting for each - * response). *) - maybe_pipeline_queued_requests t - end else begin - ignore (Queue.take t.request_queue); - Queue.iter Respd.close_response_body t.request_queue; - Queue.clear t.request_queue; - Queue.push respd t.request_queue; - wakeup_writer t; - if Respd.is_complete respd - then shutdown t - else if not (Respd.requires_output respd) - then shutdown_writer t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t then begin +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let respd = current_respd_exn t in - if Respd.requires_input respd then Reader.next t.reader - else if Respd.persistent_connection respd then `Yield - else begin + match Respd.input_state respd with + | Provide -> Reader.next t.reader + | Complete -> _final_read_operation_for t respd + ) + +and _final_read_operation_for t respd = + let next = + if not (Respd.persistent_connection respd) then ( shutdown_reader t; - Reader.next t.reader - end - end else - Reader.next t.reader + Reader.next t.reader; + ) else ( + match Respd.output_state respd with + | Wait | Consume -> `Yield + | Complete -> + advance_request_queue t; + _next_read_operation t; + ) + in + wakeup_writer t; + next +;; let next_read_operation t = match _next_read_operation t with @@ -265,39 +247,52 @@ let read_eof t bs ~off ~len = let bytes_read = read_with_more t bs ~off ~len Complete in if is_active t then begin let respd = current_respd_exn t in - (* TODO: could just check for `Respd.requires_input`? *) - match respd.state with - | Uninitialized -> assert false - | Received_response _ | Closed | Upgraded _ -> () - | Awaiting_response -> - (* TODO: review this. It makes sense to tear down the connection if an - * unexpected EOF is received. *) - shutdown t; - unexpected_eof t + match Respd.input_state respd with + | Provide -> unexpected_eof t + | Complete -> () end; bytes_read ;; -let next_write_operation t = - advance_request_queue_if_necessary t; - flush_request_body t; - Writer.next t.writer +let rec _next_write_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Writer.next t.writer + ) else ( + let respd = current_respd_exn t in + match Respd.output_state respd with + | Wait -> `Yield + | Consume -> + Respd.flush_request_body respd; + Writer.next t.writer + | Complete -> _final_write_operation_for t respd + ) + +and _final_write_operation_for t respd = + let next = + if not (Respd.persistent_connection respd) then ( + shutdown_writer t; + Writer.next t.writer; + ) else ( + match Respd.input_state respd with + | Provide -> + (* From RFC7230§6.3.2: + * A client that supports persistent connections MAY "pipeline" its + * requests (i.e., send multiple requests without waiting for each + * response). *) + maybe_pipeline_queued_requests t; + Writer.next t.writer; + | Complete -> + advance_request_queue t; + _next_write_operation t; + ) + in + wakeup_reader t; + next ;; -let yield_writer t k = - if is_active t then begin - let respd = current_respd_exn t in - if Respd.requires_output respd then - Respd.on_more_output_available respd k - else if Respd.persistent_connection respd then - on_wakeup_writer t k - else begin - (* TODO: call shutdown? *) - Writer.close t.writer; - k () - end - end else - on_wakeup_writer t k +let next_write_operation t = _next_write_operation t let report_write_result t result = Writer.report_result t.writer result diff --git a/lib/respd.ml b/lib/respd.ml index 780539e5..cb221fa0 100644 --- a/lib/respd.ml +++ b/lib/respd.ml @@ -5,12 +5,27 @@ type error = | `Invalid_response_body_length of Response.t | `Exn of exn ] -type state = - | Uninitialized - | Awaiting_response - | Received_response of Response.t * [`read] Body.t - | Upgraded of Response.t - | Closed +module Request_state = struct + type t = + | Uninitialized + | Awaiting_response + | Received_response of Response.t * [`read] Body.t + | Upgraded of Response.t + | Closed +end + +module Input_state = struct + type t = + | Provide + | Complete +end + +module Output_state = struct + type t = + | Consume + | Wait + | Complete +end type t = { request : Request.t @@ -19,8 +34,8 @@ type t = ; error_handler : (error -> unit) ; mutable error_code : [ `Ok | error ] ; writer : Writer.t - ; mutable state : state - ; mutable persistent : bool + ; mutable state : Request_state.t + ; mutable persistent : bool } let create error_handler request request_body writer response_handler = @@ -28,7 +43,7 @@ let create error_handler request request_body writer response_handler = let t = Lazy.force t in if t.persistent then t.persistent <- Response.persistent_connection response; - let next_state = match response.status with + let next_state : Request_state.t = match response.status with | `Switching_protocols -> Upgraded response | _ -> @@ -56,7 +71,8 @@ let request_body { request_body; _ } = request_body let write_request t = Writer.write_request t.writer t.request; - t.state <- Awaiting_response + Writer.flush t.writer (fun () -> + t.state <- Awaiting_response) let on_more_output_available { request_body; _ } f = Body.when_ready_to_write request_body f @@ -89,30 +105,31 @@ let close_response_body t = Body.close_reader response_body | Upgraded _ -> t.state <- Closed -let requires_input t = +let input_state t : Input_state.t = match t.state with - | Uninitialized -> true - | Awaiting_response -> true - | Upgraded _ -> false + | Uninitialized + | Awaiting_response -> Provide | Received_response (_, response_body) -> - not (Body.is_closed response_body) - | Closed -> false + if not (Body.is_closed response_body) + then Provide + else Complete + | Upgraded _ + | Closed -> Complete -let requires_output { request_body; state; _ } = +let output_state { request_body; state; _ } : Output_state.t = match state with | Upgraded _ -> (* XXX(anmonteiro): Connections that have been upgraded "require output" * forever, but outside the HTTP layer, meaning they're permanently * "yielding". For now they need to be explicitly shutdown in order to * transition the response descriptor to the `Closed` state. *) - true + Consume | state -> - state = Uninitialized || - not (Body.is_closed request_body) || - Body.has_pending_output request_body - -let is_complete t = - not (requires_input t || requires_output t) + if state = Uninitialized || + not (Body.is_closed request_body) || + Body.has_pending_output request_body + then Consume + else Complete let flush_request_body { request; request_body; writer; _ } = if Body.has_pending_output request_body then begin @@ -130,6 +147,6 @@ let flush_response_body t = | Received_response(_, response_body) -> try Body.execute_read response_body (* TODO: report_exn *) - with _exn -> - Format.eprintf "EXN@." + with exn -> + Format.eprintf "EXN %S@." (Printexc.to_string exn) (* report_exn t exn *) diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index e793181e..c933e279 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -460,7 +460,7 @@ let test_input_shrunk () = | _ -> assert false) in Body.close_writer body; - write_request t request'; + write_request t request'; writer_yielded t; reader_ready t; let c = feed_string t "HTTP/1.1 200 OK\r\nDate" in