Skip to content

Commit

Permalink
Refactor Client_connection to use Optional_thunk
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed May 11, 2020
1 parent 5de2c74 commit 75f2d7b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 21 deletions.
2 changes: 1 addition & 1 deletion examples/lwt/lwt_get.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ let main port host =
(Request.create ~headers `GET "/")
in
Body.close_writer request_body;
finished >|= fun () ->
finished >>= fun () ->
Client.shutdown connection
;;

Expand Down
2 changes: 1 addition & 1 deletion examples/lwt/lwt_get_pipelined.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ let main port host =
in
Body.close_writer request_body';
Body.close_writer request_body;
Lwt.join [finished; finished'] >|= fun () ->
Lwt.join [finished; finished'] >>= fun () ->
Client.shutdown connection
;;

Expand Down
38 changes: 19 additions & 19 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type t =
; request_queue : Respd.t Queue.t
(* invariant: If [request_queue] is not empty, then the head of the queue
has already written the request headers to the wire. *)
; wakeup_writer : (unit -> unit) list ref
; wakeup_reader : (unit -> unit) list ref
; mutable wakeup_writer : Optional_thunk.t
; mutable wakeup_reader : Optional_thunk.t
}

let is_closed t =
Expand All @@ -64,34 +64,38 @@ let is_active t =
let current_respd_exn t =
Queue.peek t.request_queue

let on_wakeup_reader t k =
let yield_reader t k =
if is_closed t
then failwith "on_wakeup_reader on closed conn"
else t.wakeup_reader := k::!(t.wakeup_reader)
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- Optional_thunk.some k

let wakeup_reader t =
let f = t.wakeup_reader in
t.wakeup_reader <- Optional_thunk.none;
Optional_thunk.call_if_some f

let on_wakeup_writer t k =
if is_closed t
then failwith "on_wakeup_writer on closed conn"
else t.wakeup_writer := k::!(t.wakeup_writer)
else if Optional_thunk.is_some t.wakeup_writer
then failwith "yield_writer: only one callback can be registered at a time"
else t.wakeup_writer <- Optional_thunk.some k

let wakeup_writer t =
let fs = !(t.wakeup_writer) in
t.wakeup_writer := [];
List.iter (fun f -> f ()) fs

let wakeup_reader t =
let fs = !(t.wakeup_reader) in
t.wakeup_reader := [];
List.iter (fun f -> f ()) fs
let f = t.wakeup_writer in
t.wakeup_writer <- Optional_thunk.none;
Optional_thunk.call_if_some f

let[@ocaml.warning "-16"] create ?(config=Config.default) =
let request_queue = Queue.create () in
{ config
; reader = Reader.response request_queue
; writer = Writer.create ()
; request_queue
; wakeup_writer = ref []
; wakeup_reader = ref []
; wakeup_writer = Optional_thunk.none
; wakeup_reader = Optional_thunk.none
}

let request t request ~error_handler ~response_handler =
Expand Down Expand Up @@ -280,10 +284,6 @@ let next_write_operation t =
Writer.next t.writer
;;

let yield_reader t k =
on_wakeup_reader t k
;;

let yield_writer t k =
if is_active t then begin
let respd = current_respd_exn t in
Expand Down

0 comments on commit 75f2d7b

Please sign in to comment.