diff --git a/lib/client_connection.ml b/lib/client_connection.ml index 443aad5e..6d2acd10 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -31,12 +31,15 @@ POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) +open Sexplib.Std + module Reader = Parse.Reader module Writer = Serialize.Writer module Oneshot = struct type error = [ `Malformed_response of string | `Invalid_response_body_length of Response.t | `Exn of exn ] + [@@deriving sexp_of] type response_handler = Response.t -> Body.Reader.t -> unit type error_handler = error -> unit diff --git a/lib/dune b/lib/dune index b513e7cb..36c2fcaf 100644 --- a/lib/dune +++ b/lib/dune @@ -1,6 +1,6 @@ (library (name httpaf) (public_name httpaf) - (libraries - angstrom faraday bigstringaf) + (libraries angstrom faraday bigstringaf sexplib) + (preprocess (pps ppx_sexp_conv)) (flags (:standard -safe-string))) diff --git a/lib/headers.ml b/lib/headers.ml index e28d0e7c..8a108bcb 100644 --- a/lib/headers.ml +++ b/lib/headers.ml @@ -31,10 +31,14 @@ POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) +open Sexplib.Std -type name = string -type value = string -type t = (name * value) list + +type name = string [@@deriving sexp] +type value = string [@@deriving sexp] +type t = (name * value) list [@@deriving sexp] + +let sexp_of_t t = sexp_of_t (List.rev t) let empty : t = [] @@ -70,6 +74,7 @@ module CI = struct done; !equal_so_far ) + ;; end let ci_equal = CI.equal diff --git a/lib/headers.mli b/lib/headers.mli index 9e5ce8a4..d7988ded 100644 --- a/lib/headers.mli +++ b/lib/headers.mli @@ -1,4 +1,4 @@ -type t +type t [@@deriving sexp] type name = string type value = string diff --git a/lib/httpaf.mli b/lib/httpaf.mli index 25fd3499..89c1ee4f 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -41,6 +41,8 @@ 1.1 specification, and the basic principles of memory management and vectorized IO. *) + + (** {2 Basic HTTP Types} *) @@ -58,6 +60,7 @@ module Version : sig { major : int (** The major protocol number. *) ; minor : int (** The minor protocol number. *) } + [@@deriving sexp] val compare : t -> t -> int @@ -95,12 +98,14 @@ module Method : sig | `TRACE (** {{:https://tools.ietf.org/html/rfc7231#section-4.3.8} RFC7231§4.3.8}. Safe.*) ] + [@@deriving sexp] type t = [ | standard | `Other of string (** Methods defined outside of RFC7231, or custom methods. *) ] + [@@deriving sexp] val is_safe : standard -> bool (** Request methods are considered "safe" if their defined semantics are @@ -147,6 +152,7 @@ module Status : sig | `Continue | `Switching_protocols ] + [@@deriving sexp] (** The 1xx (Informational) class of status code indicates an interim response for communicating connection status or request progress prior to completing the requested action and sending a final @@ -164,6 +170,7 @@ module Status : sig | `Reset_content | `Partial_content ] + [@@deriving sexp] (** The 2xx (Successful) class of status code indicates that the client's request was successfully received, understood, and accepted. @@ -179,6 +186,7 @@ module Status : sig | `Use_proxy | `Temporary_redirect ] + [@@deriving sexp] (** The 3xx (Redirection) class of status code indicates that further action needs to be taken by the user agent in order to fulfill the request. @@ -209,6 +217,7 @@ module Status : sig | `I_m_a_teapot | `Enhance_your_calm ] + [@@deriving sexp] (** The 4xx (Client Error) class of status code indicates that the client seems to have erred. @@ -223,6 +232,7 @@ module Status : sig | `Gateway_timeout | `Http_version_not_supported ] + [@@deriving sexp] (** The 5xx (Server Error) class of status code indicates that the server is aware that it has erred or is incapable of performing the requested method. @@ -237,11 +247,13 @@ module Status : sig | client_error | server_error ] + [@@deriving sexp] (** The status codes defined in the HTTP 1.1 RFCs *) type t = [ | standard | `Code of int ] + [@@deriving sexp] (** The standard codes along with support for custom codes. *) val default_reason_phrase : standard -> string @@ -320,7 +332,7 @@ end See {{:https://tools.ietf.org/html/rfc7230#section-3.2} RFC7230§3.2} for more details. *) module Headers : sig - type t + type t [@@deriving sexp] type name = string (** The type of a case-insensitive header name. *) @@ -447,6 +459,9 @@ module Body : sig module Reader : sig type t + val create : Bigstringaf.t -> t + (** [create bs] creates a [t] using [bs] as the internal buffer. *) + val schedule_read : t -> on_eof : (unit -> unit) @@ -468,6 +483,9 @@ module Body : sig val is_closed : t -> bool (** [is_closed t] is [true] if {!close} has been called on [t] and [false] otherwise. A closed [t] may still have bytes available for reading. *) + + val unsafe_faraday : t -> Faraday.t + (** [unsafe_faraday t] retrieves the raw Faraday object from [t]. Unsafe. *) end module Writer : sig @@ -527,6 +545,7 @@ module Request : sig ; target : string ; version : Version.t ; headers : Headers.t } + [@@deriving sexp] val create : ?version:Version.t (** default is HTTP 1.1 *) @@ -573,6 +592,7 @@ module Response : sig ; status : Status.t ; reason : string ; headers : Headers.t } + [@@deriving sexp] val create : ?reason:string (** default is determined by {!Status.default_reason_phrase} *) @@ -656,6 +676,8 @@ module Reqd : sig val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> Body.Writer.t + val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit + (** {3 Exception Handling} *) val report_exn : t -> exn -> unit @@ -697,7 +719,7 @@ module Server_connection : sig (** [create ?config ?error_handler ~request_handler] creates a connection handler that will service individual requests with [request_handler]. *) - val next_read_operation : t -> [ `Read | `Yield | `Close ] + val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ] (** [next_read_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) @@ -724,6 +746,7 @@ module Server_connection : sig val next_write_operation : t -> [ | `Write of Bigstringaf.t IOVec.t list | `Yield + | `Upgrade | `Close of int ] (** [next_write_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) @@ -774,6 +797,7 @@ module Client_connection : sig type error = [ `Malformed_response of string | `Invalid_response_body_length of Response.t | `Exn of exn ] + [@@deriving sexp_of] type response_handler = Response.t -> Body.Reader.t -> unit diff --git a/lib/method.ml b/lib/method.ml index 46ebe44e..71f56948 100644 --- a/lib/method.ml +++ b/lib/method.ml @@ -31,6 +31,7 @@ POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) +open Sexplib.Std type standard = [ | `GET @@ -42,11 +43,13 @@ type standard = [ | `OPTIONS | `TRACE ] +[@@deriving sexp] type t = [ | standard | `Other of string ] +[@@deriving sexp] let is_safe = function | `GET | `HEAD | `OPTIONS | `TRACE -> true diff --git a/lib/parse.ml b/lib/parse.ml index 20f5a104..c0009a8b 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -139,7 +139,7 @@ let response = lift4 (fun version status reason headers -> Response.create ~reason ~version ~headers status) (version <* char ' ') - (status <* char ' ') + (status <* option ' ' (char ' ')) (take_till P.is_cr <* eol <* commit) (headers <* eol) diff --git a/lib/reqd.ml b/lib/reqd.ml index b025ed61..4acd453e 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -37,14 +37,17 @@ type error = module Response_state = struct type t = | Waiting + | Upgrade of Response.t | Fixed of Response.t | Streaming of Response.t * Body.Writer.t end module Input_state = struct type t = + | Waiting | Ready | Complete + | Upgraded end module Output_state = struct @@ -52,6 +55,7 @@ module Output_state = struct | Waiting | Ready | Complete + | Upgraded end type error_handler = @@ -111,12 +115,14 @@ let response { response_state; _ } = match response_state with | Waiting -> None | Streaming (response, _) + | Upgrade response | Fixed response -> Some response let response_exn { response_state; _ } = match response_state with | Waiting -> failwith "httpaf.Reqd.response_exn: response has not started" | Streaming (response, _) + | Upgrade response | Fixed response -> response let respond_with_string t response str = @@ -133,6 +139,7 @@ let respond_with_string t response str = Writer.wakeup t.writer; | Streaming _ -> failwith "httpaf.Reqd.respond_with_string: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_string: response already complete" @@ -150,6 +157,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) = Writer.wakeup t.writer; | Streaming _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already complete" @@ -175,6 +183,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response = response_body | Streaming _ -> failwith "httpaf.Reqd.respond_with_streaming: response already started" + | Upgrade _ | Fixed _ -> failwith "httpaf.Reqd.respond_with_streaming: response already complete" @@ -183,6 +192,23 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response = failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error"; unsafe_respond_with_streaming ~flush_headers_immediately t response +let respond_with_upgrade ?reason t headers = + match t.response_state with + | Waiting -> + if not (Request.is_upgrade t.request) then + failwith "httpaf.Reqd.respond_with_upgrade: request was not an upgrade request" + else ( + let response = Response.create ?reason ~headers `Switching_protocols in + t.response_state <- Upgrade response; + Body.Reader.close t.request_body; + Writer.write_response t.writer response; + Writer.wakeup t.writer); + | Streaming _ -> + failwith "httpaf.Reqd.respond_with_upgrade: response already started" + | Upgrade _ + | Fixed _ -> + failwith "httpaf.Reqd.respond_with_upgrade: response already complete" + let report_error t error = t.persistent <- false; Body.Reader.close t.request_body; @@ -207,7 +233,7 @@ let report_error t error = | Streaming (_response, response_body), `Exn _ -> Body.Writer.close response_body; Writer.close_and_drain t.writer - | (Fixed _ | Streaming _ | Waiting) , _ -> + | (Fixed _ | Streaming _ | Waiting | Upgrade _) , _ -> (* XXX(seliopou): Once additional logging support is added, log the error * in case it is not spurious. *) () @@ -215,7 +241,7 @@ let report_error t error = let report_exn t exn = report_error t (`Exn exn) -let try_with t f : (unit, exn) result = +let try_with t f : (unit, exn) Result.t = try f (); Ok () with exn -> report_exn t exn; Error exn (* Private API, not exposed to the user through httpaf.mli *) @@ -232,21 +258,41 @@ let persistent_connection t = t.persistent let input_state t : Input_state.t = - if Body.Reader.is_closed t.request_body - then Complete - else Ready + let upgrade_status = + match Request.is_upgrade t.request with + | false -> `Not_upgrading + | true -> + match t.response_state with + | Upgrade _ -> `Finished_upgrading + | Fixed _ | Streaming _ -> `Upgrade_declined + | Waiting -> `Upgrade_in_progress + in + match upgrade_status with + | `Finished_upgrading -> Upgraded + | `Not_upgrading | `Upgrade_declined -> + if Body.Reader.is_closed t.request_body + then Complete + else Ready + | `Upgrade_in_progress -> + Waiting ;; let output_state t : Output_state.t = match t.response_state with + | Upgrade _ -> Upgraded | Fixed _ -> Complete | Streaming (_, response_body) -> - if Body.Writer.has_pending_output response_body + if Writer.is_closed t.writer + then Complete + else if Body.Writer.has_pending_output response_body then Ready else if Body.Writer.is_closed response_body then Complete else Waiting - | Waiting -> Waiting + | Waiting -> + if Writer.is_closed t.writer + then Complete + else Waiting ;; let flush_request_body t = diff --git a/lib/request.ml b/lib/request.ml index 158fd4c2..385d4bb1 100644 --- a/lib/request.ml +++ b/lib/request.ml @@ -31,11 +31,14 @@ POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) +open Sexplib.Std + type t = { meth : Method.t ; target : string ; version : Version.t ; headers : Headers.t } +[@@deriving sexp] let create ?(version=Version.v1_1) ?(headers=Headers.empty) meth target = { meth; target; version; headers } @@ -80,3 +83,8 @@ let persistent_connection ?proxy { version; headers; _ } = let pp_hum fmt { meth; target; version; headers } = Format.fprintf fmt "((method \"%a\") (target %S) (version \"%a\") (headers %a))" Method.pp_hum meth target Version.pp_hum version Headers.pp_hum headers + +let is_upgrade t = + match Headers.get t.headers "Connection" with + | None -> false + | Some header_val -> Headers.ci_equal header_val "upgrade" diff --git a/lib/response.ml b/lib/response.ml index 88161943..ee3bfdb9 100644 --- a/lib/response.ml +++ b/lib/response.ml @@ -31,11 +31,14 @@ POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) +open Sexplib.Std + type t = { version : Version.t ; status : Status.t ; reason : string ; headers : Headers.t } +[@@deriving sexp] let create ?reason ?(version=Version.v1_1) ?(headers=Headers.empty) status = let reason = diff --git a/lib/serialize.ml b/lib/serialize.ml index 61c22131..76039b9d 100644 --- a/lib/serialize.ml +++ b/lib/serialize.ml @@ -195,4 +195,6 @@ module Writer = struct | `Close -> `Close (drained_bytes t) | `Yield -> `Yield | `Writev iovecs -> `Write iovecs + + let has_pending_output t = Faraday.has_pending_output t.encoder end diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 839a10b3..952cc947 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -139,21 +139,17 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque } let shutdown_reader t = - if is_active t - then Reqd.close_request_body (current_reqd_exn t); Reader.force_close t.reader; - wakeup_reader t + if is_active t + then Reqd.close_request_body (current_reqd_exn t) + else wakeup_reader t let shutdown_writer t = - if is_active t then ( - let reqd = current_reqd_exn t in - (* XXX(dpatti): I'm not sure I understand why we close the *request* body - here. Maybe we can write a test such that removing this line causes it to - fail? *) - Reqd.close_request_body reqd; - Reqd.flush_response_body reqd); + if is_active t then Reqd.flush_response_body (current_reqd_exn t); Writer.close t.writer; - wakeup_writer t + if is_active t + then Reqd.close_request_body (current_reqd_exn t) + else wakeup_writer t let error_code t = if is_active t @@ -162,7 +158,9 @@ let error_code t = let shutdown t = shutdown_reader t; - shutdown_writer t + shutdown_writer t; + wakeup_reader t; + wakeup_writer t let set_error_and_handle ?request t error = if is_active t then begin @@ -225,8 +223,10 @@ let rec _next_read_operation t = ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with + | Waiting -> `Yield | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd + | Upgraded -> `Upgrade ) and _final_read_operation_for t reqd = @@ -248,6 +248,7 @@ and _final_read_operation_for t reqd = if Reader.is_closed t.reader then Reader.next t.reader else `Yield + | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_read_operation t; @@ -258,7 +259,7 @@ let next_read_operation t = match _next_read_operation t with | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close - | (`Read | `Yield | `Close) as operation -> operation + | (`Read | `Yield | `Close | `Upgrade) as operation -> operation let rec read_with_more t bs ~off ~len more = let call_handler = Queue.is_empty t.request_queue in @@ -296,17 +297,27 @@ let rec _next_write_operation t = | Ready -> Reqd.flush_response_body reqd; Writer.next t.writer - | Complete -> _final_write_operation_for t reqd + | Complete -> _final_write_operation_for t reqd ~upgrade:false + | Upgraded -> _final_write_operation_for t reqd ~upgrade:true ) -and _final_write_operation_for t reqd = +and _final_write_operation_for t reqd ~upgrade = let next = - if not (Reqd.persistent_connection reqd) then ( + if upgrade then ( + if Writer.has_pending_output t.writer then + (* Even in the Upgrade case, we're still responsible for writing the response + header, so we might have work to do. *) + Writer.next t.writer + else + `Upgrade + ) else if not (Reqd.persistent_connection reqd) then ( shutdown_writer t; Writer.next t.writer; ) else ( match Reqd.input_state reqd with + | Waiting -> `Yield | Ready -> Writer.next t.writer; + | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_write_operation t; diff --git a/lib/status.ml b/lib/status.ml index 318e4067..ed5ba39a 100644 --- a/lib/status.ml +++ b/lib/status.ml @@ -31,11 +31,13 @@ POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) +open Sexplib.Std type informational = [ | `Continue | `Switching_protocols ] +[@@deriving sexp] type successful = [ | `OK @@ -46,6 +48,7 @@ type successful = [ | `Reset_content | `Partial_content ] +[@@deriving sexp] type redirection = [ | `Multiple_choices @@ -56,6 +59,7 @@ type redirection = [ | `Use_proxy | `Temporary_redirect ] +[@@deriving sexp] type client_error = [ | `Bad_request @@ -80,6 +84,7 @@ type client_error = [ | `Enhance_your_calm | `Upgrade_required ] +[@@deriving sexp] type server_error = [ | `Internal_server_error @@ -89,6 +94,7 @@ type server_error = [ | `Gateway_timeout | `Http_version_not_supported ] +[@@deriving sexp] type standard = [ | informational @@ -97,10 +103,12 @@ type standard = [ | client_error | server_error ] +[@@deriving sexp] type t = [ | standard | `Code of int ] +[@@deriving sexp] let default_reason_phrase = function (* Informational *) diff --git a/lib/version.ml b/lib/version.ml index 314b8899..e2442d4a 100644 --- a/lib/version.ml +++ b/lib/version.ml @@ -31,10 +31,12 @@ POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) +open Sexplib.Std type t = { major : int ; minor : int } +[@@deriving sexp] let v1_0 = { major = 1; minor = 0 } let v1_1 = { major = 1; minor = 1 }