Skip to content

Commit

Permalink
Add more structured Run operations
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Nov 28, 2024
1 parent 6ccc15b commit 4a0ce63
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 36 deletions.
1 change: 1 addition & 0 deletions bench/bench_run.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let run_suite ~budgetf:_ = []
28 changes: 28 additions & 0 deletions bench/bench_run.ocaml5.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
open Multicore_bench
open Picos_std_structured
module Multififo = Picos_mux_multififo

let run_one_multififo ~budgetf ~n_domains ~n () =
let context = ref (Obj.magic ()) in

let before _ = context := Multififo.context () in
let init _ = !context in
let work i context =
if i <> 0 then Multififo.runner_on_this_thread context
else ignore @@ Multififo.run ~context @@ fun () -> Run.for_n n ignore
in

let config =
Printf.sprintf "%d mfifo%s, run_n %d" n_domains
(if n_domains = 1 then "" else "s")
n
in
Times.record ~budgetf ~n_domains ~before ~init ~work ()
|> Times.to_thruput_metrics ~n ~singular:"ignore" ~config

let run_suite ~budgetf =
Util.cross [ 1; 2; 4; 8 ]
[ 100; 1_000; 10_000; 100_000; 1_000_000; 10_000_000 ]
|> List.concat_map @@ fun (n_domains, n) ->
if Picos_domain.recommended_domain_count () < n_domains then []
else run_one_multififo ~budgetf ~n_domains ~n ()
6 changes: 6 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
(run %{test} -brief "Picos binaries")
(run %{test} -brief "Bounded_q with Picos_sync")
(run %{test} -brief "Memory usage")
(run %{test} -brief "Picos_std_structured.Run")
;;
))
(foreign_stubs
Expand All @@ -49,6 +50,11 @@
from
(picos_mux.fifo -> scheduler.ocaml5.ml)
(picos_mux.thread -> scheduler.ocaml4.ml))
(select
bench_run.ml
from
(picos_mux.multififo -> bench_run.ocaml5.ml)
(-> bench_run.ocaml4.ml))
(select
bench_fib.ml
from
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ let benchmarks =
("Picos binaries", Bench_binaries.run_suite);
("Bounded_q with Picos_sync", Bench_bounded_q.run_suite);
("Memory usage", Bench_memory.run_suite);
("Picos_std_structured.Run", Bench_run.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
16 changes: 9 additions & 7 deletions lib/picos_std.structured/control.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ module Errors = struct
| [ (exn, bt) ] -> Printexc.raise_with_backtrace exn bt
| exn_bts -> check exn_bts []

let rec push t exn bt backoff =
let before = Atomic.get t in
let after = (exn, bt) :: before in
if not (Atomic.compare_and_set t before after) then
push t exn bt (Backoff.once backoff)

let push t exn bt = push t exn bt Backoff.default
let push t exn bt =
let backoff = ref Backoff.default in
while
let before = Atomic.get t in
let after = (exn, bt) :: before in
not (Atomic.compare_and_set t before after)
do
backoff := Backoff.once !backoff
done
end

let raise_if_canceled () = Fiber.check (Fiber.current ())
Expand Down
12 changes: 12 additions & 0 deletions lib/picos_std.structured/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
(rule
(enabled_if
(<= 5.0.0 %{ocaml_version}))
(action
(copy for.ocaml5.ml for.ml)))

(rule
(enabled_if
(< %{ocaml_version} 5.0.0))
(action
(copy for.ocaml4.ml for.ml)))

(library
(name picos_std_structured)
(public_name picos_std.structured)
Expand Down
48 changes: 48 additions & 0 deletions lib/picos_std.structured/for.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
type _ tdt =
| Empty : [> `Empty ] tdt
| Range : {
mutable lo : int;
hi : int;
parent : [ `Empty | `Range ] tdt;
}
-> [> `Range ] tdt

let[@poll error] cas_lo (Range r : [ `Range ] tdt) before after =
r.lo == before
&& begin
r.lo <- after;
true
end

let rec for_out (Range r as range : [ `Range ] tdt) action =
let lo_before = r.lo in
let n = r.hi - lo_before in
if 0 < n then begin
let lo_after = lo_before + 1 in
if cas_lo range lo_before lo_after then begin
try action lo_before with Control.Terminate -> ()
end;
for_out range action
end
else
match r.parent with Empty -> () | Range _ as range -> for_out range action

let rec for_in bundle (Range r as range : [ `Range ] tdt) action =
let lo_before = r.lo in
let n = r.hi - lo_before in
if n <= 1 then for_out range action
else
let lo_after = lo_before + (n asr 1) in
if cas_lo range lo_before lo_after then begin
Bundle.fork bundle (fun () -> for_in bundle range action);
let child = Range { lo = lo_before; hi = lo_after; parent = range } in
for_in bundle child action
end
else for_in bundle range action

let for_n n action =
if 0 < n then
if n = 1 then try action 0 with Control.Terminate -> ()
else
let range = Range { lo = 0; hi = n; parent = Empty } in
Bundle.join_after @@ fun bundle -> for_in bundle range action
78 changes: 78 additions & 0 deletions lib/picos_std.structured/for.ocaml5.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
open Picos

type per_fiber = { mutable lo : int; mutable hi : int }

type _ tdt =
| Empty : [> `Empty ] tdt
| Range : {
mutable _lo : int;
hi : int;
parent : [ `Empty | `Range ] tdt;
}
-> [> `Range ] tdt

external lo_as_atomic : [ `Range ] tdt -> int Atomic.t = "%identity"

let rec for_out t (Range r as range : [ `Range ] tdt) per_fiber action =
let lo_before = Atomic.get (lo_as_atomic range) in
let n = r.hi - lo_before in
if 0 < n then begin
let lo_after = lo_before + 1 + (n asr 1) in
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
per_fiber.lo <- lo_before;
per_fiber.hi <- lo_after;
while per_fiber.lo < per_fiber.hi do
try
while per_fiber.lo < per_fiber.hi do
let i = per_fiber.lo in
per_fiber.lo <- i + 1;
action i
done
with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ())
done
end;
for_out t range per_fiber action
end
else
match r.parent with
| Empty -> ()
| Range _ as range -> for_out t range per_fiber action

let rec for_in t (Range r as range : [ `Range ] tdt) per_fiber action =
let lo_before = Atomic.get (lo_as_atomic range) in
let n = r.hi - lo_before in
if n <= 1 then for_out t range per_fiber action
else
let lo_after = lo_before + (n asr 1) in
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
Bundle.fork t (fun () -> for_in_enter t range action);
let child = Range { _lo = lo_before; hi = lo_after; parent = range } in
for_in t child per_fiber action
end
else for_in t range per_fiber action

and for_in_enter bundle range action =
let per_fiber = { lo = 0; hi = 0 } in
let effc (type a) :
a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
| Fiber.Spawn _ | Fiber.Current | Computation.Cancel_after _ -> None
| _ ->
(* Might be blocking, so fork any remaining work to another fiber. *)
if per_fiber.lo < per_fiber.hi then begin
let range =
Range { _lo = per_fiber.lo; hi = per_fiber.hi; parent = Empty }
in
per_fiber.lo <- per_fiber.hi;
Bundle.fork bundle (fun () -> for_in_enter bundle range action)
end;
None
in
let handler = Effect.Deep.{ effc } in
Effect.Deep.try_with (for_in bundle range per_fiber) action handler

let for_n n action =
if 0 < n then
if n = 1 then try action 0 with Control.Terminate -> ()
else
let range = Range { _lo = 0; hi = n; parent = Empty } in
Bundle.join_after @@ fun t -> for_in_enter t range action
80 changes: 51 additions & 29 deletions lib/picos_std.structured/picos_std_structured.mli
Original file line number Diff line number Diff line change
Expand Up @@ -349,43 +349,41 @@ module Flock : sig
end

module Run : sig
(** Operations for running fibers in specific patterns. *)
(** Operations for running actions concurrently.
⚠️ In general, when an action expected to return the unit value [()],
started by an operation in this module raises an unhandled exception,
other than {{!Control.Terminate} [Terminate]}, which is not counted as an
error, the whole operation will be canceled and the exception will be
raised.
⚠️ The operations in this module run their actions such that any action may
block to await without preventing other actions from being run. At the
limit every action may need to be run in a distinct fiber. However, it is
not guaranteed that every action always runs in a distinct fiber. The
actual number of fibers used can be much less than the number of actions
executed in case the actions do not block, complete quickly, and/or the
scheduler doesn't provide parallelism.
⚠️ The operations in this module do not guaranteed that any of the actions
are executed. In particular, after any action raises an unhandled
exception or after the main fiber is canceled, the actions that have not
yet started may be skipped entirely. *)

val all : (unit -> unit) list -> unit
(** [all actions] starts the actions as separate fibers and waits until they
all complete or one of them raises an unhandled exception other than
{{!Control.Terminate} [Terminate]}, which is not counted as an error,
after which the remaining fibers will be canceled.
(** [all actions] starts the actions and waits until they all complete.
⚠️ One of the actions may be run on the current fiber.
⚠️ It is not guaranteed that any of the actions in the list are called. In
particular, after any action raises an unhandled exception or after the
main fiber is canceled, the actions that have not yet started may be
skipped entirely.
[all] is roughly equivalent to
[all] is roughly equivalent to:
{[
let all actions =
Bundle.join_after @@ fun bundle ->
List.iter (Bundle.fork bundle) actions
]}
but treats the list of actions as a single computation. *)
]} *)

val any : (unit -> unit) list -> unit
(** [any actions] starts the actions as separate fibers and waits until one of
them completes or raises an unhandled exception other than
{{!Control.Terminate} [Terminate]}, which is not counted as an error,
after which the rest of the started fibers will be canceled.
⚠️ One of the actions may be run on the current fiber.
(** [any actions] starts the actions and waits until one of them completes.
⚠️ It is not guaranteed that any of the actions in the list are called. In
particular, after the first action returns successfully or after any
action raises an unhandled exception or after the main fiber is canceled,
the actions that have not yet started may be skipped entirely.
[any] is roughly equivalent to
[any] is roughly equivalent to:
{[
let any actions =
Bundle.join_after @@ fun bundle ->
Expand All @@ -396,8 +394,32 @@ module Run : sig
action ();
Bundle.terminate bundle
with Control.Terminate -> ()
]}
but treats the list of actions as a single computation. *)
]} *)

val for_n : int -> (int -> unit) -> unit
(** [for_n n action], when [0 < n], starts [action i] for each integer [i] from
[0] to [n-1] and waits until they all complete.
[for_n] is roughly equivalent to:
{[
let for_n n action =
Bundle.join_after @@ fun bundle ->
for i=0 to n-1 do
Bundle.fork bundle @@ fun () ->
action i
done
]} *)

module Array : sig
(** Concurrent operations over arrays. *)

type 'a t = 'a array
(** Type alias for [array]. *)

val iter : ('a -> unit) -> 'a t -> unit
(** [iter action array] starts [action array.(i)] for each index of the
[array] and waits until they all complete. *)
end
end

(** {1 Examples}
Expand Down
11 changes: 11 additions & 0 deletions lib/picos_std.structured/run.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,14 @@ let run actions wrap =

let all actions = run actions wrap_all
let any actions = run actions wrap_any

(* *)

let for_n = For.for_n

module Array = struct
type 'a t = 'a array

let iter action xs =
for_n (Array.length xs) @@ fun i -> action (Array.unsafe_get xs i)
end
1 change: 1 addition & 0 deletions test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
(libraries
alcotest
picos
picos.domain
picos_aux.mpscq
picos_std.finally
picos_std.structured
Expand Down
17 changes: 17 additions & 0 deletions test/test_structured.ml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,22 @@ let test_race_any () =
(* This is non-deterministic and may need to changed if flaky *)
assert (Atomic.get winner = 1)

let test_for_n_basic () =
Test_scheduler.run ~max_domains:(Picos_domain.recommended_domain_count ())
@@ fun () ->
for n = 0 to 128 do
let bytes = Bytes.create n in
for i = 0 to n - 1 do
Bytes.set bytes i (Char.chr 0)
done;
Run.for_n n (fun i ->
if Random.bool () then Control.yield ();
Bytes.set bytes i (Char.chr (Char.code (Bytes.get bytes i) + 1)));
for i = 0 to n - 1 do
assert (Bytes.get bytes i = Char.chr 1)
done
done

let () =
[
( "Bundle",
Expand Down Expand Up @@ -286,6 +302,7 @@ let () =
Alcotest.test_case "any and all errors" `Quick test_any_and_all_errors;
Alcotest.test_case "any and all returns" `Quick test_any_and_all_returns;
Alcotest.test_case "race any" `Quick test_race_any;
Alcotest.test_case "for_n basic" `Quick test_for_n_basic;
] );
]
|> Alcotest.run "Picos_structured"

0 comments on commit 4a0ce63

Please sign in to comment.