Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: yield the reader if reads not scheduled #70

Merged
merged 5 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions examples/eio/echo_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@ let connection_handler ~sw : Eio.Net.Sockaddr.stream -> _ Eio.Net.stream_socket
let module Status = Httpun.Status in

let websocket_handler _client_address wsd =
let frame ~opcode ~is_fin:_ ~len:_ payload =
let frame ~opcode ~is_fin ~len payload =
Format.eprintf "FRAME %a %d %B@." Httpun_ws.Websocket.Opcode.pp_hum opcode len is_fin;
match (opcode: Httpun_ws.Websocket.Opcode.t) with
| #Httpun_ws.Websocket.Opcode.standard_non_control as opcode ->
let rec on_read bs ~off ~len =
Format.eprintf "do it %d %S@." len (Bigstringaf.substring bs ~off ~len);
Httpun_ws.Wsd.schedule wsd bs ~kind:opcode ~off ~len;
Httpun_ws.Payload.schedule_read payload
~on_eof:ignore
~on_read
in
Httpun_ws.Payload.schedule_read payload
~on_eof:ignore
~on_read:(fun bs ~off ~len ->
Httpun_ws.Wsd.schedule wsd bs ~kind:opcode ~off ~len)
~on_read
| `Connection_close ->
Httpun_ws.Wsd.close wsd
Httpun_ws.Wsd.close ~code:(`Other 1005) wsd
| `Ping ->
Httpun_ws.Wsd.send_pong wsd
| `Pong
Expand Down
4 changes: 2 additions & 2 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ let next_read_operation t =
(* TODO(anmonteiro): handle this *)
assert false
(* set_error_and_handle t (`Exn (Failure message)); `Close *)
| (`Read | `Close) as operation -> operation
| (`Read | `Yield | `Close) as operation -> operation

let read t bs ~off ~len =
match t.state with
Expand Down Expand Up @@ -152,7 +152,7 @@ let report_exn t exn =
let yield_reader t f =
match t.state with
| Handshake handshake -> Client_handshake.yield_reader handshake f
| Websocket _websocket -> assert false
| Websocket websocket -> Websocket_connection.yield_reader websocket f

let yield_writer t f =
match t.state with
Expand Down
115 changes: 79 additions & 36 deletions lib/parse.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
type t =
{ payload_length: int
; is_fin: bool
; mask: int32 option
; payload: Payload.t
; opcode: Websocket.Opcode.t
}
{ payload_length: int
; is_fin: bool
; mask: int32 option
; opcode: Websocket.Opcode.t
}

let is_fin headers =
let bits = Bigstringaf.unsafe_get headers 0 |> Char.code in
Expand Down Expand Up @@ -87,7 +86,7 @@ let parse_headers =
>>= fun headers_len -> Unsafe.take headers_len Bigstringaf.sub
;;

let payload_parser t =
let payload_parser t payload =
let open Angstrom in
let unmask t bs ~src_off =
match t.mask with
Expand Down Expand Up @@ -120,28 +119,24 @@ let payload_parser t =
available >>= fun m ->
let m' = (min m n) in
let n' = n - m' in
schedule_size ~src_off t.payload m'
schedule_size ~src_off payload m'
>>= fun () -> read_exact (src_off + m') n'
in
fun n -> read_exact 0 n
in
read_exact t.payload_length
>>= fun () -> finish t.payload
>>= fun () -> finish payload
;;

let frame ~buf =
let frame =
let open Angstrom in
parse_headers
>>| fun headers ->
let payload_length = payload_length_of_headers headers
and is_fin = is_fin headers
and opcode = opcode headers
and mask = mask headers in
let payload = match payload_length with
| 0 -> Payload.empty
| _ -> Payload.create buf
in
{ is_fin; opcode; mask; payload_length; payload }
{ is_fin; opcode; mask; payload_length }
;;

module Reader = struct
Expand All @@ -155,25 +150,53 @@ module Reader = struct
type 'error t =
{ parser : unit Angstrom.t
; mutable parse_state : 'error parse_state
; mutable closed : bool }
; mutable closed : bool
; mutable wakeup : Optional_thunk.t
}

let wakeup t =
let f = t.wakeup in
t.wakeup <- Optional_thunk.none;
Optional_thunk.call_if_some f

let create frame_handler =
let parser =
let rec parser t =
let open Angstrom in
let buf = Bigstringaf.create 0x1000 in
skip_many
(frame ~buf <* commit >>= fun frame ->
let payload = frame.payload in
let { is_fin; opcode; payload_length = len; _ } = frame in
frame_handler ~opcode ~is_fin ~len payload;
payload_parser frame)
(frame <* commit >>= fun frame ->
let { payload_length; _ } = frame in
let payload =
match payload_length with
| 0 -> Payload.create_empty ()
| _ ->
Payload.create buf
~when_ready_to_read:(Optional_thunk.some (fun () ->
wakeup (Lazy.force t)))
in
frame_handler frame payload;
payload_parser frame payload)
and t = lazy (
{ parser = parser t
; parse_state = Done
; closed = false
; wakeup = Optional_thunk.none
}
)
in
{ parser
; parse_state = Done
; closed = false
}
Lazy.force t
;;

let is_closed t =
t.closed

let on_wakeup t k =
if is_closed t
then failwith "on_wakeup on closed reader"
else if Optional_thunk.is_some t.wakeup
then failwith "on_wakeup: only one callback can be registered at a time"
else t.wakeup <- Optional_thunk.some k

let transition t state =
match state with
| AU.Done(consumed, ())
Expand All @@ -195,28 +218,48 @@ module Reader = struct
t.parse_state <- Partial continue
| _ -> assert false

let rec read_with_more t bs ~off ~len more =
let rec _read_with_more t bs ~off ~len more =
let initial = match t.parse_state with Done -> true | _ -> false in
let consumed =
match t.parse_state with
| Fail _ -> 0
(* Don't feed empty input when we're at a request boundary *)
| Done when len = 0 -> 0
| Done ->
start t (AU.parse t.parser);
read_with_more t bs ~off ~len more;
_read_with_more t bs ~off ~len more;
| Partial continue ->
transition t (continue bs more ~off ~len)
in
begin match more with
| Complete -> t.closed <- true;
| Incomplete -> ()
end;
(* Special case where the parser just started and was fed a zero-length
* bigstring. Avoid putting them parser in an error state in this scenario.
* If we were already in a `Partial` state, return the error. *)
if initial && len = 0 then t.parse_state <- Done;
match t.parse_state with
| Done when consumed < len ->
let off = off + consumed
and len = len - consumed in
consumed + _read_with_more t bs ~off ~len more
| _ -> consumed
;;

let read_with_more t bs ~off ~len more =
let consumed = _read_with_more t bs ~off ~len more in
(match more with
| Complete ->
t.closed <- true
| Incomplete -> ());
consumed

let force_close t =
t.closed <- true;
;;

let next t =
match t.parse_state with
| Done ->
if t.closed
then `Close
else `Read
| Fail failure -> `Error failure
| _ when t.closed -> `Close
| Done -> `Read
| Partial _ -> `Read
;;
end
52 changes: 39 additions & 13 deletions lib/payload.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,52 @@ module IOVec = Httpun.IOVec
{ faraday : Faraday.t
; mutable read_scheduled : bool
; mutable on_eof : unit -> unit
; mutable eof_has_been_called : bool
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
; when_ready_to_read : Optional_thunk.t
}

let default_on_eof = Sys.opaque_identity (fun () -> ())
let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())

let of_faraday faraday =
{ faraday
let create buffer ~when_ready_to_read =
{ faraday = Faraday.of_bigstring buffer
; read_scheduled = false
; eof_has_been_called = false
; on_eof = default_on_eof
; on_read = default_on_read
; when_ready_to_read
}

let create buffer =
of_faraday (Faraday.of_bigstring buffer)

let create_empty () =
let t = create Bigstringaf.empty in
let t =
create
Bigstringaf.empty
~when_ready_to_read:Optional_thunk.none
in
Faraday.close t.faraday;
t

let empty = create_empty ()

let is_closed t =
Faraday.is_closed t.faraday

let unsafe_faraday t =
t.faraday

let ready_to_read t = Optional_thunk.call_if_some t.when_ready_to_read

let rec do_execute_read t on_eof on_read =
match Faraday.operation t.faraday with
| `Yield -> ()
| `Close ->
t.read_scheduled <- false;
t.on_eof <- default_on_eof;
t.on_read <- default_on_read;
on_eof ()
if not t.eof_has_been_called then begin
t.eof_has_been_called <- true;
on_eof ();
end
(* [Faraday.operation] never returns an empty list of iovecs *)
| `Writev [] -> assert false
| `Writev (iovec::_) ->
t.read_scheduled <- false;
Expand All @@ -96,10 +105,27 @@ module IOVec = Httpun.IOVec
t.on_eof <- on_eof;
t.on_read <- on_read;
end;
do_execute_read t on_eof on_read

let is_read_scheduled t = t.read_scheduled
do_execute_read t on_eof on_read;
ready_to_read t

let close t =
Faraday.close t.faraday;
execute_read t
execute_read t;
ready_to_read t
;;

let has_pending_output t = Faraday.has_pending_output t.faraday

let is_read_scheduled t = t.read_scheduled

type input_state =
| Ready
| Wait
| Complete

let input_state t : input_state =
if is_closed t
then Complete
else if is_read_scheduled t
then Ready
else Wait
2 changes: 1 addition & 1 deletion lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ let read_eof t bs ~off ~len =
let yield_reader t f =
match t.state with
| Handshake handshake -> Server_handshake.yield_reader handshake f
| Websocket _ -> assert false
| Websocket websocket -> Websocket_connection.yield_reader websocket f

let next_write_operation t =
match t.state with
Expand Down
Loading