Skip to content

Commit

Permalink
feat: add @rivet-gg/actors
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Nov 27, 2024
1 parent b63ae81 commit f5bba2e
Show file tree
Hide file tree
Showing 40 changed files with 1,044 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

279 changes: 279 additions & 0 deletions examples/actor-simple/actor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
// Connection methods:
// - HTTP
// - Req/Res
// - WebSocket
// - Req/Res
// - Bidirectional
//
// Prior art:
// - DO RPC
// - gRPC
// - PartyKit connections
// - socket.io
// - tRPC
//
// Stateful vs stateless:
// - Stateful is more common for realtime
// - PartyKit
// - socket.io
// - gRPC
// - Stateless can be added later
//
// Features:
// - State
// - Timers
// - Cron
// - Lifecycle (shutdown)
// - Metadata (tags)
// - Bidirectional streaming
// - Broadcasting?
// - Events? (for broadcasting)


interface State {
count: number;
}


class Counter extends Actor<State> {
onConnection(connection: ActorConnection) {

}

onRequest(request: ActorRequest): Response {

}
}

// ===

class Counter extends Actor<State> {
initialize(): State {
return { count: 0 };
}

clientStreamingRpc(stream: ClientStream<number>): number {
while (true) {
let x = await countStream.recv();
}
}

serverStreamingRpc(stream: ServerStream<number, number>) {
setInterval(() => {
stream.send(stream.body * 2);
}, 1000);
}

bidirectionalStreamingRpc(stream: Socket<number, number>) {
while (true) {
const x = await stream.recv();
stream.send(x * 2);
}
}

async increment(count: number): number {

}
}

// ===

class Counter extends Actor<State> {
initialize(): State {
return { count: 0 };
}

onConnection(conn: Connection) {
conn.on("increment", () => {
this.state.count += 1;
this.boradcast(this.state.count);
});

conn.on("close", () => {
this.broadcast("closed", )
});
}

increment() {

}

decrement() {

}
}

// ===

class Counter extends Actor<State> {
constructor() {
super();

this.on("increment", () => {
this.state.connections += 1;
this.state.count += 1;
this.broadcast(this.state);
});

this.on("close", () => {
this.state.connections -= 1;
this.broadcast(this.state);
});

}

initialize(): State {
return { count: 0 };
}

onConnection(conn: Connection) {
conn.on("increment", () => {
this.state.count += 1;
this.boradcast(this.state.count);
});

conn.on("close", () => {
this.broadcast("closed", )
});
}
}

// ===

class Counter extends Actor<State> {
initialize(): State {
return { count: 0 };
}

connect(conn: Socket<number, number>) {
this.state.connections += 1;

conn.on("incr", () => {
this.state.count += 1;
this.boradcast(this.state);
});
conn.on("close", () => {
this.state.connections -= 1;
this.boradcast(this.state);
});
}

getState() {

}

increment(): number {
this.state.count += 1;
}
}

// ===

class Counter extends Actor<State> {
initialize(): State {
return { count: 0 };
}

// Simple unary RPC
async increment(count: number): Promise<number> {
this.state.count += count;
return this.state.count;
}

// Client streaming - receives multiple numbers, returns final sum
async clientStreamingRpc(stream: ClientStream<number>): Promise<number> {
while (true) {
const value = await stream.next();
if (value === null) break;

this.state.count += value;
}
return this.state.count;
}

// Server streaming - multiplies input by 2 and streams result periodically
serverStreamingRpc(stream: ServerStream<number, number>) {
setInterval(() => {
stream.send(stream.body * 2);
}, 1000);
}

// Bidirectional streaming - multiplies each received number by 2 and sends it back
async bidirectionalStreamingRpc(stream: BidirectionalStream<number, number>) {
while (true) {
const value = await stream.read();
if (value === null) break;

stream.send(value * 2);
}
}
}

// ===

// Inspiration:
// - Simplicity of Socket.io
// - Rooms from Socket.io & PartyKit
// - Hybrid approach of gRPC
// - Simplicity of RPC of tRPC and Durable Objects

// TODO: Focus on how you'd do it in raw JS
// Is presence a thing?

class Counter extends Actor<State> {
initialize(): State {
return { count: 0 };
}

async increment(count: number): Promise<number> {
this.state.count += count;
return this.state.count;
}

async observe(socket: Socket<void, number>) {
socket.onMessage("foo", (x) => {
console.log(x);
this.network.broadcast("observe", 5);
socket.close();
});

this.on("stateUpdate", () => {
this.socket.send("state", this.state);
});

socket.onClose(() => {
console.log('close');
});
}
}

// ===

// TODO: Some sort of version management?

class Counter extends Actor<State> {
initialize(): State {
return { count: 0 };
}

async increment(count: number): Promise<number> {
this.state.count += count;
return this.state.count;
}

async observe(socket: Socket<void, number>) {
this.onStateUpdate((state) => {
this.socket.send("state", state);
});

socket.onMessage("foo", (x) => {
console.log(x);
this.network.broadcast("observe", 5);
socket.close();
});

socket.onClose(() => {
console.log('close');
});
}
}
39 changes: 39 additions & 0 deletions examples/actor-simple/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// import ActorClient from "@rivet-gg/actor-client";
//
// const client = new ActorClient(); // TODO:
//
// const actor = cilent.get("counter", { "hello": "world" }); // TODO: Very TBD on this part
//
// const newCount = await actor.rpc("increment", 1);
//
// const socket = actor.socket("observe");
// socket.send("foo", "var");
// socket.on("state", state => {
// console.log('New count', state);
// });
//
// // or a simpler one-liner rpc
// const actor = await cilent.get("counter", { "hello": "world" }).rpc("increment", 1);



// Fetch the actor address from environment variables
const actorAddr = Deno.env.get("ACTOR_ADDR");
if (!actorAddr) throw "Missing ACTOR_ADDR"
console.log("Actor Address:", actorAddr);

// Make an HTTP fetch request to the actor address
const response = await fetch(actorAddr + "/rpc/increment", {
method: "POST",
body: JSON.stringify({
args: [1]
}),
headers: {
"content-type": "application/json"
}
});
if (!response.ok) throw `${response.statusText}: ${await response.text()}`;
const data = await response.json();
console.log("Response from actor address:", data);


34 changes: 34 additions & 0 deletions examples/actor-simple/counter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import Actor from "../../sdks/actors/actors/src/mod.ts";

interface State {
count: number;
}

class Counter extends Actor<State> {
constructor() {
super();

// setInterval(() => {
// this.increment();
// console.log(this.state.count);
// this.forceSaveState();
// }, 1000);
}

public override initialize(): State {
return { count: 0 };
}

increment(): number {
this.state.count += 1;
return this.state.count;
}

observe(socket: unknown) {

}
}

// TODO:
new Counter().run();

12 changes: 12 additions & 0 deletions examples/actor-simple/deno.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"imports": {
"@core/asyncutil": "jsr:@core/asyncutil@^1.2.0",
"@std/async": "jsr:@std/async@^1.0.9",
"hono": "jsr:@hono/hono@^4.6.12",
"@rivet-gg/actors-core": "jsr:@rivet-gg/[email protected]",
"on-change": "npm:on-change@^5.0.1"
},
"fmt": {
"useTabs": true
}
}
Loading

0 comments on commit f5bba2e

Please sign in to comment.