Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTTP Upgrades (based on #227) #228

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/headers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ module CI = struct
done;
!equal_so_far
)
;;
end

let ci_equal = CI.equal
Expand Down
13 changes: 12 additions & 1 deletion lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
1.1 specification, and the basic principles of memory management and
vectorized IO. *)



(** {2 Basic HTTP Types} *)


Expand Down Expand Up @@ -447,6 +449,9 @@ module Body : sig
module Reader : sig
type t

val create : Bigstringaf.t -> t
(** [create bs] creates a [t] using [bs] as the internal buffer. *)

val schedule_read
: t
-> on_eof : (unit -> unit)
Expand All @@ -468,6 +473,9 @@ module Body : sig
val is_closed : t -> bool
(** [is_closed t] is [true] if {!close} has been called on [t] and [false]
otherwise. A closed [t] may still have bytes available for reading. *)

val unsafe_faraday : t -> Faraday.t
(** [unsafe_faraday t] retrieves the raw Faraday object from [t]. Unsafe. *)
end

module Writer : sig
Expand Down Expand Up @@ -656,6 +664,8 @@ module Reqd : sig
val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit
val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> Body.Writer.t

val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit

(** {3 Exception Handling} *)

val report_exn : t -> exn -> unit
Expand Down Expand Up @@ -697,7 +707,7 @@ module Server_connection : sig
(** [create ?config ?error_handler ~request_handler] creates a connection
handler that will service individual requests with [request_handler]. *)

val next_read_operation : t -> [ `Read | `Yield | `Close ]
val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ]
(** [next_read_operation t] returns a value describing the next operation
that the caller should conduct on behalf of the connection. *)

Expand All @@ -724,6 +734,7 @@ module Server_connection : sig
val next_write_operation : t -> [
| `Write of Bigstringaf.t IOVec.t list
| `Yield
| `Upgrade
| `Close of int ]
(** [next_write_operation t] returns a value describing the next operation
that the caller should conduct on behalf of the connection. *)
Expand Down
2 changes: 1 addition & 1 deletion lib/parse.ml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ let response =
lift4 (fun version status reason headers ->
Response.create ~reason ~version ~headers status)
(version <* char ' ')
(status <* char ' ')
(status <* option ' ' (char ' '))
(take_till P.is_cr <* eol <* commit)
(headers <* eol)

Expand Down
60 changes: 53 additions & 7 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,25 @@ type error =
module Response_state = struct
type t =
| Waiting
| Upgrade of Response.t
| Fixed of Response.t
| Streaming of Response.t * Body.Writer.t
end

module Input_state = struct
type t =
| Waiting
| Ready
| Complete
| Upgraded
end

module Output_state = struct
type t =
| Waiting
| Ready
| Complete
| Upgraded
end

type error_handler =
Expand Down Expand Up @@ -111,12 +115,14 @@ let response { response_state; _ } =
match response_state with
| Waiting -> None
| Streaming (response, _)
| Upgrade response
| Fixed response -> Some response

let response_exn { response_state; _ } =
match response_state with
| Waiting -> failwith "httpaf.Reqd.response_exn: response has not started"
| Streaming (response, _)
| Upgrade response
| Fixed response -> response

let respond_with_string t response str =
Expand All @@ -133,6 +139,7 @@ let respond_with_string t response str =
Writer.wakeup t.writer;
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_string: response already started"
| Upgrade _
| Fixed _ ->
failwith "httpaf.Reqd.respond_with_string: response already complete"

Expand All @@ -150,6 +157,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) =
Writer.wakeup t.writer;
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already started"
| Upgrade _
| Fixed _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already complete"

Expand All @@ -175,6 +183,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response =
response_body
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already started"
| Upgrade _
| Fixed _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already complete"

Expand All @@ -183,6 +192,23 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response =
failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error";
unsafe_respond_with_streaming ~flush_headers_immediately t response

let respond_with_upgrade ?reason t headers =
match t.response_state with
| Waiting ->
if not (Request.is_upgrade t.request) then
failwith "httpaf.Reqd.respond_with_upgrade: request was not an upgrade request"
else (
let response = Response.create ?reason ~headers `Switching_protocols in
t.response_state <- Upgrade response;
Body.Reader.close t.request_body;
Writer.write_response t.writer response;
Writer.wakeup t.writer);
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_upgrade: response already started"
| Upgrade _
| Fixed _ ->
failwith "httpaf.Reqd.respond_with_upgrade: response already complete"
Comment on lines +206 to +210

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for @anmonteiro's changes has Streaming | Upgrade grouped together while here it is Upgrade | Fixed

https://github.com/anmonteiro/httpaf/blob/0ddc76b7599a15cf5cc71ae39acf1585f21ed8d5/lib/reqd.ml#L184-L187

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the upgrade response is more similar to the fixed response here than the streaming response.

Streaming has a different error message since you can't assume that the response has completed in this call. Upgrade and Fixed both have concrete payloads that respond in a similar manner so I'd say they both likely to have completed the response.


let report_error t error =
t.persistent <- false;
Body.Reader.close t.request_body;
Expand All @@ -207,15 +233,15 @@ let report_error t error =
| Streaming (_response, response_body), `Exn _ ->
Body.Writer.close response_body;
Writer.close_and_drain t.writer
| (Fixed _ | Streaming _ | Waiting) , _ ->
| (Fixed _ | Streaming _ | Waiting | Upgrade _) , _ ->
(* XXX(seliopou): Once additional logging support is added, log the error
* in case it is not spurious. *)
()

let report_exn t exn =
report_error t (`Exn exn)

let try_with t f : (unit, exn) result =
let try_with t f : (unit, exn) Result.t =
try f (); Ok () with exn -> report_exn t exn; Error exn

(* Private API, not exposed to the user through httpaf.mli *)
Expand All @@ -232,21 +258,41 @@ let persistent_connection t =
t.persistent

let input_state t : Input_state.t =
if Body.Reader.is_closed t.request_body
then Complete
else Ready
let upgrade_status =
match Request.is_upgrade t.request with
| false -> `Not_upgrading
| true ->
match t.response_state with
| Upgrade _ -> `Finished_upgrading
| Fixed _ | Streaming _ -> `Upgrade_declined
| Waiting -> `Upgrade_in_progress
in
match upgrade_status with
| `Finished_upgrading -> Upgraded
| `Not_upgrading | `Upgrade_declined ->
if Body.Reader.is_closed t.request_body
then Complete
else Ready
| `Upgrade_in_progress ->
Waiting
;;

let output_state t : Output_state.t =
match t.response_state with
| Upgrade _ -> Upgraded
| Fixed _ -> Complete
| Streaming (_, response_body) ->
if Body.Writer.has_pending_output response_body
if Writer.is_closed t.writer
then Complete
else if Body.Writer.has_pending_output response_body
then Ready
else if Body.Writer.is_closed response_body
then Complete
else Waiting
| Waiting -> Waiting
| Waiting ->
if Writer.is_closed t.writer
then Complete
else Waiting
;;

let flush_request_body t =
Expand Down
5 changes: 5 additions & 0 deletions lib/request.ml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,8 @@ let persistent_connection ?proxy { version; headers; _ } =
let pp_hum fmt { meth; target; version; headers } =
Format.fprintf fmt "((method \"%a\") (target %S) (version \"%a\") (headers %a))"
Method.pp_hum meth target Version.pp_hum version Headers.pp_hum headers

let is_upgrade t =
match Headers.get t.headers "Connection" with
| None -> false
| Some header_val -> Headers.ci_equal header_val "upgrade"
2 changes: 2 additions & 0 deletions lib/serialize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,6 @@ module Writer = struct
| `Close -> `Close (drained_bytes t)
| `Yield -> `Yield
| `Writev iovecs -> `Write iovecs

let has_pending_output t = Faraday.has_pending_output t.encoder
end
43 changes: 27 additions & 16 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,17 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque
}

let shutdown_reader t =
if is_active t
then Reqd.close_request_body (current_reqd_exn t);
Reader.force_close t.reader;
wakeup_reader t
if is_active t
then Reqd.close_request_body (current_reqd_exn t)
else wakeup_reader t

let shutdown_writer t =
if is_active t then (
let reqd = current_reqd_exn t in
(* XXX(dpatti): I'm not sure I understand why we close the *request* body
here. Maybe we can write a test such that removing this line causes it to
fail? *)
Reqd.close_request_body reqd;
Reqd.flush_response_body reqd);
if is_active t then Reqd.flush_response_body (current_reqd_exn t);
Writer.close t.writer;
wakeup_writer t
if is_active t
then Reqd.close_request_body (current_reqd_exn t)
else wakeup_writer t
Comment on lines +148 to +152

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the change in ordering important here? It seems like it is doing a second if is_active t check to close the request body in a different order to previously.


let error_code t =
if is_active t
Expand All @@ -162,7 +158,9 @@ let error_code t =

let shutdown t =
shutdown_reader t;
shutdown_writer t
shutdown_writer t;
wakeup_reader t;
wakeup_writer t

let set_error_and_handle ?request t error =
if is_active t then begin
Expand Down Expand Up @@ -225,8 +223,10 @@ let rec _next_read_operation t =
) else (
let reqd = current_reqd_exn t in
match Reqd.input_state reqd with
| Waiting -> `Yield
| Ready -> Reader.next t.reader
| Complete -> _final_read_operation_for t reqd
| Upgraded -> `Upgrade
)

and _final_read_operation_for t reqd =
Expand All @@ -248,6 +248,7 @@ and _final_read_operation_for t reqd =
if Reader.is_closed t.reader
then Reader.next t.reader
else `Yield
| Upgraded -> `Upgrade
| Complete ->
advance_request_queue t;
_next_read_operation t;
Expand All @@ -258,7 +259,7 @@ let next_read_operation t =
match _next_read_operation t with
| `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close
| `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close
| (`Read | `Yield | `Close) as operation -> operation
| (`Read | `Yield | `Close | `Upgrade) as operation -> operation

let rec read_with_more t bs ~off ~len more =
let call_handler = Queue.is_empty t.request_queue in
Expand Down Expand Up @@ -296,17 +297,27 @@ let rec _next_write_operation t =
| Ready ->
Reqd.flush_response_body reqd;
Writer.next t.writer
| Complete -> _final_write_operation_for t reqd
| Complete -> _final_write_operation_for t reqd ~upgrade:false
| Upgraded -> _final_write_operation_for t reqd ~upgrade:true
)

and _final_write_operation_for t reqd =
and _final_write_operation_for t reqd ~upgrade =
let next =
if not (Reqd.persistent_connection reqd) then (
if upgrade then (
if Writer.has_pending_output t.writer then
(* Even in the Upgrade case, we're still responsible for writing the response
header, so we might have work to do. *)
Writer.next t.writer
else
`Upgrade
) else if not (Reqd.persistent_connection reqd) then (
shutdown_writer t;
Writer.next t.writer;
) else (
match Reqd.input_state reqd with
| Waiting -> `Yield
| Ready -> Writer.next t.writer;
| Upgraded -> `Upgrade
| Complete ->
advance_request_queue t;
_next_write_operation t;
Expand Down
Loading