Skip to content

Commit

Permalink
Add Picos_std_sync.Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Nov 28, 2024
1 parent 84d5428 commit b397324
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 0 deletions.
86 changes: 86 additions & 0 deletions bench/bench_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
open Multicore_bench
open Picos_std_sync

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create ~padded:true () in

let op push =
if push then Queue.push t 101
else match Queue.pop_exn t with _ -> () | exception Queue.Empty -> ()
in

let init _ =
assert (
match Queue.pop_exn t with _ -> false | exception Queue.Empty -> true);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ~n_adders ~n_takers () =
let n_domains = n_adders + n_takers in

let n_msgs = 50 * Util.iter_factor in

let t = Queue.create ~padded:true () in

let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in
let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in

let init _ =
assert (
match Queue.pop_exn t with _ -> false | exception Queue.Empty -> true);
Countdown.non_atomic_set n_msgs_to_add n_msgs;
Countdown.non_atomic_set n_msgs_to_take n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let i = i - n_adders in
let rec work () =
let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in
if 0 < n then
let rec loop n =
if 0 < n then begin
match Queue.pop_exn t with
| _ -> loop (n - 1)
| exception Queue.Empty ->
Backoff.once Backoff.default |> ignore;
loop n
end
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in
Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
if Picos_domain.recommended_domain_count () < n_adders + n_takers then []
else run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
(run %{test} -brief "Picos Mutex")
(run %{test} -brief "Picos Semaphore")
(run %{test} -brief "Picos Spawn")
(run %{test} -brief "Picos Queue")
(run %{test} -brief "Picos Yield")
(run %{test} -brief "Picos Cancel_after with Picos_select")
(run %{test} -brief "Ref with Picos_sync.Mutex")
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ let benchmarks =
("Picos DLS", Bench_dls.run_suite);
("Picos Mutex", Bench_mutex.run_suite);
("Picos Semaphore", Bench_semaphore.run_suite);
("Picos Queue", Bench_queue.run_suite);
("Picos Spawn", Bench_spawn.run_suite);
("Picos Yield", Bench_yield.run_suite);
("Picos Cancel_after with Picos_select", Bench_cancel_after.run_suite);
Expand Down
1 change: 1 addition & 0 deletions lib/picos_std.sync/picos_std_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ module Semaphore = Semaphore
module Lazy = Lazy
module Latch = Latch
module Ivar = Ivar
module Queue = Queue
module Stream = Stream
77 changes: 77 additions & 0 deletions lib/picos_std.sync/picos_std_sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,81 @@ module Ivar : sig
variable has either been assigned a value or has been poisoned. *)
end

module Queue : sig
(** A lock-free multi-producer, multi-consumer queue. *)

(** {1 API} *)

type !'a t
(** A multi-producer, multi-consumer queue. *)

val create : ?padded:bool -> unit -> 'a t
(** [create ()] returns a new empty multi-producer, multi-consumer queue. *)

val push : 'a t -> 'a -> unit
(** [push queue value] adds the [value] to the tail of the [queue]. *)

val push_head : 'a t -> 'a -> unit
(** [push_head queue value] adds the [value] to the head of the [queue]. *)

exception Empty
(** Raised by {!pop_exn} in case it finds the queue empty. *)

val pop_exn : 'a t -> 'a
(** [pop_exn queue] tries to remove the value at the head of the [queue].
Returns the removed value or raises {!Empty} in case the queue was empty.
@raise Empty in case the queue was empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt queue] tries to remove the value at the head of the [queue].
Returns the removed value or [None] in case the queue was empty. *)

val pop : 'a t -> 'a
(** [pop queue] waits until the queue is not empty, removes the value at the
head of the [queue], and returns it. *)

val length : 'a t -> int
(** [length queue] returns the length or the number of values in the
[queue]. *)

val is_empty : 'a t -> bool
(** [is_empty queue] is equivalent to {{!length} [length queue = 0]}. *)

(** {1 Examples}
An example top-level session:
{[
# let q : int Queue.t =
Queue.create ()
val q : int Picos_std_sync.Queue.t = <abstr>
# Queue.push q 42
- : unit = ()
# Queue.push_head q 76
- : unit = ()
# Queue.length q
- : int = 2
# Queue.push q 101
- : unit = ()
# Queue.pop_exn q
- : int = 76
# Queue.pop_exn q
- : int = 42
# Queue.pop_exn q
- : int = 101
# Queue.pop_exn q
Exception: Picos_std_sync__Queue.Empty.
]} *)
end

module Stream : sig
(** A lock-free, poisonable, many-to-many, stream.
Expand Down Expand Up @@ -423,6 +498,8 @@ end
val push : 'a t -> 'a -> unit
val pop : 'a t -> 'a
end = struct
module Queue = Stdlib.Queue
type 'a t = {
mutex : Mutex.t;
queue : 'a Queue.t;
Expand Down
Loading

0 comments on commit b397324

Please sign in to comment.