Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support upgrades via I/O operations #159

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion async/httpaf_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,29 @@ let read fd buffer =
open Httpaf

module Server = struct
let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler =
module Upgrade = struct
type 'a t =
| Ignore
| Raise
| Handle of (([`Active], 'a) Socket.t -> Httpaf.Request.t -> Httpaf.Response.t -> unit Deferred.t)

let to_handler = function
| Ignore -> (fun socket _request _response -> Fd.close (Socket.fd socket))
| Raise ->
(fun socket _request _response ->
don't_wait_for (Fd.close (Socket.fd socket));
failwith "Upgrades not supported by server")
| Handle handler -> handler
end

let create_connection_handler
?(config=Config.default)
~upgrade_handler
~request_handler
~error_handler
=
fun client_addr socket ->
let upgrade_handler = Upgrade.to_handler upgrade_handler in
let fd = Socket.fd socket in
let writev = Faraday_async.writev_of_fd fd in
let request_handler = request_handler client_addr in
Expand All @@ -101,6 +122,7 @@ module Server = struct
let buffer = Buffer.create config.read_buffer_size in
let rec reader_thread () =
match Server_connection.next_read_operation conn with
| `Upgrade -> ()
| `Read ->
(* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *)
read fd buffer
Expand Down Expand Up @@ -136,6 +158,8 @@ module Server = struct
| `Yield ->
(* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *)
Server_connection.yield_writer conn writer_thread;
| `Upgrade(request, response) ->
upgrade_handler socket request response >>> Ivar.fill write_complete
| `Close _ ->
(* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *)
Ivar.fill write_complete ();
Expand Down
10 changes: 8 additions & 2 deletions async/httpaf_async.mli
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
open! Core
open Async

open Httpaf

module Server : sig
module Upgrade : sig
type 'a t =
| Ignore
| Raise
| Handle of (([`Active], 'a) Socket.t -> Request.t -> Response.t -> unit Deferred.t)
end

val create_connection_handler
: ?config : Config.t
-> upgrade_handler : 'a Upgrade.t
-> request_handler : ('a -> Server_connection.request_handler)
-> error_handler : ('a -> Server_connection.error_handler)
-> ([< Socket.Address.t] as 'a)
Expand Down
2 changes: 1 addition & 1 deletion examples/async/async_echo_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let main port max_accepts_per_batch () =
let where_to_listen = Tcp.Where_to_listen.of_port port in
Tcp.(Server.create_sock ~on_handler_error:`Raise
~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen)
(Server.create_connection_handler ~request_handler ~error_handler)
(Server.create_connection_handler ~upgrade_handler:Raise ~request_handler ~error_handler)
>>= fun _server ->
Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
Stdio.printf "To send a POST request, try one of the following\n\n";
Expand Down
2 changes: 1 addition & 1 deletion examples/lwt/lwt_echo_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let main port =
Lwt.async (fun () ->
Lwt_io.establish_server_with_client_socket
listen_address
(Server.create_connection_handler ~request_handler ~error_handler)
(Server.create_connection_handler ~upgrade_handler:Raise ~request_handler ~error_handler)
>|= fun _server ->
Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
Stdio.printf "To send a POST request, try one of the following\n\n";
Expand Down
5 changes: 4 additions & 1 deletion lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ module Reqd : sig
val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit
val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> [`write] Body.t

val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit

(** {3 Exception Handling} *)

val report_exn : t -> exn -> unit
Expand Down Expand Up @@ -678,7 +680,7 @@ module Server_connection : sig
(** [create ?config ?error_handler ~request_handler] creates a connection
handler that will service individual requests with [request_handler]. *)

val next_read_operation : t -> [ `Read | `Yield | `Close ]
val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ]
(** [next_read_operation t] returns a value describing the next operation
that the caller should conduct on behalf of the connection. *)

Expand All @@ -705,6 +707,7 @@ module Server_connection : sig
val next_write_operation : t -> [
| `Write of Bigstringaf.t IOVec.t list
| `Yield
| `Upgrade of Request.t * Response.t
| `Close of int ]
(** [next_write_operation t] returns a value describing the next operation
that the caller should conduct on behalf of the connection. *)
Expand Down
90 changes: 69 additions & 21 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,29 @@
type error =
[ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ]

type response_state =
| Waiting of Optional_thunk.t ref
| Complete of Response.t
| Streaming of Response.t * [`write] Body.t
module Response_state = struct
type t =
| Waiting of Optional_thunk.t ref
| Upgrade of Response.t
| Complete of Response.t
| Streaming of Response.t * [`write] Body.t
end

module Input_state = struct
type t =
| Provide
| Wait
| Complete
| Upgrade
end

module Output_state = struct
type t =
| Ready
| Wait
| Complete
| Upgrade
end

type error_handler =
?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit
Expand Down Expand Up @@ -74,7 +93,7 @@ type t =
; response_body_buffer : Bigstringaf.t
; error_handler : error_handler
; mutable persistent : bool
; mutable response_state : response_state
; mutable response_state : Response_state.t
; mutable error_code : [`Ok | error ]
}

Expand All @@ -101,28 +120,31 @@ let response { response_state; _ } =
match response_state with
| Waiting _ -> None
| Streaming(response, _)
| Complete (response) -> Some response
| Upgrade response
| Complete response -> Some response

let response_exn { response_state; _ } =
match response_state with
| Waiting _ -> failwith "httpaf.Reqd.response_exn: response has not started"
| Streaming(response, _)
| Complete (response) -> response
| Upgrade response
| Complete response -> response

let respond_with_string t response str =
if t.error_code <> `Ok then
failwith "httpaf.Reqd.respond_with_string: invalid state, currently handling error";
match t.response_state with
| Waiting when_done_waiting ->
(* XXX(seliopou): check response body length *)
Writer.write_response t.writer response;
Writer.write_response t.writer response;
Writer.write_string t.writer str;
if t.persistent then
t.persistent <- Response.persistent_connection response;
t.response_state <- Complete response;
done_waiting when_done_waiting
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_string: response already started"
| Upgrade _
| Complete _ ->
failwith "httpaf.Reqd.respond_with_string: response already complete"

Expand All @@ -140,6 +162,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) =
done_waiting when_done_waiting
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already started"
| Upgrade _
| Complete _ ->
failwith "httpaf.Reqd.respond_with_bigstring: response already complete"

Expand All @@ -156,6 +179,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response =
response_body
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already started"
| Upgrade _
| Complete _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already complete"

Expand All @@ -164,6 +188,19 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response =
failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error";
unsafe_respond_with_streaming ~flush_headers_immediately t response

let respond_with_upgrade ?reason t headers =
match t.response_state with
| Waiting when_done_waiting ->
let response = Response.create ?reason ~headers `Switching_protocols in
t.response_state <- Upgrade response;
Body.close_reader t.request_body;
done_waiting when_done_waiting
| Streaming _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already started"
| Upgrade _
| Complete _ ->
failwith "httpaf.Reqd.respond_with_streaming: response already complete"

let report_error t error =
t.persistent <- false;
Body.close_reader t.request_body;
Expand All @@ -187,7 +224,7 @@ let report_error t error =
| Streaming(_response, response_body), `Exn _ ->
Body.close_writer response_body;
Writer.close_and_drain t.writer
| (Complete _ | Streaming _ | Waiting _) , _ ->
| (Complete _ | Upgrade _ | Streaming _ | Waiting _) , _ ->
(* XXX(seliopou): Once additional logging support is added, log the error
* in case it is not spurious. *)
()
Expand Down Expand Up @@ -216,25 +253,36 @@ let on_more_output_available t f =
when_done_waiting := Optional_thunk.some f
| Streaming(_, response_body) ->
Body.when_ready_to_write response_body f
| Upgrade _
| Complete _ ->
failwith "httpaf.Reqd.on_more_output_available: response already complete"

let persistent_connection t =
t.persistent

let requires_input { request_body; _ } =
not (Body.is_closed request_body)
let input_state t : Input_state.t =
match t.response_state with
| Upgrade _ -> Upgrade
| Waiting _
| Complete _
| Streaming _ ->
if Body.is_closed t.request_body
then Complete
else Provide
;;

let requires_output { response_state; _ } =
match response_state with
| Complete _ -> false
| Streaming (_, response_body) ->
not (Body.is_closed response_body)
|| Body.has_pending_output response_body
| Waiting _ -> true

let is_complete t =
not (requires_input t || requires_output t)
let output_state t : Output_state.t =
match t.response_state with
| Complete _ -> Complete
| Upgrade _ -> Upgrade
| Waiting _ -> Wait
| Streaming(_, response_body) ->
if Body.has_pending_output response_body
then Ready
else if Body.is_closed response_body
then Complete
else Wait
;;

let flush_request_body t =
let request_body = request_body t in
Expand Down
Loading