Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak bundle representation #337

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions lib/picos_std.structured/bundle.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@ let[@inline never] completed () = invalid_arg "already completed"
type _ tdt =
| Nothing : [> `Nothing ] tdt
| Bundle : {
config : int Atomic.t;
mutable config : int;
bundle : Computation.packed;
errors : Control.Errors.t;
finished : Trigger.t;
}
-> [> `Bundle ] tdt

type t = [ `Bundle ] tdt

external config_as_atomic : t -> int Atomic.t = "%identity"

let config_terminated_bit = 0x01
and config_callstack_mask = 0x3E
and config_callstack_shift = 1
and config_one = 0x40 (* memory runs out before overflow *)

let flock_key : [ `Bundle | `Nothing ] tdt Fiber.FLS.t = Fiber.FLS.create ()

type t = [ `Bundle ] tdt

let terminate ?callstack (Bundle { bundle = Packed bundle; _ } : t) =
Computation.cancel bundle Control.Terminate
(Control.get_callstack_opt callstack)
Expand All @@ -36,8 +38,8 @@ let error ?callstack (Bundle r as t : t) exn bt =
Control.Errors.push r.errors exn bt
end

let decr (Bundle r : t) =
let n = Atomic.fetch_and_add r.config (-config_one) in
let decr (Bundle r as t : t) =
let n = Atomic.fetch_and_add (config_as_atomic t) (-config_one) in
if n < config_one * 2 then begin
let (Packed bundle) = r.bundle in
Computation.cancel bundle Control.Terminate Control.empty_bt;
Expand Down Expand Up @@ -72,9 +74,8 @@ let[@inline never] raised exn t fiber packed canceler outer =
await t fiber packed canceler outer;
Printexc.raise_with_backtrace exn bt

let[@inline never] returned value (Bundle r as t : t) fiber packed canceler
outer =
let config = Atomic.get r.config in
let[@inline never] returned value (t : t) fiber packed canceler outer =
let config = Atomic.get (config_as_atomic t) in
if config land config_terminated_bit <> 0 then begin
let callstack =
let n = (config land config_callstack_mask) lsr config_callstack_shift in
Expand Down Expand Up @@ -108,7 +109,7 @@ let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
Int.min n (config_callstack_mask lsr config_callstack_shift)
lsl config_callstack_shift
in
let config = Atomic.make (config_one lor callstack lor terminated) in
let config = config_one lor callstack lor terminated in
let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in
let errors = Control.Errors.create () in
let finished = Trigger.create () in
Expand All @@ -130,10 +131,12 @@ let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
in
join_after_realloc x fn t fiber packed canceler outer

let rec incr (Bundle r as t : t) backoff =
let before = Atomic.get r.config in
let rec incr (t : t) backoff =
let before = Atomic.get (config_as_atomic t) in
if before < config_one then completed ()
else if not (Atomic.compare_and_set r.config before (before + config_one))
else if
not
(Atomic.compare_and_set (config_as_atomic t) before (before + config_one))
then incr t (Backoff.once backoff)

let finish (Bundle { bundle = Packed bundle; _ } as t : t) canceler =
Expand Down Expand Up @@ -214,5 +217,5 @@ let join_after ?callstack ?on_return fn =
let fork t thunk = fork_pass t thunk Arg
let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg

let unsafe_incr (Bundle r : t) =
Atomic.fetch_and_add r.config config_one |> ignore
let unsafe_incr (t : t) =
Atomic.fetch_and_add (config_as_atomic t) config_one |> ignore