diff --git a/lib/headers.ml b/lib/headers.ml index e28d0e7c..6c4707d4 100644 --- a/lib/headers.ml +++ b/lib/headers.ml @@ -70,6 +70,7 @@ module CI = struct done; !equal_so_far ) + ;; end let ci_equal = CI.equal diff --git a/lib/httpaf.mli b/lib/httpaf.mli index 25fd3499..5139a46d 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -41,6 +41,8 @@ 1.1 specification, and the basic principles of memory management and vectorized IO. *) + + (** {2 Basic HTTP Types} *) @@ -447,6 +449,9 @@ module Body : sig module Reader : sig type t + val create : Bigstringaf.t -> t + (** [create bs] creates a [t] using [bs] as the internal buffer. *) + val schedule_read : t -> on_eof : (unit -> unit) @@ -468,6 +473,9 @@ module Body : sig val is_closed : t -> bool (** [is_closed t] is [true] if {!close} has been called on [t] and [false] otherwise. A closed [t] may still have bytes available for reading. *) + + val unsafe_faraday : t -> Faraday.t + (** [unsafe_faraday t] retrieves the raw Faraday object from [t]. Unsafe. *) end module Writer : sig @@ -656,6 +664,8 @@ module Reqd : sig val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> Body.Writer.t + val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit + (** {3 Exception Handling} *) val report_exn : t -> exn -> unit @@ -697,7 +707,7 @@ module Server_connection : sig (** [create ?config ?error_handler ~request_handler] creates a connection handler that will service individual requests with [request_handler]. *) - val next_read_operation : t -> [ `Read | `Yield | `Close ] + val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ] (** [next_read_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) @@ -724,6 +734,7 @@ module Server_connection : sig val next_write_operation : t -> [ | `Write of Bigstringaf.t IOVec.t list | `Yield + | `Upgrade | `Close of int ] (** [next_write_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) diff --git a/lib/parse.ml b/lib/parse.ml index 20f5a104..c0009a8b 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -139,7 +139,7 @@ let response = lift4 (fun version status reason headers -> Response.create ~reason ~version ~headers status) (version <* char ' ') - (status <* char ' ') + (status <* option ' ' (char ' ')) (take_till P.is_cr <* eol <* commit) (headers <* eol) diff --git a/lib/reqd.ml b/lib/reqd.ml index b025ed61..4acd453e 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -37,14 +37,17 @@ type error = module Response_state = struct type t = | Waiting + | Upgrade of Response.t | Fixed of Response.t | Streaming of Response.t * Body.Writer.t end module Input_state = struct type t = + | Waiting | Ready | Complete + | Upgraded end module Output_state = struct @@ -52,6 +55,7 @@ module Output_state = struct | Waiting | Ready | Complete + | Upgraded end type error_handler = @@ -111,12 +115,14 @@ let response { response_state; _ } = match response_state with | Waiting -> None | Streaming (response, _) + | Upgrade response | Fixed response -> Some response let response_exn { response_state; _ } = match response_state with | Waiting -> failwith "httpaf.Reqd.response_exn: response has not started" | Streaming (response, _) + | Upgrade response | Fixed response -> response let respond_with_string t response str = @@ -133,6 +139,7 @@ let respond_with_string t response str = Writer.wakeup t.writer; | Streaming _ -> failwith "httpaf.Reqd.respond_with_string: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_string: response already complete" @@ -150,6 +157,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) = Writer.wakeup t.writer; | Streaming _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already complete" @@ -175,6 +183,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response = response_body | Streaming _ -> failwith "httpaf.Reqd.respond_with_streaming: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_streaming: response already complete" @@ -183,6 +192,23 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response = failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error"; unsafe_respond_with_streaming ~flush_headers_immediately t response +let respond_with_upgrade ?reason t headers = + match t.response_state with + | Waiting -> + if not (Request.is_upgrade t.request) then + failwith "httpaf.Reqd.respond_with_upgrade: request was not an upgrade request" + else ( + let response = Response.create ?reason ~headers `Switching_protocols in + t.response_state <- Upgrade response; + Body.Reader.close t.request_body; + Writer.write_response t.writer response; + Writer.wakeup t.writer); + | Streaming _ -> + failwith "httpaf.Reqd.respond_with_upgrade: response already started" + | Upgrade _ + | Fixed _ -> + failwith "httpaf.Reqd.respond_with_upgrade: response already complete" + let report_error t error = t.persistent <- false; Body.Reader.close t.request_body; @@ -207,7 +233,7 @@ let report_error t error = | Streaming (_response, response_body), `Exn _ -> Body.Writer.close response_body; Writer.close_and_drain t.writer - | (Fixed _ | Streaming _ | Waiting) , _ -> + | (Fixed _ | Streaming _ | Waiting | Upgrade _) , _ -> (* XXX(seliopou): Once additional logging support is added, log the error * in case it is not spurious. *) () @@ -215,7 +241,7 @@ let report_error t error = let report_exn t exn = report_error t (`Exn exn) -let try_with t f : (unit, exn) result = +let try_with t f : (unit, exn) Result.t = try f (); Ok () with exn -> report_exn t exn; Error exn (* Private API, not exposed to the user through httpaf.mli *) @@ -232,21 +258,41 @@ let persistent_connection t = t.persistent let input_state t : Input_state.t = - if Body.Reader.is_closed t.request_body - then Complete - else Ready + let upgrade_status = + match Request.is_upgrade t.request with + | false -> `Not_upgrading + | true -> + match t.response_state with + | Upgrade _ -> `Finished_upgrading + | Fixed _ | Streaming _ -> `Upgrade_declined + | Waiting -> `Upgrade_in_progress + in + match upgrade_status with + | `Finished_upgrading -> Upgraded + | `Not_upgrading | `Upgrade_declined -> + if Body.Reader.is_closed t.request_body + then Complete + else Ready + | `Upgrade_in_progress -> + Waiting ;; let output_state t : Output_state.t = match t.response_state with + | Upgrade _ -> Upgraded | Fixed _ -> Complete | Streaming (_, response_body) -> - if Body.Writer.has_pending_output response_body + if Writer.is_closed t.writer + then Complete + else if Body.Writer.has_pending_output response_body then Ready else if Body.Writer.is_closed response_body then Complete else Waiting - | Waiting -> Waiting + | Waiting -> + if Writer.is_closed t.writer + then Complete + else Waiting ;; let flush_request_body t = diff --git a/lib/request.ml b/lib/request.ml index 158fd4c2..204a2000 100644 --- a/lib/request.ml +++ b/lib/request.ml @@ -80,3 +80,8 @@ let persistent_connection ?proxy { version; headers; _ } = let pp_hum fmt { meth; target; version; headers } = Format.fprintf fmt "((method \"%a\") (target %S) (version \"%a\") (headers %a))" Method.pp_hum meth target Version.pp_hum version Headers.pp_hum headers + +let is_upgrade t = + match Headers.get t.headers "Connection" with + | None -> false + | Some header_val -> Headers.ci_equal header_val "upgrade" diff --git a/lib/serialize.ml b/lib/serialize.ml index 61c22131..76039b9d 100644 --- a/lib/serialize.ml +++ b/lib/serialize.ml @@ -195,4 +195,6 @@ module Writer = struct | `Close -> `Close (drained_bytes t) | `Yield -> `Yield | `Writev iovecs -> `Write iovecs + + let has_pending_output t = Faraday.has_pending_output t.encoder end diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 839a10b3..952cc947 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -139,21 +139,17 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque } let shutdown_reader t = - if is_active t - then Reqd.close_request_body (current_reqd_exn t); Reader.force_close t.reader; - wakeup_reader t + if is_active t + then Reqd.close_request_body (current_reqd_exn t) + else wakeup_reader t let shutdown_writer t = - if is_active t then ( - let reqd = current_reqd_exn t in - (* XXX(dpatti): I'm not sure I understand why we close the *request* body - here. Maybe we can write a test such that removing this line causes it to - fail? *) - Reqd.close_request_body reqd; - Reqd.flush_response_body reqd); + if is_active t then Reqd.flush_response_body (current_reqd_exn t); Writer.close t.writer; - wakeup_writer t + if is_active t + then Reqd.close_request_body (current_reqd_exn t) + else wakeup_writer t let error_code t = if is_active t @@ -162,7 +158,9 @@ let error_code t = let shutdown t = shutdown_reader t; - shutdown_writer t + shutdown_writer t; + wakeup_reader t; + wakeup_writer t let set_error_and_handle ?request t error = if is_active t then begin @@ -225,8 +223,10 @@ let rec _next_read_operation t = ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with + | Waiting -> `Yield | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd + | Upgraded -> `Upgrade ) and _final_read_operation_for t reqd = @@ -248,6 +248,7 @@ and _final_read_operation_for t reqd = if Reader.is_closed t.reader then Reader.next t.reader else `Yield + | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_read_operation t; @@ -258,7 +259,7 @@ let next_read_operation t = match _next_read_operation t with | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close - | (`Read | `Yield | `Close) as operation -> operation + | (`Read | `Yield | `Close | `Upgrade) as operation -> operation let rec read_with_more t bs ~off ~len more = let call_handler = Queue.is_empty t.request_queue in @@ -296,17 +297,27 @@ let rec _next_write_operation t = | Ready -> Reqd.flush_response_body reqd; Writer.next t.writer - | Complete -> _final_write_operation_for t reqd + | Complete -> _final_write_operation_for t reqd ~upgrade:false + | Upgraded -> _final_write_operation_for t reqd ~upgrade:true ) -and _final_write_operation_for t reqd = +and _final_write_operation_for t reqd ~upgrade = let next = - if not (Reqd.persistent_connection reqd) then ( + if upgrade then ( + if Writer.has_pending_output t.writer then + (* Even in the Upgrade case, we're still responsible for writing the response + header, so we might have work to do. *) + Writer.next t.writer + else + `Upgrade + ) else if not (Reqd.persistent_connection reqd) then ( shutdown_writer t; Writer.next t.writer; ) else ( match Reqd.input_state reqd with + | Waiting -> `Yield | Ready -> Writer.next t.writer; + | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_write_operation t;