diff --git a/bench/bench_queue.ml b/bench/bench_queue.ml new file mode 100644 index 00000000..a51f6376 --- /dev/null +++ b/bench/bench_queue.ml @@ -0,0 +1,86 @@ +open Multicore_bench +open Picos_std_sync + +let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = + let t = Queue.create ~padded:true () in + + let op push = + if push then Queue.push t 101 + else match Queue.pop_exn t with _ -> () | exception Queue.Empty -> () + in + + let init _ = + assert ( + match Queue.pop_exn t with _ -> false | exception Queue.Empty -> true); + Util.generate_push_and_pop_sequence n_msgs + in + let work _ bits = Util.Bits.iter op bits in + + Times.record ~budgetf ~n_domains:1 ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let run_one ~budgetf ~n_adders ~n_takers () = + let n_domains = n_adders + n_takers in + + let n_msgs = 50 * Util.iter_factor in + + let t = Queue.create ~padded:true () in + + let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in + let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in + + let init _ = + assert ( + match Queue.pop_exn t with _ -> false | exception Queue.Empty -> true); + Countdown.non_atomic_set n_msgs_to_add n_msgs; + Countdown.non_atomic_set n_msgs_to_take n_msgs + in + let work i () = + if i < n_adders then + let rec work () = + let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in + if 0 < n then begin + for i = 1 to n do + Queue.push t i + done; + work () + end + in + work () + else + let i = i - n_adders in + let rec work () = + let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in + if 0 < n then + let rec loop n = + if 0 < n then begin + match Queue.pop_exn t with + | _ -> loop (n - 1) + | exception Queue.Empty -> + Backoff.once Backoff.default |> ignore; + loop n + end + else work () + in + loop n + in + work () + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" + (format "nb adder" n_adders) + (format "nb taker" n_takers) + in + Times.record ~budgetf ~n_domains ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ] + |> List.concat_map @@ fun (n_adders, n_takers) -> + if Picos_domain.recommended_domain_count () < n_adders + n_takers then [] + else run_one ~budgetf ~n_adders ~n_takers ()) diff --git a/bench/dune b/bench/dune index 3d27fe7e..d2d85eca 100644 --- a/bench/dune +++ b/bench/dune @@ -11,6 +11,7 @@ (run %{test} -brief "Picos Mutex") (run %{test} -brief "Picos Semaphore") (run %{test} -brief "Picos Spawn") + (run %{test} -brief "Picos Queue") (run %{test} -brief "Picos Yield") (run %{test} -brief "Picos Cancel_after with Picos_select") (run %{test} -brief "Ref with Picos_sync.Mutex") diff --git a/bench/main.ml b/bench/main.ml index 423c3774..4ca2ea79 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -9,6 +9,7 @@ let benchmarks = ("Picos DLS", Bench_dls.run_suite); ("Picos Mutex", Bench_mutex.run_suite); ("Picos Semaphore", Bench_semaphore.run_suite); + ("Picos Queue", Bench_queue.run_suite); ("Picos Spawn", Bench_spawn.run_suite); ("Picos Yield", Bench_yield.run_suite); ("Picos Cancel_after with Picos_select", Bench_cancel_after.run_suite); diff --git a/lib/picos_std.sync/picos_std_sync.ml b/lib/picos_std.sync/picos_std_sync.ml index 007b8cf0..198e6396 100644 --- a/lib/picos_std.sync/picos_std_sync.ml +++ b/lib/picos_std.sync/picos_std_sync.ml @@ -4,4 +4,5 @@ module Semaphore = Semaphore module Lazy = Lazy module Latch = Latch module Ivar = Ivar +module Queue = Queue module Stream = Stream diff --git a/lib/picos_std.sync/picos_std_sync.mli b/lib/picos_std.sync/picos_std_sync.mli index c4cd77d8..c78eaa68 100644 --- a/lib/picos_std.sync/picos_std_sync.mli +++ b/lib/picos_std.sync/picos_std_sync.mli @@ -351,6 +351,81 @@ module Ivar : sig variable has either been assigned a value or has been poisoned. *) end +module Queue : sig + (** A lock-free multi-producer, multi-consumer queue. *) + + (** {1 API} *) + + type !'a t + (** A multi-producer, multi-consumer queue. *) + + val create : ?padded:bool -> unit -> 'a t + (** [create ()] returns a new empty multi-producer, multi-consumer queue. *) + + val push : 'a t -> 'a -> unit + (** [push queue value] adds the [value] to the tail of the [queue]. *) + + val push_head : 'a t -> 'a -> unit + (** [push_head queue value] adds the [value] to the head of the [queue]. *) + + exception Empty + (** Raised by {!pop_exn} in case it finds the queue empty. *) + + val pop_exn : 'a t -> 'a + (** [pop_exn queue] tries to remove the value at the head of the [queue]. + Returns the removed value or raises {!Empty} in case the queue was empty. + + @raise Empty in case the queue was empty. *) + + val pop_opt : 'a t -> 'a option + (** [pop_opt queue] tries to remove the value at the head of the [queue]. + Returns the removed value or [None] in case the queue was empty. *) + + val pop : 'a t -> 'a + (** [pop queue] waits until the queue is not empty, removes the value at the + head of the [queue], and returns it. *) + + val length : 'a t -> int + (** [length queue] returns the length or the number of values in the + [queue]. *) + + val is_empty : 'a t -> bool + (** [is_empty queue] is equivalent to {{!length} [length queue = 0]}. *) + + (** {1 Examples} + + An example top-level session: + {[ + # let q : int Queue.t = + Queue.create () + val q : int Picos_std_sync.Queue.t = + + # Queue.push q 42 + - : unit = () + + # Queue.push_head q 76 + - : unit = () + + # Queue.length q + - : int = 2 + + # Queue.push q 101 + - : unit = () + + # Queue.pop_exn q + - : int = 76 + + # Queue.pop_exn q + - : int = 42 + + # Queue.pop_exn q + - : int = 101 + + # Queue.pop_exn q + Exception: Picos_std_sync__Queue.Empty. + ]} *) +end + module Stream : sig (** A lock-free, poisonable, many-to-many, stream. @@ -423,6 +498,8 @@ end val push : 'a t -> 'a -> unit val pop : 'a t -> 'a end = struct + module Queue = Stdlib.Queue + type 'a t = { mutex : Mutex.t; queue : 'a Queue.t; diff --git a/lib/picos_std.sync/queue.ml b/lib/picos_std.sync/queue.ml new file mode 100644 index 00000000..da83488b --- /dev/null +++ b/lib/picos_std.sync/queue.ml @@ -0,0 +1,282 @@ +open Picos +module Atomic = Multicore_magic.Transparent_atomic + +type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t } + +and ('a, _) tdt = + | Cons : { + counter : int; + value : 'a; + suffix : ('a, [ `Cons | `Head ]) tdt; + } + -> ('a, [> `Cons ]) tdt + | Head : { counter : int } -> ('a, [> `Head ]) tdt + | Snoc : { + counter : int; + prefix : ('a, [ `Snoc | `Tail ]) tdt; + value : 'a; + } + -> ('a, [> `Snoc ]) tdt + | Tail : { + counter : int; + mutable move : ('a, [ `Snoc | `Used | `Wait ]) tdt; + } + -> ('a, [> `Tail ]) tdt + | Used : ('a, [> `Used ]) tdt + | Wait : { + trigger : Trigger.t; + next : ('a, [ `Wait | `Used ]) tdt; + } + -> ('a, [> `Wait ]) tdt + +and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed] +and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed] + +let create ?padded () = + let head = + Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded + in + let tail = + Atomic.make (T (Tail { counter = 0; move = Used })) + |> Multicore_magic.copy_as ?padded + in + Multicore_magic.copy_as ?padded { head; tail } + +let rec signal = function + | Used -> () + | Wait wait_r -> + Trigger.signal wait_r.trigger; + signal wait_r.next + +let[@inline] signal (Tail tail_r : (_, [ `Tail ]) tdt) = + match tail_r.move with + | Used | Snoc _ -> () + | Wait _ as wait -> + tail_r.move <- Used; + signal wait + +let rec rev (Cons _ as suffix : (_, [< `Cons ]) tdt) = function + | Snoc { counter; prefix; value } -> + rev (Cons { counter; value; suffix }) prefix + | Tail _ -> suffix + +let rev : _ -> (_, [ `Cons ]) tdt = function + | (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) -> + rev + (Cons { counter; value; suffix = Head { counter = counter + 1 } }) + prefix + +let rec push t value backoff = function + | T (Snoc snoc_r as prefix) -> + let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in + if not (Atomic.compare_and_set t.tail (T prefix) (T after)) then + let backoff = Backoff.once backoff in + push t value backoff (Atomic.fenceless_get t.tail) + | T (Tail tail_r as prefix) -> begin + match tail_r.move with + | Used | Wait _ -> + let after = Snoc { counter = tail_r.counter + 1; prefix; value } in + if Atomic.compare_and_set t.tail (T prefix) (T after) then + signal prefix + else + let backoff = Backoff.once backoff in + push t value backoff (Atomic.fenceless_get t.tail) + | Snoc move_r as move -> + begin + match Atomic.get t.head with + | H (Head head_r as head) when head_r.counter < move_r.counter -> + let after = rev move in + if + Atomic.fenceless_get t.head == H head + && Atomic.compare_and_set t.head (H head) (H after) + then tail_r.move <- Used + | _ -> tail_r.move <- Used + end; + push t value backoff (Atomic.get t.tail) + end + +exception Empty + +type ('a, _) on_empty = + | Return_None : ('a, 'a option) on_empty + | Raise_Empty : ('a, 'a) on_empty + | Await : ('a, 'a) on_empty + +let rec pop_as : type a r. a t -> (a, r) on_empty -> _ -> a head -> r = + fun t on_empty backoff -> function + | H (Cons cons_r as cons) -> + if Atomic.compare_and_set t.head (H cons) (H cons_r.suffix) then + match on_empty with + | Return_None -> Some cons_r.value + | Raise_Empty -> cons_r.value + | Await -> cons_r.value + else + let backoff = Backoff.once backoff in + pop_as t on_empty backoff (Atomic.fenceless_get t.head) + | H (Head head_r as head) -> begin + match Atomic.get t.tail with + | T (Snoc snoc_r as move) -> + if head_r.counter = snoc_r.counter then + if Atomic.compare_and_set t.tail (T move) (T snoc_r.prefix) then + match on_empty with + | Return_None -> Some snoc_r.value + | Raise_Empty -> snoc_r.value + | Await -> snoc_r.value + else pop_as t on_empty backoff (Atomic.fenceless_get t.head) + else + let (Tail tail_r as tail : (_, [ `Tail ]) tdt) = + Tail { counter = snoc_r.counter; move } + in + let new_head = Atomic.get t.head in + if new_head != H head then pop_as t on_empty backoff new_head + else if Atomic.compare_and_set t.tail (T move) (T tail) then + let (Cons cons_r) = rev move in + let after = cons_r.suffix in + let new_head = Atomic.get t.head in + if new_head != H head then pop_as t on_empty backoff new_head + else if Atomic.compare_and_set t.head (H head) (H after) then begin + tail_r.move <- Used; + match on_empty with + | Return_None -> Some cons_r.value + | Raise_Empty -> cons_r.value + | Await -> cons_r.value + end + else + let backoff = Backoff.once backoff in + pop_as t on_empty backoff (Atomic.fenceless_get t.head) + else pop_as t on_empty backoff (Atomic.fenceless_get t.head) + | T (Tail tail_r as tail) -> begin + match tail_r.move with + | (Used | Wait _) as next -> begin + let new_head = Atomic.get t.head in + if new_head != H head then pop_as t on_empty backoff new_head + else + match on_empty with + | Return_None -> None + | Raise_Empty -> raise_notrace Empty + | Await -> await t on_empty backoff tail tail_r.counter next + end + | Snoc move_r as move -> begin + if head_r.counter < move_r.counter then + let (Cons cons_r) = rev move in + let after = cons_r.suffix in + let new_head = Atomic.get t.head in + if new_head != H head then pop_as t on_empty backoff new_head + else if Atomic.compare_and_set t.head (H head) (H after) then begin + tail_r.move <- Used; + match on_empty with + | Return_None -> Some cons_r.value + | Raise_Empty -> cons_r.value + | Await -> cons_r.value + end + else + let backoff = Backoff.once backoff in + pop_as t on_empty backoff (Atomic.fenceless_get t.head) + else + let new_head = Atomic.get t.head in + if new_head != H head then pop_as t on_empty backoff new_head + else + match on_empty with + | Return_None -> None + | Raise_Empty -> raise_notrace Empty + | Await -> await t on_empty backoff tail tail_r.counter Used + end + end + end + +and await : + type a r. + a t -> (a, r) on_empty -> _ -> (a, _) tdt -> int -> (a, _) tdt -> r = + fun t on_empty backoff tail counter next -> + let trigger = Trigger.create () in + let (Wait wait_r as move : (_, [ `Wait ]) tdt) = Wait { trigger; next } in + let after = Tail { counter; move } in + if Atomic.compare_and_set t.tail (T tail) (T after) then + match Trigger.await wait_r.trigger with + | None -> pop_as t on_empty Backoff.default (Atomic.fenceless_get t.head) + | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt (* TODO *) + else + let backoff = Backoff.once backoff in + pop_as t on_empty backoff (Atomic.fenceless_get t.head) + +let rec push_head t value backoff = + match Atomic.get t.head with + | H (Cons cons_r as suffix) -> + let after = Cons { counter = cons_r.counter - 1; value; suffix } in + if not (Atomic.compare_and_set t.head (H suffix) (H after)) then + push_head t value (Backoff.once backoff) + | H (Head head_r as head) -> begin + match Atomic.get t.tail with + | T (Snoc snoc_r as move) -> + if Atomic.get t.head != H head then push_head t value backoff + else if head_r.counter = snoc_r.counter then begin + let prefix = Snoc { snoc_r with value } in + let after = + Snoc { snoc_r with counter = snoc_r.counter + 1; prefix } + in + if not (Atomic.compare_and_set t.tail (T move) (T after)) then + push_head t value (Backoff.once backoff) + end + else + let tail = Tail { counter = snoc_r.counter; move } in + let backoff = + if Atomic.compare_and_set t.tail (T move) (T tail) then backoff + else Backoff.once backoff + in + push_head t value backoff + | T (Tail tail_r as prefix) -> begin + match tail_r.move with + | Used | Wait _ -> + if Atomic.get t.head == H head then begin + let tail = + Snoc { counter = tail_r.counter + 1; value; prefix } + in + if Atomic.compare_and_set t.tail (T prefix) (T tail) then + signal prefix + else push_head t value (Backoff.once backoff) + end + else push_head t value backoff + | Snoc move_r as move -> + begin + match Atomic.get t.head with + | H (Head head_r as head) when head_r.counter < move_r.counter + -> + let after = rev move in + if + Atomic.fenceless_get t.head == H head + && Atomic.compare_and_set t.head (H head) (H after) + then tail_r.move <- Used + | _ -> tail_r.move <- Used + end; + push_head t value backoff + end + end + +let rec length t = + let head = Atomic.get t.head in + let tail = Atomic.fenceless_get t.tail in + if head != Atomic.get t.head then length t + else + let head_at = + match head with H (Cons r) -> r.counter | H (Head r) -> r.counter + in + let tail_at = + match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter + in + tail_at - head_at + 1 + +let[@inline] is_empty t = length t == 0 + +let[@inline] pop_exn t = + pop_as t Raise_Empty Backoff.default (Atomic.fenceless_get t.head) + +let[@inline] pop_opt t = + pop_as t Return_None Backoff.default (Atomic.fenceless_get t.head) + +let[@inline] pop t = + pop_as t Await Backoff.default (Atomic.fenceless_get t.head) + +let[@inline] push t value = + push t value Backoff.default (Atomic.fenceless_get t.tail) + +let[@inline] push_head t value = push_head t value Backoff.default diff --git a/test/dune b/test/dune index 48b81eb4..09bc9be5 100644 --- a/test/dune +++ b/test/dune @@ -66,6 +66,17 @@ ;; ))) +(test + (package picos_meta) + (name test_sync_queue) + (modules test_sync_queue) + (libraries + picos_std.sync + qcheck-core + qcheck-multicoretests-util + qcheck-stm.stm + stm_run)) + ;; (test diff --git a/test/test_sync_queue.ml b/test/test_sync_queue.ml new file mode 100644 index 00000000..14116e61 --- /dev/null +++ b/test/test_sync_queue.ml @@ -0,0 +1,83 @@ +open QCheck +open STM +open Picos_std_sync + +let () = + let q = Queue.create () in + Queue.push q 101; + Queue.push q 42; + assert (Queue.pop_exn q = 101); + Queue.push q 76; + assert (Queue.pop_exn q = 42); + assert (Queue.pop_exn q = 76); + match Queue.pop_exn q with _ -> assert false | exception Queue.Empty -> () + +module Spec = struct + type cmd = Push of int | Push_head of int | Pop_opt | Length + + let show_cmd = function + | Push x -> "Push " ^ string_of_int x + | Push_head x -> "Push_head " ^ string_of_int x + | Pop_opt -> "Pop_opt" + | Length -> "Length" + + module State = struct + type t = int list * int list + + let push x (h, t) = if h == [] then ([ x ], []) else (h, x :: t) + let push_head x (h, t) = (x :: h, t) + let peek_opt (h, _) = match h with x :: _ -> Some x | [] -> None + + let drop ((h, t) as s) = + match h with [] -> s | [ _ ] -> (List.rev t, []) | _ :: h -> (h, t) + + let length (h, t) = List.length h + List.length t + end + + type state = State.t + type sut = int Queue.t + + let arb_cmd _s = + [ + Gen.int_range 1 1000 |> Gen.map (fun x -> Push x); + Gen.int_range 1 1000 |> Gen.map (fun x -> Push_head x); + Gen.return Pop_opt; + Gen.return Length; + ] + |> Gen.oneof |> make ~print:show_cmd + + let init_state = ([], []) + let init_sut () = Queue.create ~padded:true () + let cleanup _ = () + + let next_state c s = + match c with + | Push x -> State.push x s + | Push_head x -> State.push_head x s + | Pop_opt -> State.drop s + | Length -> s + + let precond _ _ = true + + let run c d = + match c with + | Push x -> Res (unit, Queue.push d x) + | Push_head x -> Res (unit, Queue.push_head d x) + | Pop_opt -> + Res + ( option int, + match Queue.pop_exn d with + | v -> Some v + | exception Queue.Empty -> None ) + | Length -> Res (int, Queue.length d) + + let postcond c (s : state) res = + match (c, res) with + | Push _x, Res ((Unit, _), ()) -> true + | Push_head _x, Res ((Unit, _), ()) -> true + | Pop_opt, Res ((Option Int, _), res) -> res = State.peek_opt s + | Length, Res ((Int, _), res) -> res = State.length s + | _, _ -> false +end + +let () = Stm_run.run ~name:"Picos_std_sync.Queue" (module Spec) |> exit