Skip to content

Commit

Permalink
feat: yield the reader if reads not scheduled (#70)
Browse files Browse the repository at this point in the history
* feat: yield the reader if reads not scheduled

* fix tests

* wip

* fix tests

* fix more
  • Loading branch information
anmonteiro authored Aug 24, 2024
1 parent 1c05642 commit 2a08610
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 106 deletions.
15 changes: 11 additions & 4 deletions examples/eio/echo_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@ let connection_handler ~sw : Eio.Net.Sockaddr.stream -> _ Eio.Net.stream_socket
let module Status = Httpun.Status in

let websocket_handler _client_address wsd =
let frame ~opcode ~is_fin:_ ~len:_ payload =
let frame ~opcode ~is_fin ~len payload =
Format.eprintf "FRAME %a %d %B@." Httpun_ws.Websocket.Opcode.pp_hum opcode len is_fin;
match (opcode: Httpun_ws.Websocket.Opcode.t) with
| #Httpun_ws.Websocket.Opcode.standard_non_control as opcode ->
let rec on_read bs ~off ~len =
Format.eprintf "do it %d %S@." len (Bigstringaf.substring bs ~off ~len);
Httpun_ws.Wsd.schedule wsd bs ~kind:opcode ~off ~len;
Httpun_ws.Payload.schedule_read payload
~on_eof:ignore
~on_read
in
Httpun_ws.Payload.schedule_read payload
~on_eof:ignore
~on_read:(fun bs ~off ~len ->
Httpun_ws.Wsd.schedule wsd bs ~kind:opcode ~off ~len)
~on_read
| `Connection_close ->
Httpun_ws.Wsd.close wsd
Httpun_ws.Wsd.close ~code:(`Other 1005) wsd
| `Ping ->
Httpun_ws.Wsd.send_pong wsd
| `Pong
Expand Down
4 changes: 2 additions & 2 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ let next_read_operation t =
(* TODO(anmonteiro): handle this *)
assert false
(* set_error_and_handle t (`Exn (Failure message)); `Close *)
| (`Read | `Close) as operation -> operation
| (`Read | `Yield | `Close) as operation -> operation

let read t bs ~off ~len =
match t.state with
Expand Down Expand Up @@ -152,7 +152,7 @@ let report_exn t exn =
let yield_reader t f =
match t.state with
| Handshake handshake -> Client_handshake.yield_reader handshake f
| Websocket _websocket -> assert false
| Websocket websocket -> Websocket_connection.yield_reader websocket f

let yield_writer t f =
match t.state with
Expand Down
115 changes: 79 additions & 36 deletions lib/parse.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
type t =
{ payload_length: int
; is_fin: bool
; mask: int32 option
; payload: Payload.t
; opcode: Websocket.Opcode.t
}
{ payload_length: int
; is_fin: bool
; mask: int32 option
; opcode: Websocket.Opcode.t
}

let is_fin headers =
let bits = Bigstringaf.unsafe_get headers 0 |> Char.code in
Expand Down Expand Up @@ -87,7 +86,7 @@ let parse_headers =
>>= fun headers_len -> Unsafe.take headers_len Bigstringaf.sub
;;

let payload_parser t =
let payload_parser t payload =
let open Angstrom in
let unmask t bs ~src_off =
match t.mask with
Expand Down Expand Up @@ -120,28 +119,24 @@ let payload_parser t =
available >>= fun m ->
let m' = (min m n) in
let n' = n - m' in
schedule_size ~src_off t.payload m'
schedule_size ~src_off payload m'
>>= fun () -> read_exact (src_off + m') n'
in
fun n -> read_exact 0 n
in
read_exact t.payload_length
>>= fun () -> finish t.payload
>>= fun () -> finish payload
;;

let frame ~buf =
let frame =
let open Angstrom in
parse_headers
>>| fun headers ->
let payload_length = payload_length_of_headers headers
and is_fin = is_fin headers
and opcode = opcode headers
and mask = mask headers in
let payload = match payload_length with
| 0 -> Payload.empty
| _ -> Payload.create buf
in
{ is_fin; opcode; mask; payload_length; payload }
{ is_fin; opcode; mask; payload_length }
;;

module Reader = struct
Expand All @@ -155,25 +150,53 @@ module Reader = struct
type 'error t =
{ parser : unit Angstrom.t
; mutable parse_state : 'error parse_state
; mutable closed : bool }
; mutable closed : bool
; mutable wakeup : Optional_thunk.t
}

let wakeup t =
let f = t.wakeup in
t.wakeup <- Optional_thunk.none;
Optional_thunk.call_if_some f

let create frame_handler =
let parser =
let rec parser t =
let open Angstrom in
let buf = Bigstringaf.create 0x1000 in
skip_many
(frame ~buf <* commit >>= fun frame ->
let payload = frame.payload in
let { is_fin; opcode; payload_length = len; _ } = frame in
frame_handler ~opcode ~is_fin ~len payload;
payload_parser frame)
(frame <* commit >>= fun frame ->
let { payload_length; _ } = frame in
let payload =
match payload_length with
| 0 -> Payload.create_empty ()
| _ ->
Payload.create buf
~when_ready_to_read:(Optional_thunk.some (fun () ->
wakeup (Lazy.force t)))
in
frame_handler frame payload;
payload_parser frame payload)
and t = lazy (
{ parser = parser t
; parse_state = Done
; closed = false
; wakeup = Optional_thunk.none
}
)
in
{ parser
; parse_state = Done
; closed = false
}
Lazy.force t
;;

let is_closed t =
t.closed

let on_wakeup t k =
if is_closed t
then failwith "on_wakeup on closed reader"
else if Optional_thunk.is_some t.wakeup
then failwith "on_wakeup: only one callback can be registered at a time"
else t.wakeup <- Optional_thunk.some k

let transition t state =
match state with
| AU.Done(consumed, ())
Expand All @@ -195,28 +218,48 @@ module Reader = struct
t.parse_state <- Partial continue
| _ -> assert false

let rec read_with_more t bs ~off ~len more =
let rec _read_with_more t bs ~off ~len more =
let initial = match t.parse_state with Done -> true | _ -> false in
let consumed =
match t.parse_state with
| Fail _ -> 0
(* Don't feed empty input when we're at a request boundary *)
| Done when len = 0 -> 0
| Done ->
start t (AU.parse t.parser);
read_with_more t bs ~off ~len more;
_read_with_more t bs ~off ~len more;
| Partial continue ->
transition t (continue bs more ~off ~len)
in
begin match more with
| Complete -> t.closed <- true;
| Incomplete -> ()
end;
(* Special case where the parser just started and was fed a zero-length
* bigstring. Avoid putting them parser in an error state in this scenario.
* If we were already in a `Partial` state, return the error. *)
if initial && len = 0 then t.parse_state <- Done;
match t.parse_state with
| Done when consumed < len ->
let off = off + consumed
and len = len - consumed in
consumed + _read_with_more t bs ~off ~len more
| _ -> consumed
;;

let read_with_more t bs ~off ~len more =
let consumed = _read_with_more t bs ~off ~len more in
(match more with
| Complete ->
t.closed <- true
| Incomplete -> ());
consumed

let force_close t =
t.closed <- true;
;;

let next t =
match t.parse_state with
| Done ->
if t.closed
then `Close
else `Read
| Fail failure -> `Error failure
| _ when t.closed -> `Close
| Done -> `Read
| Partial _ -> `Read
;;
end
52 changes: 39 additions & 13 deletions lib/payload.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,52 @@ module IOVec = Httpun.IOVec
{ faraday : Faraday.t
; mutable read_scheduled : bool
; mutable on_eof : unit -> unit
; mutable eof_has_been_called : bool
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
; when_ready_to_read : Optional_thunk.t
}

let default_on_eof = Sys.opaque_identity (fun () -> ())
let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())

let of_faraday faraday =
{ faraday
let create buffer ~when_ready_to_read =
{ faraday = Faraday.of_bigstring buffer
; read_scheduled = false
; eof_has_been_called = false
; on_eof = default_on_eof
; on_read = default_on_read
; when_ready_to_read
}

let create buffer =
of_faraday (Faraday.of_bigstring buffer)

let create_empty () =
let t = create Bigstringaf.empty in
let t =
create
Bigstringaf.empty
~when_ready_to_read:Optional_thunk.none
in
Faraday.close t.faraday;
t

let empty = create_empty ()

let is_closed t =
Faraday.is_closed t.faraday

let unsafe_faraday t =
t.faraday

let ready_to_read t = Optional_thunk.call_if_some t.when_ready_to_read

let rec do_execute_read t on_eof on_read =
match Faraday.operation t.faraday with
| `Yield -> ()
| `Close ->
t.read_scheduled <- false;
t.on_eof <- default_on_eof;
t.on_read <- default_on_read;
on_eof ()
if not t.eof_has_been_called then begin
t.eof_has_been_called <- true;
on_eof ();
end
(* [Faraday.operation] never returns an empty list of iovecs *)
| `Writev [] -> assert false
| `Writev (iovec::_) ->
t.read_scheduled <- false;
Expand All @@ -96,10 +105,27 @@ module IOVec = Httpun.IOVec
t.on_eof <- on_eof;
t.on_read <- on_read;
end;
do_execute_read t on_eof on_read

let is_read_scheduled t = t.read_scheduled
do_execute_read t on_eof on_read;
ready_to_read t

let close t =
Faraday.close t.faraday;
execute_read t
execute_read t;
ready_to_read t
;;

let has_pending_output t = Faraday.has_pending_output t.faraday

let is_read_scheduled t = t.read_scheduled

type input_state =
| Ready
| Wait
| Complete

let input_state t : input_state =
if is_closed t
then Complete
else if is_read_scheduled t
then Ready
else Wait
2 changes: 1 addition & 1 deletion lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ let read_eof t bs ~off ~len =
let yield_reader t f =
match t.state with
| Handshake handshake -> Server_handshake.yield_reader handshake f
| Websocket _ -> assert false
| Websocket websocket -> Websocket_connection.yield_reader websocket f

let next_write_operation t =
match t.state with
Expand Down
Loading

0 comments on commit 2a08610

Please sign in to comment.