Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed Aug 24, 2024
1 parent a200351 commit 4bf38e7
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 56 deletions.
30 changes: 16 additions & 14 deletions lib/parse.ml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
type t =
{ payload_length: int
; is_fin: bool
; mask: int32 option
; opcode: Websocket.Opcode.t
}
{ payload_length: int
; is_fin: bool
; mask: int32 option
; opcode: Websocket.Opcode.t
}

let is_fin headers =
let bits = Bigstringaf.unsafe_get headers 0 |> Char.code in
Expand Down Expand Up @@ -159,23 +159,22 @@ module Reader = struct
t.wakeup <- Optional_thunk.none;
Optional_thunk.call_if_some f

let create frame_handler ~on_payload_eof =
let create frame_handler =
let rec parser t =
let open Angstrom in
let buf = Bigstringaf.create 0x1000 in
skip_many
(frame <* commit >>= fun frame ->
let { is_fin; opcode; payload_length; _ } = frame in
let { payload_length; _ } = frame in
let payload =
match payload_length with
| 0 -> Payload.create_empty ~on_payload_eof
| 0 -> Payload.create_empty ()
| _ ->
Payload.create buf
~when_ready_to_read:(Optional_thunk.some (fun () ->
wakeup (Lazy.force t)))
~on_payload_eof
in
frame_handler ~opcode ~is_fin ~len:payload_length payload;
frame_handler frame payload;
payload_parser frame payload)
and t = lazy (
{ parser = parser t
Expand Down Expand Up @@ -235,12 +234,15 @@ module Reader = struct
end;
consumed

let force_close t =
t.closed <- true;
;;

let next t =
match t.parse_state with
| Done ->
if t.closed
then `Close
else `Read
| Fail failure -> `Error failure
| _ when t.closed -> `Close
| Done -> `Read
| Partial _ -> `Read
;;
end
21 changes: 14 additions & 7 deletions lib/payload.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,27 @@ module IOVec = Httpun.IOVec
; mutable eof_has_been_called : bool
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
; when_ready_to_read : Optional_thunk.t
; on_payload_eof : unit -> unit
}

let default_on_eof = Sys.opaque_identity (fun () -> ())
let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())

let create buffer ~when_ready_to_read ~on_payload_eof =
let create buffer ~when_ready_to_read =
{ faraday = Faraday.of_bigstring buffer
; read_scheduled = false
; eof_has_been_called = false
; on_eof = default_on_eof
; on_read = default_on_read
; when_ready_to_read
; on_payload_eof
}

let create_empty ~on_payload_eof =
let create_empty () =
let t =
create
Bigstringaf.empty
~when_ready_to_read:Optional_thunk.none
~on_payload_eof
in
Faraday.close t.faraday;
t.on_payload_eof ();
t

let is_closed t =
Expand All @@ -85,7 +81,6 @@ module IOVec = Httpun.IOVec
t.on_read <- default_on_read;
if not t.eof_has_been_called then begin
t.eof_has_been_called <- true;
t.on_payload_eof ();
on_eof ();
end
(* [Faraday.operation] never returns an empty list of iovecs *)
Expand Down Expand Up @@ -122,3 +117,15 @@ module IOVec = Httpun.IOVec
let has_pending_output t = Faraday.has_pending_output t.faraday

let is_read_scheduled t = t.read_scheduled

type input_state =
| Ready
| Wait
| Complete

let input_state t : input_state =
if is_closed t
then Complete
else if is_read_scheduled t
then Ready
else Wait
118 changes: 83 additions & 35 deletions lib/websocket_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@ type error = [ `Exn of exn ]

type error_handler = Wsd.t -> error -> unit

let default_payload =
Sys.opaque_identity (Payload.create_empty ~on_payload_eof:(fun () -> ()))
type frame_handler =
opcode:Websocket.Opcode.t
-> is_fin:bool
-> len:int
-> Payload.t
-> unit

type t =
{ reader : [`Parse of string list * string] Reader.t
; wsd : Wsd.t
; frame_handler : frame_handler
; eof : unit -> unit
; mutable current_payload: Payload.t
; frame_queue: (Parse.t * Payload.t) Queue.t
}

type input_handlers =
{ frame : opcode:Websocket.Opcode.t -> is_fin:bool -> len:int -> Payload.t -> unit
; eof : unit -> unit }
{ frame : frame_handler
; eof : unit -> unit }

(* TODO: this should be passed as an argument from the runtime, to allow for
* cryptographically secure random number generation. *)
Expand All @@ -42,52 +47,95 @@ let wakeup_reader t = Reader.wakeup t.reader

let create ~mode ?(error_handler = default_error_handler) websocket_handler =
let wsd = Wsd.create ~error_handler mode in
let { frame; eof } = websocket_handler wsd in
let handler t = fun ~opcode ~is_fin ~len payload ->
let t = Lazy.force t in
t.current_payload <- payload;
frame ~opcode ~is_fin ~len payload
let { frame = frame_handler; eof } = websocket_handler wsd in
let frame_queue = Queue.create () in
let handler frame payload =
let call_handler = Queue.is_empty frame_queue in

Queue.push (frame, payload) frame_queue;
if call_handler
then
let { Parse.opcode; is_fin; payload_length; _ } = frame in
frame_handler ~opcode ~is_fin ~len:payload_length payload
in
let rec reader =
lazy (
Reader.create
(fun ~opcode ~is_fin ~len payload -> (handler t ~opcode ~is_fin ~len payload))
~on_payload_eof:(fun () ->
let t = Lazy.force t in
t.current_payload <- default_payload;
)
)
let rec reader = lazy (Reader.create handler)
and t = lazy
{ reader = Lazy.force reader
; wsd
; frame_handler
; eof
; current_payload = default_payload
; frame_queue
}
in
Lazy.force t

let shutdown { wsd; _ } =
Wsd.close wsd
let shutdown_reader t =
Reader.force_close t.reader;
wakeup_reader t

let shutdown t =
shutdown_reader t;
Wsd.close t.wsd

let set_error_and_handle t error =
Wsd.report_error t.wsd error;
shutdown t

let advance_frame_queue t =
ignore (Queue.take t.frame_queue);
if not (Queue.is_empty t.frame_queue)
then
let { Parse.opcode; is_fin; payload_length; _ }, payload = Queue.peek t.frame_queue in
t.frame_handler ~opcode ~is_fin ~len:payload_length payload
;;

let rec _next_read_operation t =
begin match Queue.peek t.frame_queue with
| _, payload ->
begin match Payload.input_state payload with
| Wait ->
begin match Reader.next t.reader with
| (`Error _ | `Close) as operation -> operation
| _ -> `Yield
end
| Ready -> Reader.next t.reader
| Complete ->
(* Don't advance the request queue if in an error state. *)
begin match Reader.next t.reader with
| `Error _ as op ->
(* we just don't advance the request queue in the case of a parser
error. *)
op
| `Read as op ->
(* Keep reading when in a "partial" state (`Read). *)
advance_frame_queue t;
op
| `Close ->
advance_frame_queue t;
_next_read_operation t
end
end;
| exception Queue.Empty ->
let next = Reader.next t.reader in
begin match next with
| `Error _ ->
(* Don't tear down the whole connection if we saw an unrecoverable
* parsing error, as we might be in the process of streaming back the
* error response body to the client. *)
shutdown_reader t
| `Close -> ()
| _ -> ()
end;
next
end

let next_read_operation t =
match Reader.next t.reader with
match _next_read_operation t with
| `Error (`Parse (_, message)) ->
set_error_and_handle t (`Exn (Failure message)); `Close
| `Read ->
begin match t.current_payload == default_payload with
| true -> `Read
| false ->
if Payload.is_read_scheduled t.current_payload
then `Read
else begin
`Yield
end
end
| `Close -> `Close
set_error_and_handle t (`Exn (Failure message));
`Close
| `Read -> `Read
| (`Yield | `Close) as operation -> operation

let next_write_operation t =
Wsd.next t.wsd
Expand Down

0 comments on commit 4bf38e7

Please sign in to comment.