Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Be able to perform unhandled effects #11

Merged
merged 4 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions lib/miou.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
let failwith fmt = Format.kasprintf failwith fmt
let str fmt = Format.kasprintf Fun.id fmt

external apply : ('a -> 'b) -> 'a -> 'b = "%apply"
external reraise : exn -> unit = "%reraise"

module Queue = Queue
Expand Down Expand Up @@ -331,16 +332,15 @@ and pool = {

and continue = Continue : Syscall_uid.t * (unit -> unit) -> continue
and events = { select: unit -> continue list; interrupt: unit -> unit }
and effect = Effect : 'a Effect.t -> effect
and handler = { handler: 'u 'v. ('u -> 'v) -> 'u -> 'v } [@@unboxed]
and uid = Syscall_uid.t

type _ Effect.t += Syscall : (unit -> 'a) -> 'a syscall Effect.t
type _ Effect.t += Suspend : 'a syscall -> 'a Effect.t
type _ Effect.t += Syscall_exists : Syscall_uid.t -> bool Effect.t

let dummy_events = { select= Fun.const []; interrupt= ignore }

exception Invalid_effect of effect
let dummy_handler = { handler= apply }

module Domain = struct
module Uid = Domain_uid
Expand Down Expand Up @@ -769,9 +769,7 @@ module Domain = struct
else k (State.Fail Not_a_child)
| Suspend _ -> k State.Intr
| Ownership action -> ownership current k action
| effect ->
Logs.err (fun m -> m "Unhandled effect");
k (State.Fail (Invalid_effect (Effect effect)))
| effect -> k (State.None effect)
in
{ State.perform }

Expand Down Expand Up @@ -857,6 +855,11 @@ module Domain = struct
| State.Suspended _ as state ->
let state = invariant prm state in
add_task domain (Suspended (prm, state))
| State.Unhandled _ as state ->
Logs.debug (fun m ->
m "%a suspended due to unhandled effect" Promise.pp prm);
let state = invariant prm state in
add_task domain (Suspended (prm, state))

let transfer_system_task pool domain (Continue (uid, fn0)) =
match Hashtbl.find domain.system_tasks uid with
Expand Down Expand Up @@ -951,7 +954,7 @@ module Domain = struct
false
with Yes -> true

let run pool domain =
let run pool domain () =
match Heapq.pop_minimum domain.tasks with
| exception Heapq.Empty ->
if system_tasks_suspended domain then
Expand Down Expand Up @@ -995,7 +998,7 @@ module Pool = struct
goes back to sleep until the next signal. Domains can communicate with
[dom0] (the launcher) by signaling that all domains are dormant ([idle]).
Finally, [dom0] can communicate with the domains asking to stop. *)
let worker pool domain =
let worker pool domain _ =
let exception Exit in
try
while true do
Expand All @@ -1007,7 +1010,7 @@ module Pool = struct
transfer_all_tasks pool domain;
pool.working_counter <- pool.working_counter + 1;
Mutex.unlock pool.mutex;
Domain.run pool domain;
Domain.run pool domain ();
Mutex.lock pool.mutex;
pool.working_counter <- pool.working_counter - 1;
if (not pool.stop) && Int.equal pool.working_counter 0 then
Expand Down Expand Up @@ -1055,6 +1058,7 @@ module Pool = struct
wait pool

let make ?(quanta = 2) ?(g = Random.State.make_self_init ())
?(handler = dummy_handler)
?(domains = max 0 (Stdlib.Domain.recommended_domain_count () - 1)) events
=
let domains = List.init domains @@ fun _ -> Domain.make ~quanta ~g events in
Expand All @@ -1070,7 +1074,11 @@ module Pool = struct
; domains
}
in
let spawn domain = Stdlib.Domain.spawn @@ fun () -> worker pool domain in
(* NOTE(dinosaure): we apply the user's handler here but we probably use it
when we call [Domain.run] as [dom0] does. *)
let spawn domain =
Stdlib.Domain.spawn (handler.handler (worker pool domain))
in
(pool, List.map spawn domains)
end

Expand Down Expand Up @@ -1206,14 +1214,15 @@ let await_only_domains dom0 =
else true

let run ?(quanta = quanta) ?(events = Fun.const dummy_events)
?(g = Random.State.make_self_init ()) ?domains fn =
?(g = Random.State.make_self_init ()) ?domains ?(handler = dummy_handler) fn
=
Domain.Uid.reset ();
let dom0 = Domain.make ~quanta ~g events in
let prm0 = Promise.make ~resources:[] ~runner:dom0.uid () in
Domain.add_task dom0 (Arrived (prm0, fn));
let pool, domains = Pool.make ~quanta ~g ?domains events in
let pool, domains = Pool.make ~quanta ~g ?domains ~handler events in
while Promise.is_pending prm0 do
Domain.run pool dom0;
handler.handler (Domain.run pool dom0) ();
if await_only_domains dom0 && Domain.system_tasks_suspended dom0 = false
then Pool.wait pool
done;
Expand Down
46 changes: 46 additions & 0 deletions lib/miou.mli
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,56 @@ val cancel : 'a t -> unit
which should not affect the opportunity for other concurrent tasks to run.
*)

type handler = { handler: 'a 'b. ('a -> 'b) -> 'a -> 'b } [@@unboxed]
(** {2 Composition.}
It is possible to compose Miou with a library that also generates effects.
The user can compose in 2 ways:
- simply apply his/her effect manager with its function in Miou
[Miou.call{,_cc} @@ fun () -> handler fn ()]
- inform Miou of an effect handler that should comply with the {b "quanta"
rule}
Remember that Miou suspends a task as soon as it emits an effect. The second
case can be interesting in order to always ensure the availability of the
application regardless effect handlers. Here's a basic example of how to
compose.
{[
type _ Effect.t += Foo : unit Effect.t
let handler fn v =
let open Effect.Deep in
let retc = Fun.id and exnc = raise in
let effc : type c. c Effect.t -> ((c, 'a) continuation -> 'b) option =
function Foo -> Some (fun k -> continue k ())
| _ -> None in
match_with fn v { retc; exnc; effc; }
let () = Miou.run ~handler:{ Miou.handler } @@ fun () ->
let prm = Miou.call @@ fun () -> Effect.perform Foo in
Miou.await_exn prm
]}
The user can also compose several effects managers:
{[
# let compose { Miou.handler= a } { Miou.handler= b } =
{ Miou.handler= fun fn v -> (a (b fn)) v } ;;
val compose : Miou.handler -> Miou.handler -> Miou.handler = <fun>
]}
{b NOTE}: We want to reiterate that such a composition implies that the
effect will not be executed {i immediately}: the task will be suspended and
the effect will be produced only as soon as the said task has its execution
slot.
*)

val run :
?quanta:int
-> ?events:(Domain.Uid.t -> events)
-> ?g:Random.State.t
-> ?domains:int
-> ?handler:handler
-> (unit -> 'a)
-> 'a
23 changes: 20 additions & 3 deletions lib/state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ type ('a, 'b) continuation = ('a, 'b) Effect.Shallow.continuation
type 'a t =
| Finished of ('a, exn) result
| Suspended : ('a, 'b) continuation * 'a Effect.t -> 'b t
| Unhandled : ('a, 'b) continuation * 'a -> 'b t

let effc eff k = Suspended (k, eff)

Expand Down Expand Up @@ -36,6 +37,9 @@ let discontinue_with : ('c, 'a) continuation -> exn -> 'a t =
let suspended_with : ('c, 'a) continuation -> 'c Effect.t -> 'a t =
fun k e -> Suspended (k, e)

let unhandled_with : ('c, 'a) continuation -> 'c -> 'a t =
fun k v -> Unhandled (k, v)

let pure res = Finished res

let make k v =
Expand All @@ -47,6 +51,7 @@ type 'a step =
| Fail of exn
| Intr
| Cont : 'a Effect.t -> 'a step
| None : 'a Effect.t -> 'a step
| Yield : unit step

type ('a, 'b) k = ('a step -> 'b t) -> 'a Effect.t -> 'b t
Expand All @@ -55,13 +60,17 @@ type perform = { perform: 'a 'b. ('a, 'b) k } [@@unboxed]
let once : type a. perform:perform -> a t -> a t =
fun ~perform -> function
| Finished _ as finished -> finished
| Unhandled (fn, v) -> continue_with fn v
| Suspended (fn, e) as state ->
let k : type c. (c, a) continuation -> c step -> a t =
fun fn -> function
| Send v -> continue_with fn v
| Fail exn -> discontinue_with fn exn
| Intr -> state
| Cont e -> suspended_with fn e
| None eff ->
let v = Effect.perform eff in
unhandled_with fn v
| Yield -> continue_with fn ()
in
perform.perform (k fn) e
Expand All @@ -80,15 +89,22 @@ let run : type a. quanta:int -> perform:perform -> a t -> a t =
| Send v -> continue_with fn v
| Fail e -> discontinue_with fn e
| Cont e -> suspended_with fn e
| None e ->
let v = Effect.perform e in
unhandled_with fn v
| Intr -> raise_notrace Break
| Yield -> raise_notrace (Yield (continue_with fn ()))
in
let quanta = ref quanta and state = ref state in
try
while !quanta > 0 && is_finished !state = false do
let (Suspended (fn, e)) = !state in
state := perform.perform (k fn) e;
quanta := !quanta - 1
match !state with
| Suspended (fn, e) ->
state := perform.perform (k fn) e;
quanta := !quanta - 1
| Unhandled (fn, v) ->
state := continue_with fn v;
quanta := !quanta - 1
done;
!state
with
Expand All @@ -102,3 +118,4 @@ let run : type a. quanta:int -> perform:perform -> a t -> a t =
let fail ~exn = function
| Finished _ -> Finished (Error exn)
| Suspended (k, _) -> discontinue_with k exn
| Unhandled (k, _) -> discontinue_with k exn
2 changes: 2 additions & 0 deletions lib/state.mli
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type ('a, 'b) continuation
type 'a t = private
| Finished of ('a, exn) result
| Suspended : ('a, 'b) continuation * 'a Effect.t -> 'b t
| Unhandled : ('a, 'b) continuation * 'a -> 'b t

val make : ('a -> 'b) -> 'a -> 'b t
(** [make fn value] makes a new {i function state} by executing the function
Expand All @@ -99,6 +100,7 @@ type 'a step =
| Fail of exn
| Intr
| Cont : 'a Effect.t -> 'a step
| None : 'a Effect.t -> 'a step
| Yield : unit step

type ('a, 'b) k = ('a step -> 'b t) -> 'a Effect.t -> 'b t
Expand Down
1 change: 1 addition & 0 deletions test/core/core.t
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@
Fatal error: exception Miou.Still_has_children
[2]
$ ./t25.exe
$ ./t28.exe
8 changes: 7 additions & 1 deletion test/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
(modules t27)
(libraries miou))

(executable
(name t28)
(modules t28)
(libraries miou))

(cram
(package miou)
(deps
Expand Down Expand Up @@ -155,4 +160,5 @@
t22.exe
t23.exe
t24.exe
t25.exe))
t25.exe
t28.exe))
50 changes: 50 additions & 0 deletions test/core/t28.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
type _ Effect.t += Foo : unit Effect.t

let prgm () =
let prm = Miou.call @@ fun () -> Effect.perform Foo in
Miou.await_exn prm

let handler_foo fn v =
let open Effect.Deep in
let retc = Fun.id in
let exnc = raise in
let effc : type c. c Effect.t -> ((c, 'a) continuation -> 'b) option =
function
| Foo -> Some (fun k -> continue k ())
| _ -> None
in
match_with fn v { retc; exnc; effc }

let () =
Miou.run ~handler:{ Miou.handler= handler_foo } @@ fun () ->
let prm = Miou.call @@ fun () -> Effect.perform Foo in
Miou.await_exn prm

let () =
Miou.run ~handler:{ Miou.handler= handler_foo } @@ fun () ->
let prm = Miou.call_cc @@ fun () -> Effect.perform Foo in
Miou.await_exn prm

type _ Effect.t += Bar : unit Effect.t

let handler_bar fn v =
let open Effect.Deep in
let retc = Fun.id in
let exnc = raise in
let effc : type c. c Effect.t -> ((c, 'a) continuation -> 'b) option =
function
| Bar -> Some (fun k -> continue k ())
| _ -> None
in
match_with fn v { retc; exnc; effc }

let ( <.> ) { Miou.handler= foo } { Miou.handler= bar } =
{ Miou.handler= (fun fn v -> (foo (bar fn)) v) }

let () =
let foo = { Miou.handler= handler_foo } in
let bar = { Miou.handler= handler_bar } in
Miou.run ~handler:(foo <.> bar) @@ fun () ->
let prm0 = Miou.call @@ fun () -> Effect.perform Foo in
let prm1 = Miou.call @@ fun () -> Effect.perform Bar in
Miou.await_exn prm0; Miou.await_exn prm1