From e567fc05cdbe913e22666e518307479815a01a96 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Thu, 12 Sep 2024 17:08:26 +0300 Subject: [PATCH] Add an actor model implementation example --- dune-project | 12 +- lib/picos_meta.hoot/dune | 4 + lib/picos_meta.hoot/hoot.ml | 168 ++++++++++++++++++++++++ lib/picos_meta.hoot/picos_meta_hoot.ml | 1 + lib/picos_meta.hoot/picos_meta_hoot.mli | 85 ++++++++++++ lib/picos_meta/index.mld | 14 +- picos_meta.opam | 4 +- 7 files changed, 274 insertions(+), 14 deletions(-) create mode 100644 lib/picos_meta.hoot/dune create mode 100644 lib/picos_meta.hoot/hoot.ml create mode 100644 lib/picos_meta.hoot/picos_meta_hoot.ml create mode 100644 lib/picos_meta.hoot/picos_meta_hoot.mli diff --git a/dune-project b/dune-project index 99cd43e1..197b2120 100644 --- a/dune-project +++ b/dune-project @@ -164,15 +164,15 @@ (>= 0.21.2)) (qcheck-stm (>= 0.3)) + (backoff + (>= 0.1.0)) + (multicore-magic + (>= 2.3.0)) ;; (alcotest (and (>= 1.7.0) :with-test)) - (backoff - (and - (>= 0.1.0) - :with-test)) (cohttp (and (>= 6.0.0~beta2) @@ -209,10 +209,6 @@ (and (>= 0.1.7) :with-test)) - (multicore-magic - (and - (>= 2.3.0) - :with-test)) (multicore-magic-dscheck (and (>= 2.3.0) diff --git a/lib/picos_meta.hoot/dune b/lib/picos_meta.hoot/dune new file mode 100644 index 00000000..ef97307d --- /dev/null +++ b/lib/picos_meta.hoot/dune @@ -0,0 +1,4 @@ +(library + (name picos_meta_hoot) + (public_name picos_meta.hoot) + (libraries backoff multicore-magic picos)) diff --git a/lib/picos_meta.hoot/hoot.ml b/lib/picos_meta.hoot/hoot.ml new file mode 100644 index 00000000..76d8a8c2 --- /dev/null +++ b/lib/picos_meta.hoot/hoot.ml @@ -0,0 +1,168 @@ +open Picos + +let[@inline never] impossible () = failwith "impossible" +let[@inline never] not_in_a_hoot () = invalid_arg "Not in a hoot" + +module Message = struct + type t = .. +end + +type _ tdt = + | Nil : [> `Nil ] tdt + | Message : { + message : Message.t; + next : [ `Nil | `Message ] tdt; + } + -> [> `Message ] tdt + | Wait : Trigger.t -> [> `Wait ] tdt + | Pid : { + computation : unit Computation.t; + terminated : unit Computation.t; + incoming : incoming Atomic.t; + mutable received : [ `Nil | `Message ] tdt; + } + -> [> `Pid ] tdt + +and incoming = In : [< `Nil | `Message | `Wait ] tdt -> incoming [@@unboxed] + +module Pid = struct + type t = [ `Pid ] tdt + + let key : [ `Nil | `Pid ] tdt Fiber.FLS.t = Fiber.FLS.create () +end + +let self_of fiber = + match Fiber.FLS.get_exn fiber Pid.key with + | Pid _ as t -> t + | Nil | (exception Fiber.FLS.Not_set) -> not_in_a_hoot () + +let self () : Pid.t = self_of @@ Fiber.current () + +exception Terminate + +let run main = + let fiber = Fiber.current () in + let before = Fiber.FLS.get fiber Pid.key ~default:Nil in + let computation = Computation.create ~mode:`LIFO () in + let inner = Computation.Packed computation in + let (Pid p as t) : Pid.t = + let terminated = Computation.create ~mode:`LIFO () + and incoming = Atomic.make (In Nil) |> Multicore_magic.copy_as_padded in + Pid { computation; terminated; incoming; received = Nil } + in + Fiber.FLS.set fiber Pid.key t; + let (Packed parent as outer) = Fiber.get_computation fiber in + let canceler = Computation.attach_canceler ~from:parent ~into:computation in + Fiber.set_computation fiber inner; + begin + match main () with + | () | (exception Terminate) -> Computation.finish p.terminated + | exception exn -> + let bt = Printexc.get_raw_backtrace () in + Computation.cancel p.terminated exn bt + end; + Computation.finish p.computation; + Fiber.set_computation fiber outer; + Computation.detach parent canceler; + Fiber.FLS.set fiber Pid.key before; + (* An otherwise unhandled exception except [Terminate] will be raised. *) + Computation.check p.terminated + +let wait (Pid p : Pid.t) = Computation.wait p.terminated + +let spawn main = + let (Pid p as t) : Pid.t = + let computation = Computation.create ~mode:`LIFO () + and terminated = Computation.create ~mode:`LIFO () + and incoming = Atomic.make (In Nil) in + Pid { computation; terminated; incoming; received = Nil } + in + let fiber = Fiber.create ~forbid:false p.computation in + Fiber.FLS.set fiber Pid.key t; + begin + Fiber.spawn fiber @@ fun fiber -> + let (Pid p) : Pid.t = self_of fiber in + (* An unhandled exception except [Terminate] will be treated as a fatal + error. *) + begin + match main () with + | () | (exception Terminate) -> Computation.finish p.terminated + end; + Computation.finish p.computation + end; + t + +let rec rev_to (Message _ as ms : [ `Message ] tdt) : + [ `Nil | `Message ] tdt -> _ = function + | Nil -> ms + | Message r -> rev_to (Message { message = r.message; next = ms }) r.next + +let rev (Message r : [ `Message ] tdt) = + rev_to (Message { message = r.message; next = Nil }) r.next + +let rec receive (Pid p as t : Pid.t) = + match Atomic.get p.incoming with + | In Nil as before -> + let trigger = Trigger.create () in + let after = In (Wait trigger) in + if Atomic.compare_and_set p.incoming before after then begin + match Trigger.await trigger with + | None -> () + | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt + end; + receive t + | _ -> begin + match Atomic.exchange p.incoming (In Nil) with + | In (Wait _ | Nil) -> impossible () + | In (Message _ as ms) -> + let (Message r : [ `Message ] tdt) = rev ms in + p.received <- r.next; + r.message + end + +let receive () = + let (Pid p as t) = self () in + match p.received with + | Message r -> + p.received <- r.next; + r.message + | Nil -> receive t + +let rec send (Pid p as t : Pid.t) message backoff = + match Atomic.get p.incoming with + | In ((Nil | Message _) as before) -> + let after = Message { message; next = before } in + if not (Atomic.compare_and_set p.incoming (In before) (In after)) then + send t message (Backoff.once backoff) + | In (Wait trigger as before) -> + let after = Message { message; next = Nil } in + if Atomic.compare_and_set p.incoming (In before) (In after) then + Trigger.signal trigger + else send t message (Backoff.once backoff) + +let[@inline] send t message = send t message Backoff.default + +type Message.t += Terminated of Pid.t + +let monitor ~at ~the:(Pid the_p as the : Pid.t) = + let[@alert "-handler"] trigger = + Trigger.from_action at the @@ fun _ at the -> send at (Terminated the) + in + if not (Computation.try_attach the_p.terminated trigger) then + send at (Terminated the) + +let empty_bt = Printexc.get_callstack 0 + +let link (Pid p1 as t1 : Pid.t) (Pid p2 as t2 : Pid.t) = + let[@alert "-handler"] trigger = + Trigger.from_action t1 t2 @@ fun _ (Pid p1 : Pid.t) (Pid p2 : Pid.t) -> + Computation.cancel p1.computation Terminate empty_bt; + Computation.cancel p2.computation Terminate empty_bt + in + if + (not (Computation.try_attach p1.terminated trigger)) + || not (Computation.try_attach p2.terminated trigger) + then begin + Computation.cancel p1.computation Terminate empty_bt; + Computation.cancel p2.computation Terminate empty_bt + end diff --git a/lib/picos_meta.hoot/picos_meta_hoot.ml b/lib/picos_meta.hoot/picos_meta_hoot.ml new file mode 100644 index 00000000..370a2ed7 --- /dev/null +++ b/lib/picos_meta.hoot/picos_meta_hoot.ml @@ -0,0 +1 @@ +module Hoot = Hoot diff --git a/lib/picos_meta.hoot/picos_meta_hoot.mli b/lib/picos_meta.hoot/picos_meta_hoot.mli new file mode 100644 index 00000000..42955420 --- /dev/null +++ b/lib/picos_meta.hoot/picos_meta_hoot.mli @@ -0,0 +1,85 @@ +(** An actor model example loosely inspired by + {{:https://github.com/riot-ml/riot} Riot}. *) + +module Hoot : sig + (** {{:https://www.merriam-webster.com/thesaurus/riot} Hoot} is an actor model + core loosely inspired by {{:https://github.com/riot-ml/riot} Riot}. *) + + exception Terminate + (** Exception used by {!link} to terminate actor processes. + + An unhandled [Terminate] exception is not treated as an error. *) + + val run : (unit -> unit) -> unit + (** [run main] establishes a new actor process on the current fiber and runs + [main] as the process. + + ⚠️ An unhandled exception, aside from {!Terminate}, raised from [main ()] + will be raised from [run main]. It is not treated as a fatal error. + + This can be called from any fiber, even from another actor process, which + would then effectively get suspended while running [main], but typically + this is used to start a "scope" for running actors. *) + + module Pid : sig + (** Actor process or process identifier. *) + + type t + end + + val spawn : (unit -> unit) -> Pid.t + (** [spawn main] creates a new actor process to run [main]. + + ⚠️ An unhandled exception, aside from {!Terminate}, raised from [main ()] + will be treated as a fatal error and will either exit the entire program + or stop the scheduler without completing other fibers. + + This can be called from any fiber, even from fibers that are not actor + processes, but typically this would be used within a "scope" for running + actors. *) + + val self : unit -> Pid.t + (** [self ()], when called within an actor process, returns the {{!Pid.t} + process identifier} of the actor process. + + @raise Invalid_argument when called outside of an actor process. *) + + val wait : Pid.t -> unit + (** [wait pid] blocks until the specified actor process has terminated. *) + + module Message : sig + (** Extensible message type. *) + + type t = .. + end + + val receive : unit -> Message.t + (** [receive ()] waits until at least one message has been added to the + mailbox of the current process and then removes and returns the least + recently added message from the mailbox. *) + + val send : Pid.t -> Message.t -> unit + (** [send pid message] adds the given [message] to the mailbox of the process + [pid]. + + ℹ️ Sending a message to a process that has already terminated is not + considered an error. *) + + type Message.t += + | Terminated of Pid.t + (** Sent by the {!monitor} mechanism to notify of process + termination. *) + + val monitor : at:Pid.t -> the:Pid.t -> unit + (** [monitor ~at:observer ~the:subject] makes it so that when the [subject] + process terminates the message {{!Terminated} [Terminated subject]} is + {{!send} sent} to the [observer] process. *) + + val link : Pid.t -> Pid.t -> unit + (** [link pid1 pid2] makes it so that when either one of the given processes + terminates the other process will also be terminated with the [Terminate] + exception. + + In case either one of the given processes is already terminated when + [link] is called, the other process will then be terminated. *) +end diff --git a/lib/picos_meta/index.mld b/lib/picos_meta/index.mld index 26031492..b0f2178a 100644 --- a/lib/picos_meta/index.mld +++ b/lib/picos_meta/index.mld @@ -1,5 +1,11 @@ -{0 Integration tests for Picos packages} +{0 Integration tests and additional examples for Picos packages} -This package contains integration (and other kinds of) tests for the core -{!Picos} interface and other Picos libraries. A separate package is used to -allow the dependencies of and between different Picos packages to be simplified. +This package contains additional examples and integration (and other kinds of) +tests for the core {!Picos} interface and other Picos libraries. A separate +package is used to allow the dependencies of and between different Picos +packages to be simplified. + +{1 Libraries} + +{!modules: + Picos_meta_hoot} diff --git a/picos_meta.opam b/picos_meta.opam index b90d47f4..0727fd41 100644 --- a/picos_meta.opam +++ b/picos_meta.opam @@ -20,8 +20,9 @@ depends: [ "lwt" {>= "5.7.0"} "qcheck-core" {>= "0.21.2"} "qcheck-stm" {>= "0.3"} + "backoff" {>= "0.1.0"} + "multicore-magic" {>= "2.3.0"} "alcotest" {>= "1.7.0" & with-test} - "backoff" {>= "0.1.0" & with-test} "cohttp" {>= "6.0.0~beta2" & with-test} "cohttp-lwt-unix" {>= "6.0.0~beta2" & os != "win32" & with-test} "conduit-lwt-unix" {>= "6.2.2" & os != "win32" & with-test} @@ -31,7 +32,6 @@ depends: [ "js_of_ocaml" {>= "5.4.0" & with-test} "mdx" {>= "2.4.0" & with-test} "multicore-bench" {>= "0.1.7" & with-test} - "multicore-magic" {>= "2.3.0" & with-test} "multicore-magic-dscheck" {>= "2.3.0" & with-test} "ocaml-version" {>= "3.6.4" & with-test} "cohttp-lwt" {>= "6.0.0~beta2" & with-test}