Skip to content

Commit

Permalink
Add an actor model implementation example
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Nov 18, 2024
1 parent f8ab869 commit e567fc0
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 14 deletions.
12 changes: 4 additions & 8 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@
(>= 0.21.2))
(qcheck-stm
(>= 0.3))
(backoff
(>= 0.1.0))
(multicore-magic
(>= 2.3.0))
;;
(alcotest
(and
(>= 1.7.0)
:with-test))
(backoff
(and
(>= 0.1.0)
:with-test))
(cohttp
(and
(>= 6.0.0~beta2)
Expand Down Expand Up @@ -209,10 +209,6 @@
(and
(>= 0.1.7)
:with-test))
(multicore-magic
(and
(>= 2.3.0)
:with-test))
(multicore-magic-dscheck
(and
(>= 2.3.0)
Expand Down
4 changes: 4 additions & 0 deletions lib/picos_meta.hoot/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(library
(name picos_meta_hoot)
(public_name picos_meta.hoot)
(libraries backoff multicore-magic picos))
168 changes: 168 additions & 0 deletions lib/picos_meta.hoot/hoot.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
open Picos

let[@inline never] impossible () = failwith "impossible"
let[@inline never] not_in_a_hoot () = invalid_arg "Not in a hoot"

module Message = struct
type t = ..
end

type _ tdt =
| Nil : [> `Nil ] tdt
| Message : {
message : Message.t;
next : [ `Nil | `Message ] tdt;
}
-> [> `Message ] tdt
| Wait : Trigger.t -> [> `Wait ] tdt
| Pid : {
computation : unit Computation.t;
terminated : unit Computation.t;
incoming : incoming Atomic.t;
mutable received : [ `Nil | `Message ] tdt;
}
-> [> `Pid ] tdt

and incoming = In : [< `Nil | `Message | `Wait ] tdt -> incoming [@@unboxed]

module Pid = struct
type t = [ `Pid ] tdt

let key : [ `Nil | `Pid ] tdt Fiber.FLS.t = Fiber.FLS.create ()
end

let self_of fiber =
match Fiber.FLS.get_exn fiber Pid.key with
| Pid _ as t -> t
| Nil | (exception Fiber.FLS.Not_set) -> not_in_a_hoot ()

let self () : Pid.t = self_of @@ Fiber.current ()

exception Terminate

let run main =
let fiber = Fiber.current () in
let before = Fiber.FLS.get fiber Pid.key ~default:Nil in
let computation = Computation.create ~mode:`LIFO () in
let inner = Computation.Packed computation in
let (Pid p as t) : Pid.t =
let terminated = Computation.create ~mode:`LIFO ()
and incoming = Atomic.make (In Nil) |> Multicore_magic.copy_as_padded in
Pid { computation; terminated; incoming; received = Nil }
in
Fiber.FLS.set fiber Pid.key t;
let (Packed parent as outer) = Fiber.get_computation fiber in
let canceler = Computation.attach_canceler ~from:parent ~into:computation in
Fiber.set_computation fiber inner;
begin
match main () with
| () | (exception Terminate) -> Computation.finish p.terminated
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
Computation.cancel p.terminated exn bt
end;
Computation.finish p.computation;
Fiber.set_computation fiber outer;
Computation.detach parent canceler;
Fiber.FLS.set fiber Pid.key before;
(* An otherwise unhandled exception except [Terminate] will be raised. *)
Computation.check p.terminated

let wait (Pid p : Pid.t) = Computation.wait p.terminated

let spawn main =
let (Pid p as t) : Pid.t =
let computation = Computation.create ~mode:`LIFO ()
and terminated = Computation.create ~mode:`LIFO ()
and incoming = Atomic.make (In Nil) in
Pid { computation; terminated; incoming; received = Nil }
in
let fiber = Fiber.create ~forbid:false p.computation in
Fiber.FLS.set fiber Pid.key t;
begin
Fiber.spawn fiber @@ fun fiber ->
let (Pid p) : Pid.t = self_of fiber in
(* An unhandled exception except [Terminate] will be treated as a fatal
error. *)
begin
match main () with
| () | (exception Terminate) -> Computation.finish p.terminated
end;
Computation.finish p.computation
end;
t

let rec rev_to (Message _ as ms : [ `Message ] tdt) :
[ `Nil | `Message ] tdt -> _ = function
| Nil -> ms
| Message r -> rev_to (Message { message = r.message; next = ms }) r.next

let rev (Message r : [ `Message ] tdt) =
rev_to (Message { message = r.message; next = Nil }) r.next

let rec receive (Pid p as t : Pid.t) =
match Atomic.get p.incoming with
| In Nil as before ->
let trigger = Trigger.create () in
let after = In (Wait trigger) in
if Atomic.compare_and_set p.incoming before after then begin
match Trigger.await trigger with
| None -> ()
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
end;
receive t
| _ -> begin
match Atomic.exchange p.incoming (In Nil) with
| In (Wait _ | Nil) -> impossible ()
| In (Message _ as ms) ->
let (Message r : [ `Message ] tdt) = rev ms in
p.received <- r.next;
r.message
end

let receive () =
let (Pid p as t) = self () in
match p.received with
| Message r ->
p.received <- r.next;
r.message
| Nil -> receive t

let rec send (Pid p as t : Pid.t) message backoff =
match Atomic.get p.incoming with
| In ((Nil | Message _) as before) ->
let after = Message { message; next = before } in
if not (Atomic.compare_and_set p.incoming (In before) (In after)) then
send t message (Backoff.once backoff)
| In (Wait trigger as before) ->
let after = Message { message; next = Nil } in
if Atomic.compare_and_set p.incoming (In before) (In after) then
Trigger.signal trigger
else send t message (Backoff.once backoff)

let[@inline] send t message = send t message Backoff.default

type Message.t += Terminated of Pid.t

let monitor ~at ~the:(Pid the_p as the : Pid.t) =
let[@alert "-handler"] trigger =
Trigger.from_action at the @@ fun _ at the -> send at (Terminated the)
in
if not (Computation.try_attach the_p.terminated trigger) then
send at (Terminated the)

let empty_bt = Printexc.get_callstack 0

let link (Pid p1 as t1 : Pid.t) (Pid p2 as t2 : Pid.t) =
let[@alert "-handler"] trigger =
Trigger.from_action t1 t2 @@ fun _ (Pid p1 : Pid.t) (Pid p2 : Pid.t) ->
Computation.cancel p1.computation Terminate empty_bt;
Computation.cancel p2.computation Terminate empty_bt
in
if
(not (Computation.try_attach p1.terminated trigger))
|| not (Computation.try_attach p2.terminated trigger)
then begin
Computation.cancel p1.computation Terminate empty_bt;
Computation.cancel p2.computation Terminate empty_bt
end
1 change: 1 addition & 0 deletions lib/picos_meta.hoot/picos_meta_hoot.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module Hoot = Hoot
85 changes: 85 additions & 0 deletions lib/picos_meta.hoot/picos_meta_hoot.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
(** An actor model example loosely inspired by
{{:https://github.com/riot-ml/riot} Riot}. *)

module Hoot : sig
(** {{:https://www.merriam-webster.com/thesaurus/riot} Hoot} is an actor model
core loosely inspired by {{:https://github.com/riot-ml/riot} Riot}. *)

exception Terminate
(** Exception used by {!link} to terminate actor processes.
An unhandled [Terminate] exception is not treated as an error. *)

val run : (unit -> unit) -> unit
(** [run main] establishes a new actor process on the current fiber and runs
[main] as the process.
⚠️ An unhandled exception, aside from {!Terminate}, raised from [main ()]
will be raised from [run main]. It is not treated as a fatal error.
This can be called from any fiber, even from another actor process, which
would then effectively get suspended while running [main], but typically
this is used to start a "scope" for running actors. *)

module Pid : sig
(** Actor process or process identifier. *)

type t
end

val spawn : (unit -> unit) -> Pid.t
(** [spawn main] creates a new actor process to run [main].
⚠️ An unhandled exception, aside from {!Terminate}, raised from [main ()]
will be treated as a fatal error and will either exit the entire program
or stop the scheduler without completing other fibers.
This can be called from any fiber, even from fibers that are not actor
processes, but typically this would be used within a "scope" for running
actors. *)

val self : unit -> Pid.t
(** [self ()], when called within an actor process, returns the {{!Pid.t}
process identifier} of the actor process.
@raise Invalid_argument when called outside of an actor process. *)

val wait : Pid.t -> unit
(** [wait pid] blocks until the specified actor process has terminated. *)

module Message : sig
(** Extensible message type. *)

type t = ..
end

val receive : unit -> Message.t
(** [receive ()] waits until at least one message has been added to the
mailbox of the current process and then removes and returns the least
recently added message from the mailbox. *)

val send : Pid.t -> Message.t -> unit
(** [send pid message] adds the given [message] to the mailbox of the process
[pid].
ℹ️ Sending a message to a process that has already terminated is not
considered an error. *)

type Message.t +=
| Terminated of Pid.t
(** Sent by the {!monitor} mechanism to notify of process
termination. *)

val monitor : at:Pid.t -> the:Pid.t -> unit
(** [monitor ~at:observer ~the:subject] makes it so that when the [subject]
process terminates the message {{!Terminated} [Terminated subject]} is
{{!send} sent} to the [observer] process. *)

val link : Pid.t -> Pid.t -> unit
(** [link pid1 pid2] makes it so that when either one of the given processes
terminates the other process will also be terminated with the [Terminate]
exception.
In case either one of the given processes is already terminated when
[link] is called, the other process will then be terminated. *)
end
14 changes: 10 additions & 4 deletions lib/picos_meta/index.mld
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
{0 Integration tests for Picos packages}
{0 Integration tests and additional examples for Picos packages}

This package contains integration (and other kinds of) tests for the core
{!Picos} interface and other Picos libraries. A separate package is used to
allow the dependencies of and between different Picos packages to be simplified.
This package contains additional examples and integration (and other kinds of)
tests for the core {!Picos} interface and other Picos libraries. A separate
package is used to allow the dependencies of and between different Picos
packages to be simplified.

{1 Libraries}

{!modules:
Picos_meta_hoot}
4 changes: 2 additions & 2 deletions picos_meta.opam
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ depends: [
"lwt" {>= "5.7.0"}
"qcheck-core" {>= "0.21.2"}
"qcheck-stm" {>= "0.3"}
"backoff" {>= "0.1.0"}
"multicore-magic" {>= "2.3.0"}
"alcotest" {>= "1.7.0" & with-test}
"backoff" {>= "0.1.0" & with-test}
"cohttp" {>= "6.0.0~beta2" & with-test}
"cohttp-lwt-unix" {>= "6.0.0~beta2" & os != "win32" & with-test}
"conduit-lwt-unix" {>= "6.2.2" & os != "win32" & with-test}
Expand All @@ -31,7 +32,6 @@ depends: [
"js_of_ocaml" {>= "5.4.0" & with-test}
"mdx" {>= "2.4.0" & with-test}
"multicore-bench" {>= "0.1.7" & with-test}
"multicore-magic" {>= "2.3.0" & with-test}
"multicore-magic-dscheck" {>= "2.3.0" & with-test}
"ocaml-version" {>= "3.6.4" & with-test}
"cohttp-lwt" {>= "6.0.0~beta2" & with-test}
Expand Down

0 comments on commit e567fc0

Please sign in to comment.