Skip to content

Commit

Permalink
Add more structured Run operations
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Nov 27, 2024
1 parent 6ccc15b commit b0ce1fa
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 0 deletions.
1 change: 1 addition & 0 deletions bench/bench_run.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let run_suite ~budgetf:_ = []
27 changes: 27 additions & 0 deletions bench/bench_run.ocaml5.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
open Multicore_bench
open Picos_std_structured
module Multififo = Picos_mux_multififo

let run_one_multififo ~budgetf ~n_domains ~n () =
let context = ref (Obj.magic ()) in

let before _ = context := Multififo.context () in
let init _ = !context in
let work i context =
if i <> 0 then Multififo.runner_on_this_thread context
else ignore @@ Multififo.run ~context @@ fun () -> Run.for_n n ignore
in

let config =
Printf.sprintf "%d mfifo%s, run_n %d" n_domains
(if n_domains = 1 then "" else "s")
n
in
Times.record ~budgetf ~n_domains ~before ~init ~work ()
|> Times.to_thruput_metrics ~n ~singular:"ignore" ~config

let run_suite ~budgetf =
Util.cross [ 1; 2; 4; 8 ] [ 1_000; 10_000; 100_000; 1_000_000; 10_000_000 ]
|> List.concat_map @@ fun (n_domains, n) ->
if Picos_domain.recommended_domain_count () < n_domains then []
else run_one_multififo ~budgetf ~n_domains ~n ()
6 changes: 6 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
(run %{test} -brief "Picos binaries")
(run %{test} -brief "Bounded_q with Picos_sync")
(run %{test} -brief "Memory usage")
(run %{test} -brief "Picos_std_structured.Run")
;;
))
(foreign_stubs
Expand All @@ -49,6 +50,11 @@
from
(picos_mux.fifo -> scheduler.ocaml5.ml)
(picos_mux.thread -> scheduler.ocaml4.ml))
(select
bench_run.ml
from
(picos_mux.multififo -> bench_run.ocaml5.ml)
(-> bench_run.ocaml4.ml))
(select
bench_fib.ml
from
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ let benchmarks =
("Picos binaries", Bench_binaries.run_suite);
("Bounded_q with Picos_sync", Bench_bounded_q.run_suite);
("Memory usage", Bench_memory.run_suite);
("Picos_std_structured.Run", Bench_run.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
7 changes: 7 additions & 0 deletions lib/picos_std.structured/picos_std_structured.mli
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,13 @@ module Run : sig
with Control.Terminate -> ()
]}
but treats the list of actions as a single computation. *)

val for_n : int -> (int -> unit) -> unit
(** [for_n n action], when [0 < n], calls [action i] for each integer [i] from
[0] to [n-1] using up to [n] fibers, including the current fiber. The
actual number of fibers used can be much less than [n] in case the calls
do not block, the calls return quickly, and/or the scheduler doesn't
provide parallelism. *)
end

(** {1 Examples}
Expand Down
81 changes: 81 additions & 0 deletions lib/picos_std.structured/run.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,84 @@ let run actions wrap =

let all actions = run actions wrap_all
let any actions = run actions wrap_any

(* *)

type per_fiber = { mutable lo : int; mutable hi : int }

type _ tdt =
| Empty : [> `Empty ] tdt
| Range : {
mutable _lo : int;
hi : int;
parent : [ `Empty | `Range ] tdt;
}
-> [> `Range ] tdt

external lo_as_atomic : [ `Range ] tdt -> int Atomic.t = "%identity"

let rec for_out (Range r as range : [ `Range ] tdt) per_fiber action =
let lo_before = Atomic.get (lo_as_atomic range) in
let n = r.hi - lo_before in
if 0 < n then begin
let lo_after = lo_before + (n asr 1) + 1 in
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
per_fiber.lo <- lo_before;
per_fiber.hi <- Int.min r.hi lo_after;
while per_fiber.lo < per_fiber.hi do
let i = per_fiber.lo in
per_fiber.lo <- i + 1;
action i
done;
for_out range per_fiber action
end
else begin
(* Contention, bail out... *)
match r.parent with
| Empty -> ()
| Range _ as range -> for_out range per_fiber action
end
end
else
match r.parent with
| Empty -> ()
| Range _ as range -> for_out range per_fiber action

let rec for_in bundle (Range r as range : [ `Range ] tdt) per_fiber action =
let lo_before = Atomic.get (lo_as_atomic range) in
let n = r.hi - lo_before in
if n <= 1 then for_out range per_fiber action
else
let lo_after = lo_before + (n asr 1) in
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
Bundle.fork bundle (fun () -> for_in_enter bundle range action);
let child = Range { _lo = lo_before; hi = lo_after; parent = range } in
for_in bundle child per_fiber action
end
else for_in bundle range per_fiber action

and for_in_enter bundle range action =
let per_fiber = { lo = 0; hi = 0 } in
let effc (type a) :
a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
| Fiber.Spawn _ | Fiber.Current | Computation.Cancel_after _ -> None
| _ ->
(* Might be blocking, so fork any remaining work to another fiber. *)
if per_fiber.lo < per_fiber.hi then begin
let range =
Range { _lo = per_fiber.lo; hi = per_fiber.hi; parent = Empty }
in
per_fiber.lo <- per_fiber.hi;
Bundle.fork bundle (fun () -> for_in_enter bundle range action)
end;
None
in
let handler = Effect.Deep.{ effc } in
Effect.Deep.try_with (for_in bundle range per_fiber) action handler

let for_n n action =
if 0 < n then
if n = 1 then action 0
else
let range = Range { _lo = 0; hi = n; parent = Empty } in
Bundle.join_after @@ fun bundle -> for_in_enter bundle range action
1 change: 1 addition & 0 deletions test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
(libraries
alcotest
picos
picos.domain
picos_aux.mpscq
picos_std.finally
picos_std.structured
Expand Down
17 changes: 17 additions & 0 deletions test/test_structured.ml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,22 @@ let test_race_any () =
(* This is non-deterministic and may need to changed if flaky *)
assert (Atomic.get winner = 1)

let test_for_n_basic () =
Test_scheduler.run ~max_domains:(Picos_domain.recommended_domain_count ())
@@ fun () ->
for n = 0 to 128 do
let bytes = Bytes.create n in
for i = 0 to n - 1 do
Bytes.set bytes i (Char.chr 0)
done;
Run.for_n n (fun i ->
if Random.bool () then Control.yield ();
Bytes.set bytes i (Char.chr (Char.code (Bytes.get bytes i) + 1)));
for i = 0 to n - 1 do
assert (Bytes.get bytes i = Char.chr 1)
done
done

let () =
[
( "Bundle",
Expand Down Expand Up @@ -286,6 +302,7 @@ let () =
Alcotest.test_case "any and all errors" `Quick test_any_and_all_errors;
Alcotest.test_case "any and all returns" `Quick test_any_and_all_returns;
Alcotest.test_case "race any" `Quick test_race_any;
Alcotest.test_case "for_n basic" `Quick test_for_n_basic;
] );
]
|> Alcotest.run "Picos_structured"

0 comments on commit b0ce1fa

Please sign in to comment.