Skip to content

Commit

Permalink
fix more
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed Aug 24, 2024
1 parent 8e67fb1 commit 6d3865f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
29 changes: 23 additions & 6 deletions lib/parse.ml
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,37 @@ module Reader = struct
t.parse_state <- Partial continue
| _ -> assert false

let rec read_with_more t bs ~off ~len more =
let rec _read_with_more t bs ~off ~len more =
let initial = match t.parse_state with Done -> true | _ -> false in
let consumed =
match t.parse_state with
| Fail _ -> 0
(* Don't feed empty input when we're at a request boundary *)
| Done when len = 0 -> 0
| Done ->
start t (AU.parse t.parser);
read_with_more t bs ~off ~len more;
_read_with_more t bs ~off ~len more;
| Partial continue ->
transition t (continue bs more ~off ~len)
in
begin match more with
| Complete -> t.closed <- true;
| Incomplete -> ()
end;
(* Special case where the parser just started and was fed a zero-length
* bigstring. Avoid putting them parser in an error state in this scenario.
* If we were already in a `Partial` state, return the error. *)
if initial && len = 0 then t.parse_state <- Done;
match t.parse_state with
| Done when consumed < len ->
let off = off + consumed
and len = len - consumed in
consumed + _read_with_more t bs ~off ~len more
| _ -> consumed
;;

let read_with_more t bs ~off ~len more =
let consumed = _read_with_more t bs ~off ~len more in
(match more with
| Complete ->
t.closed <- true
| Incomplete -> ());
consumed

let force_close t =
Expand Down
22 changes: 17 additions & 5 deletions lib/websocket_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,26 @@ let next_read_operation t =
let next_write_operation t =
Wsd.next t.wsd

let report_exn t exn =
set_error_and_handle t (`Exn exn)

let read_with_more t bs ~off ~len more =
let consumed = Reader.read_with_more t.reader bs ~off ~len more in
if not (Queue.is_empty t.frame_queue)
then (
let _, payload = Queue.peek t.frame_queue in
if Payload.has_pending_output payload
then try Payload.execute_read payload
with exn -> report_exn t exn
);
consumed
;;

let read t bs ~off ~len =
Reader.read_with_more t.reader bs ~off ~len Incomplete
read_with_more t bs ~off ~len Incomplete

let read_eof t bs ~off ~len =
let r = Reader.read_with_more t.reader bs ~off ~len Complete in
let r = read_with_more t bs ~off ~len Complete in
t.eof ();
r

Expand All @@ -162,9 +177,6 @@ let yield_writer t k =
let is_closed { wsd; _ } =
Wsd.is_closed wsd

let report_exn t exn =
set_error_and_handle t (`Exn exn)

let yield_reader t k =
if Reader.is_closed t.reader
then k ()
Expand Down
10 changes: 6 additions & 4 deletions lib_test/test_httpun_ws.ml
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ module Websocket = struct
match opcode with
| `Text ->
incr frames_parsed;
Payload.schedule_read payload
~on_eof:ignore
~on_read:(fun bs ~off ~len ->
Wsd.schedule wsd bs ~kind:`Text ~off ~len)
let rec on_read bs ~off ~len =
Wsd.schedule wsd ~kind:`Text bs ~off ~len;
Payload.schedule_read payload ~on_eof:ignore ~on_read
in
Payload.schedule_read payload ~on_eof:ignore ~on_read
| `Binary
| `Continuation
| `Connection_close
Expand All @@ -144,6 +145,7 @@ module Websocket = struct
let len = String.length frames in
let bs = Bigstringaf.of_string ~off:0 ~len frames in
let read = Server_connection.read t bs ~off:0 ~len in
ignore @@ Server_connection.next_read_operation t;
Alcotest.(check int) "Reads both frames" len read;
Alcotest.(check int) "Both frames parsed and handled" 2 !frames_parsed;
;;
Expand Down

0 comments on commit 6d3865f

Please sign in to comment.