Skip to content

Commit

Permalink
Merge pull request #11 from robur-coop/handle-other-effects
Browse files Browse the repository at this point in the history
Be able to perform unhandled effects
  • Loading branch information
dinosaure authored Sep 19, 2023
2 parents 2bf366a + 403b3ab commit 664e8e2
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 17 deletions.
35 changes: 22 additions & 13 deletions lib/miou.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
let failwith fmt = Format.kasprintf failwith fmt
let str fmt = Format.kasprintf Fun.id fmt

external apply : ('a -> 'b) -> 'a -> 'b = "%apply"
external reraise : exn -> unit = "%reraise"

module Queue = Queue
Expand Down Expand Up @@ -331,16 +332,15 @@ and pool = {

and continue = Continue : Syscall_uid.t * (unit -> unit) -> continue
and events = { select: unit -> continue list; interrupt: unit -> unit }
and effect = Effect : 'a Effect.t -> effect
and handler = { handler: 'u 'v. ('u -> 'v) -> 'u -> 'v } [@@unboxed]
and uid = Syscall_uid.t

type _ Effect.t += Syscall : (unit -> 'a) -> 'a syscall Effect.t
type _ Effect.t += Suspend : 'a syscall -> 'a Effect.t
type _ Effect.t += Syscall_exists : Syscall_uid.t -> bool Effect.t

let dummy_events = { select= Fun.const []; interrupt= ignore }

exception Invalid_effect of effect
let dummy_handler = { handler= apply }

module Domain = struct
module Uid = Domain_uid
Expand Down Expand Up @@ -769,9 +769,7 @@ module Domain = struct
else k (State.Fail Not_a_child)
| Suspend _ -> k State.Intr
| Ownership action -> ownership current k action
| effect ->
Logs.err (fun m -> m "Unhandled effect");
k (State.Fail (Invalid_effect (Effect effect)))
| effect -> k (State.None effect)
in
{ State.perform }

Expand Down Expand Up @@ -857,6 +855,11 @@ module Domain = struct
| State.Suspended _ as state ->
let state = invariant prm state in
add_task domain (Suspended (prm, state))
| State.Unhandled _ as state ->
Logs.debug (fun m ->
m "%a suspended due to unhandled effect" Promise.pp prm);
let state = invariant prm state in
add_task domain (Suspended (prm, state))

let transfer_system_task pool domain (Continue (uid, fn0)) =
match Hashtbl.find domain.system_tasks uid with
Expand Down Expand Up @@ -951,7 +954,7 @@ module Domain = struct
false
with Yes -> true

let run pool domain =
let run pool domain () =
match Heapq.pop_minimum domain.tasks with
| exception Heapq.Empty ->
if system_tasks_suspended domain then
Expand Down Expand Up @@ -995,7 +998,7 @@ module Pool = struct
goes back to sleep until the next signal. Domains can communicate with
[dom0] (the launcher) by signaling that all domains are dormant ([idle]).
Finally, [dom0] can communicate with the domains asking to stop. *)
let worker pool domain =
let worker pool domain _ =
let exception Exit in
try
while true do
Expand All @@ -1007,7 +1010,7 @@ module Pool = struct
transfer_all_tasks pool domain;
pool.working_counter <- pool.working_counter + 1;
Mutex.unlock pool.mutex;
Domain.run pool domain;
Domain.run pool domain ();
Mutex.lock pool.mutex;
pool.working_counter <- pool.working_counter - 1;
if (not pool.stop) && Int.equal pool.working_counter 0 then
Expand Down Expand Up @@ -1055,6 +1058,7 @@ module Pool = struct
wait pool

let make ?(quanta = 2) ?(g = Random.State.make_self_init ())
?(handler = dummy_handler)
?(domains = max 0 (Stdlib.Domain.recommended_domain_count () - 1)) events
=
let domains = List.init domains @@ fun _ -> Domain.make ~quanta ~g events in
Expand All @@ -1070,7 +1074,11 @@ module Pool = struct
; domains
}
in
let spawn domain = Stdlib.Domain.spawn @@ fun () -> worker pool domain in
(* NOTE(dinosaure): we apply the user's handler here but we probably use it
when we call [Domain.run] as [dom0] does. *)
let spawn domain =
Stdlib.Domain.spawn (handler.handler (worker pool domain))
in
(pool, List.map spawn domains)
end

Expand Down Expand Up @@ -1206,14 +1214,15 @@ let await_only_domains dom0 =
else true

let run ?(quanta = quanta) ?(events = Fun.const dummy_events)
?(g = Random.State.make_self_init ()) ?domains fn =
?(g = Random.State.make_self_init ()) ?domains ?(handler = dummy_handler) fn
=
Domain.Uid.reset ();
let dom0 = Domain.make ~quanta ~g events in
let prm0 = Promise.make ~resources:[] ~runner:dom0.uid () in
Domain.add_task dom0 (Arrived (prm0, fn));
let pool, domains = Pool.make ~quanta ~g ?domains events in
let pool, domains = Pool.make ~quanta ~g ?domains ~handler events in
while Promise.is_pending prm0 do
Domain.run pool dom0;
handler.handler (Domain.run pool dom0) ();
if await_only_domains dom0 && Domain.system_tasks_suspended dom0 = false
then Pool.wait pool
done;
Expand Down
46 changes: 46 additions & 0 deletions lib/miou.mli
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,56 @@ val cancel : 'a t -> unit
which should not affect the opportunity for other concurrent tasks to run.
*)

type handler = { handler: 'a 'b. ('a -> 'b) -> 'a -> 'b } [@@unboxed]
(** {2 Composition.}
It is possible to compose Miou with a library that also generates effects.
The user can compose in 2 ways:
- simply apply his/her effect manager with its function in Miou
[Miou.call{,_cc} @@ fun () -> handler fn ()]
- inform Miou of an effect handler that should comply with the {b "quanta"
rule}
Remember that Miou suspends a task as soon as it emits an effect. The second
case can be interesting in order to always ensure the availability of the
application regardless effect handlers. Here's a basic example of how to
compose.
{[
type _ Effect.t += Foo : unit Effect.t
let handler fn v =
let open Effect.Deep in
let retc = Fun.id and exnc = raise in
let effc : type c. c Effect.t -> ((c, 'a) continuation -> 'b) option =
function Foo -> Some (fun k -> continue k ())
| _ -> None in
match_with fn v { retc; exnc; effc; }
let () = Miou.run ~handler:{ Miou.handler } @@ fun () ->
let prm = Miou.call @@ fun () -> Effect.perform Foo in
Miou.await_exn prm
]}
The user can also compose several effects managers:
{[
# let compose { Miou.handler= a } { Miou.handler= b } =
{ Miou.handler= fun fn v -> (a (b fn)) v } ;;
val compose : Miou.handler -> Miou.handler -> Miou.handler = <fun>
]}
{b NOTE}: We want to reiterate that such a composition implies that the
effect will not be executed {i immediately}: the task will be suspended and
the effect will be produced only as soon as the said task has its execution
slot.
*)

val run :
?quanta:int
-> ?events:(Domain.Uid.t -> events)
-> ?g:Random.State.t
-> ?domains:int
-> ?handler:handler
-> (unit -> 'a)
-> 'a
23 changes: 20 additions & 3 deletions lib/state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ type ('a, 'b) continuation = ('a, 'b) Effect.Shallow.continuation
type 'a t =
| Finished of ('a, exn) result
| Suspended : ('a, 'b) continuation * 'a Effect.t -> 'b t
| Unhandled : ('a, 'b) continuation * 'a -> 'b t

let effc eff k = Suspended (k, eff)

Expand Down Expand Up @@ -36,6 +37,9 @@ let discontinue_with : ('c, 'a) continuation -> exn -> 'a t =
let suspended_with : ('c, 'a) continuation -> 'c Effect.t -> 'a t =
fun k e -> Suspended (k, e)

let unhandled_with : ('c, 'a) continuation -> 'c -> 'a t =
fun k v -> Unhandled (k, v)

let pure res = Finished res

let make k v =
Expand All @@ -47,6 +51,7 @@ type 'a step =
| Fail of exn
| Intr
| Cont : 'a Effect.t -> 'a step
| None : 'a Effect.t -> 'a step
| Yield : unit step

type ('a, 'b) k = ('a step -> 'b t) -> 'a Effect.t -> 'b t
Expand All @@ -55,13 +60,17 @@ type perform = { perform: 'a 'b. ('a, 'b) k } [@@unboxed]
let once : type a. perform:perform -> a t -> a t =
fun ~perform -> function
| Finished _ as finished -> finished
| Unhandled (fn, v) -> continue_with fn v
| Suspended (fn, e) as state ->
let k : type c. (c, a) continuation -> c step -> a t =
fun fn -> function
| Send v -> continue_with fn v
| Fail exn -> discontinue_with fn exn
| Intr -> state
| Cont e -> suspended_with fn e
| None eff ->
let v = Effect.perform eff in
unhandled_with fn v
| Yield -> continue_with fn ()
in
perform.perform (k fn) e
Expand All @@ -80,15 +89,22 @@ let run : type a. quanta:int -> perform:perform -> a t -> a t =
| Send v -> continue_with fn v
| Fail e -> discontinue_with fn e
| Cont e -> suspended_with fn e
| None e ->
let v = Effect.perform e in
unhandled_with fn v
| Intr -> raise_notrace Break
| Yield -> raise_notrace (Yield (continue_with fn ()))
in
let quanta = ref quanta and state = ref state in
try
while !quanta > 0 && is_finished !state = false do
let (Suspended (fn, e)) = !state in
state := perform.perform (k fn) e;
quanta := !quanta - 1
match !state with
| Suspended (fn, e) ->
state := perform.perform (k fn) e;
quanta := !quanta - 1
| Unhandled (fn, v) ->
state := continue_with fn v;
quanta := !quanta - 1
done;
!state
with
Expand All @@ -102,3 +118,4 @@ let run : type a. quanta:int -> perform:perform -> a t -> a t =
let fail ~exn = function
| Finished _ -> Finished (Error exn)
| Suspended (k, _) -> discontinue_with k exn
| Unhandled (k, _) -> discontinue_with k exn
2 changes: 2 additions & 0 deletions lib/state.mli
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type ('a, 'b) continuation
type 'a t = private
| Finished of ('a, exn) result
| Suspended : ('a, 'b) continuation * 'a Effect.t -> 'b t
| Unhandled : ('a, 'b) continuation * 'a -> 'b t

val make : ('a -> 'b) -> 'a -> 'b t
(** [make fn value] makes a new {i function state} by executing the function
Expand All @@ -99,6 +100,7 @@ type 'a step =
| Fail of exn
| Intr
| Cont : 'a Effect.t -> 'a step
| None : 'a Effect.t -> 'a step
| Yield : unit step

type ('a, 'b) k = ('a step -> 'b t) -> 'a Effect.t -> 'b t
Expand Down
1 change: 1 addition & 0 deletions test/core/core.t
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@
Fatal error: exception Miou.Still_has_children
[2]
$ ./t25.exe
$ ./t28.exe
8 changes: 7 additions & 1 deletion test/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
(modules t27)
(libraries miou))

(executable
(name t28)
(modules t28)
(libraries miou))

(cram
(package miou)
(deps
Expand Down Expand Up @@ -155,4 +160,5 @@
t22.exe
t23.exe
t24.exe
t25.exe))
t25.exe
t28.exe))
50 changes: 50 additions & 0 deletions test/core/t28.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
type _ Effect.t += Foo : unit Effect.t

let prgm () =
let prm = Miou.call @@ fun () -> Effect.perform Foo in
Miou.await_exn prm

let handler_foo fn v =
let open Effect.Deep in
let retc = Fun.id in
let exnc = raise in
let effc : type c. c Effect.t -> ((c, 'a) continuation -> 'b) option =
function
| Foo -> Some (fun k -> continue k ())
| _ -> None
in
match_with fn v { retc; exnc; effc }

let () =
Miou.run ~handler:{ Miou.handler= handler_foo } @@ fun () ->
let prm = Miou.call @@ fun () -> Effect.perform Foo in
Miou.await_exn prm

let () =
Miou.run ~handler:{ Miou.handler= handler_foo } @@ fun () ->
let prm = Miou.call_cc @@ fun () -> Effect.perform Foo in
Miou.await_exn prm

type _ Effect.t += Bar : unit Effect.t

let handler_bar fn v =
let open Effect.Deep in
let retc = Fun.id in
let exnc = raise in
let effc : type c. c Effect.t -> ((c, 'a) continuation -> 'b) option =
function
| Bar -> Some (fun k -> continue k ())
| _ -> None
in
match_with fn v { retc; exnc; effc }

let ( <.> ) { Miou.handler= foo } { Miou.handler= bar } =
{ Miou.handler= (fun fn v -> (foo (bar fn)) v) }

let () =
let foo = { Miou.handler= handler_foo } in
let bar = { Miou.handler= handler_bar } in
Miou.run ~handler:(foo <.> bar) @@ fun () ->
let prm0 = Miou.call @@ fun () -> Effect.perform Foo in
let prm1 = Miou.call @@ fun () -> Effect.perform Bar in
Miou.await_exn prm0; Miou.await_exn prm1

0 comments on commit 664e8e2

Please sign in to comment.