diff --git a/Cargo.lock b/Cargo.lock index 6c0a0faa24..59fe882922 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -204,6 +204,7 @@ dependencies = [ "hyper 0.14.31", "lazy_static", "region-get", + "region-recommend", "reqwest 0.11.27", "rivet-api", "rivet-cache", diff --git a/examples/actor-simple/actor.ts b/examples/actor-simple/actor.ts new file mode 100644 index 0000000000..9038e3cdcb --- /dev/null +++ b/examples/actor-simple/actor.ts @@ -0,0 +1,279 @@ +// Connection methods: +// - HTTP +// - Req/Res +// - WebSocket +// - Req/Res +// - Bidirectional +// +// Prior art: +// - DO RPC +// - gRPC +// - PartyKit connections +// - socket.io +// - tRPC +// +// Stateful vs stateless: +// - Stateful is more common for realtime +// - PartyKit +// - socket.io +// - gRPC +// - Stateless can be added later +// +// Features: +// - State +// - Timers +// - Cron +// - Lifecycle (shutdown) +// - Metadata (tags) +// - Bidirectional streaming +// - Broadcasting? +// - Events? (for broadcasting) + + +interface State { + count: number; +} + + +class Counter extends Actor { + onConnection(connection: ActorConnection) { + + } + + onRequest(request: ActorRequest): Response { + + } +} + +// === + +class Counter extends Actor { + initialize(): State { + return { count: 0 }; + } + + clientStreamingRpc(stream: ClientStream): number { + while (true) { + let x = await countStream.recv(); + } + } + + serverStreamingRpc(stream: ServerStream) { + setInterval(() => { + stream.send(stream.body * 2); + }, 1000); + } + + bidirectionalStreamingRpc(stream: Socket) { + while (true) { + const x = await stream.recv(); + stream.send(x * 2); + } + } + + async increment(count: number): number { + + } +} + +// === + +class Counter extends Actor { + initialize(): State { + return { count: 0 }; + } + + onConnection(conn: Connection) { + conn.on("increment", () => { + this.state.count += 1; + this.boradcast(this.state.count); + }); + + conn.on("close", () => { + this.broadcast("closed", ) + }); + } + + increment() { + + } + + decrement() { + + } +} + +// === + +class Counter extends Actor { + constructor() { + super(); + + this.on("increment", () => { + this.state.connections += 1; + this.state.count += 1; + this.broadcast(this.state); + }); + + this.on("close", () => { + this.state.connections -= 1; + this.broadcast(this.state); + }); + + } + + initialize(): State { + return { count: 0 }; + } + + onConnection(conn: Connection) { + conn.on("increment", () => { + this.state.count += 1; + this.boradcast(this.state.count); + }); + + conn.on("close", () => { + this.broadcast("closed", ) + }); + } +} + +// === + +class Counter extends Actor { + initialize(): State { + return { count: 0 }; + } + + connect(conn: Socket) { + this.state.connections += 1; + + conn.on("incr", () => { + this.state.count += 1; + this.boradcast(this.state); + }); + conn.on("close", () => { + this.state.connections -= 1; + this.boradcast(this.state); + }); + } + + getState() { + + } + + increment(): number { + this.state.count += 1; + } +} + +// === + +class Counter extends Actor { + initialize(): State { + return { count: 0 }; + } + + // Simple unary RPC + async increment(count: number): Promise { + this.state.count += count; + return this.state.count; + } + + // Client streaming - receives multiple numbers, returns final sum + async clientStreamingRpc(stream: ClientStream): Promise { + while (true) { + const value = await stream.next(); + if (value === null) break; + + this.state.count += value; + } + return this.state.count; + } + + // Server streaming - multiplies input by 2 and streams result periodically + serverStreamingRpc(stream: ServerStream) { + setInterval(() => { + stream.send(stream.body * 2); + }, 1000); + } + + // Bidirectional streaming - multiplies each received number by 2 and sends it back + async bidirectionalStreamingRpc(stream: BidirectionalStream) { + while (true) { + const value = await stream.read(); + if (value === null) break; + + stream.send(value * 2); + } + } +} + +// === + +// Inspiration: +// - Simplicity of Socket.io +// - Rooms from Socket.io & PartyKit +// - Hybrid approach of gRPC +// - Simplicity of RPC of tRPC and Durable Objects + +// TODO: Focus on how you'd do it in raw JS +// Is presence a thing? + +class Counter extends Actor { + initialize(): State { + return { count: 0 }; + } + + async increment(count: number): Promise { + this.state.count += count; + return this.state.count; + } + + async observe(socket: Socket) { + socket.onMessage("foo", (x) => { + console.log(x); + this.network.broadcast("observe", 5); + socket.close(); + }); + + this.on("stateUpdate", () => { + this.socket.send("state", this.state); + }); + + socket.onClose(() => { + console.log('close'); + }); + } +} + +// === + +// TODO: Some sort of version management? + +class Counter extends Actor { + initialize(): State { + return { count: 0 }; + } + + async increment(count: number): Promise { + this.state.count += count; + return this.state.count; + } + + async observe(socket: Socket) { + this.onStateUpdate((state) => { + this.socket.send("state", state); + }); + + socket.onMessage("foo", (x) => { + console.log(x); + this.network.broadcast("observe", 5); + socket.close(); + }); + + socket.onClose(() => { + console.log('close'); + }); + } +} diff --git a/examples/actor-simple/client.ts b/examples/actor-simple/client.ts new file mode 100644 index 0000000000..47e4552bbf --- /dev/null +++ b/examples/actor-simple/client.ts @@ -0,0 +1,39 @@ +// import ActorClient from "@rivet-gg/actor-client"; +// +// const client = new ActorClient(); // TODO: +// +// const actor = cilent.get("counter", { "hello": "world" }); // TODO: Very TBD on this part +// +// const newCount = await actor.rpc("increment", 1); +// +// const socket = actor.socket("observe"); +// socket.send("foo", "var"); +// socket.on("state", state => { +// console.log('New count', state); +// }); +// +// // or a simpler one-liner rpc +// const actor = await cilent.get("counter", { "hello": "world" }).rpc("increment", 1); + + + +// Fetch the actor address from environment variables +const actorAddr = Deno.env.get("ACTOR_ADDR"); +if (!actorAddr) throw "Missing ACTOR_ADDR" +console.log("Actor Address:", actorAddr); + +// Make an HTTP fetch request to the actor address +const response = await fetch(actorAddr + "/rpc/increment", { + method: "POST", + body: JSON.stringify({ + args: [1] + }), + headers: { + "content-type": "application/json" + } +}); +if (!response.ok) throw `${response.statusText}: ${await response.text()}`; +const data = await response.json(); +console.log("Response from actor address:", data); + + diff --git a/examples/actor-simple/counter.ts b/examples/actor-simple/counter.ts new file mode 100644 index 0000000000..1a7f94012e --- /dev/null +++ b/examples/actor-simple/counter.ts @@ -0,0 +1,34 @@ +import Actor from "../../sdks/actors/actors/src/mod.ts"; + +interface State { + count: number; +} + +class Counter extends Actor { + constructor() { + super(); + + // setInterval(() => { + // this.increment(); + // console.log(this.state.count); + // this.forceSaveState(); + // }, 1000); + } + + public override initialize(): State { + return { count: 0 }; + } + + increment(): number { + this.state.count += 1; + return this.state.count; + } + + observe(socket: unknown) { + + } +} + +// TODO: +new Counter().run(); + diff --git a/examples/actor-simple/deno.jsonc b/examples/actor-simple/deno.jsonc new file mode 100644 index 0000000000..6f0da35b00 --- /dev/null +++ b/examples/actor-simple/deno.jsonc @@ -0,0 +1,12 @@ +{ + "imports": { + "@core/asyncutil": "jsr:@core/asyncutil@^1.2.0", + "@std/async": "jsr:@std/async@^1.0.9", + "hono": "jsr:@hono/hono@^4.6.12", + "@rivet-gg/actors-core": "jsr:@rivet-gg/actors-core@0.0.1-rc.5", + "on-change": "npm:on-change@^5.0.1" + }, + "fmt": { + "useTabs": true + } +} diff --git a/examples/actor-simple/deno.lock b/examples/actor-simple/deno.lock new file mode 100644 index 0000000000..5d941276c7 --- /dev/null +++ b/examples/actor-simple/deno.lock @@ -0,0 +1,38 @@ +{ + "version": "4", + "specifiers": { + "jsr:@core/asyncutil@^1.2.0": "1.2.0", + "jsr:@hono/hono@^4.6.12": "4.6.12", + "jsr:@rivet-gg/actors-core@0.0.1-rc.5": "0.0.1-rc.5", + "jsr:@std/async@^1.0.9": "1.0.9", + "npm:on-change@^5.0.1": "5.0.1" + }, + "jsr": { + "@core/asyncutil@1.2.0": { + "integrity": "9967f15190c60df032c13f72ce5ac73d185c34f31c53dc918d8800025854c118" + }, + "@hono/hono@4.6.12": { + "integrity": "fa0b97fa7c3292f0d9957109ac07a475fe485868795b71b8e3114c284152cdb5" + }, + "@rivet-gg/actors-core@0.0.1-rc.5": { + "integrity": "466ca05f6acd1822e3b29794065e7b6e4e404333289a7c220967aa92b8b1821c" + }, + "@std/async@1.0.9": { + "integrity": "c6472fd0623b3f3daae023cdf7ca5535e1b721dfbf376562c0c12b3fb4867f91" + } + }, + "npm": { + "on-change@5.0.1": { + "integrity": "sha512-n7THCP7RkyReRSLkJb8kUWoNsxUIBxTkIp3JKno+sEz6o/9AJ3w3P9fzQkITEkMwyTKJjZciF3v/pVoouxZZMg==" + } + }, + "workspace": { + "dependencies": [ + "jsr:@core/asyncutil@^1.2.0", + "jsr:@hono/hono@^4.6.12", + "jsr:@rivet-gg/actors-core@0.0.1-rc.5", + "jsr:@std/async@^1.0.9", + "npm:on-change@^5.0.1" + ] + } +} diff --git a/examples/actor-simple/rivet.jsonc b/examples/actor-simple/rivet.jsonc new file mode 100644 index 0000000000..159164fc1f --- /dev/null +++ b/examples/actor-simple/rivet.jsonc @@ -0,0 +1,12 @@ +{ + "builds": [ + { + "tags": { "name": "counter" }, + "runtime": "javascript", + "script": "counter.ts", + "unstable": { + "minify": false + } + } + ] +} diff --git a/sdks/actors-core/.vscode/settings.json b/sdks/actors-core/.vscode/settings.json deleted file mode 100644 index b943dbc7a3..0000000000 --- a/sdks/actors-core/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "deno.enable": true -} \ No newline at end of file diff --git a/sdks/actors-bridge/.vscode/settings.json b/sdks/actors/.vscode/settings.json similarity index 100% rename from sdks/actors-bridge/.vscode/settings.json rename to sdks/actors/.vscode/settings.json diff --git a/sdks/actors/actors/deno.jsonc b/sdks/actors/actors/deno.jsonc new file mode 100644 index 0000000000..a5b507a5c8 --- /dev/null +++ b/sdks/actors/actors/deno.jsonc @@ -0,0 +1,13 @@ +{ + "imports": { + "@core/asyncutil": "jsr:@core/asyncutil@^1.2.0", + "@std/async": "jsr:@std/async@^1.0.9", + "hono": "jsr:@hono/hono@^4.6.12", + "@rivet-gg/actors-core": "jsr:@rivet-gg/actors-core@0.0.1-rc.5", + "on-change": "npm:on-change@^5.0.1" + }, + "exports": "./src/mod.ts", + "fmt": { + "useTabs": true + } +} diff --git a/sdks/actors/actors/src/actor.ts b/sdks/actors/actors/src/actor.ts new file mode 100644 index 0000000000..a8b39d9a23 --- /dev/null +++ b/sdks/actors/actors/src/actor.ts @@ -0,0 +1,348 @@ +import Rivet from "@rivet-gg/actors-core"; +import { deadline } from "@std/async/deadline"; +import { Context as HonoContext, Hono } from "hono"; +import { WSEvents } from "hono/ws"; +import { upgradeWebSocket } from "hono/deno"; +import { Lock } from "@core/asyncutil/lock"; +import { + ActorConfig, + DEFAULT_ACTOR_CONFIG, + mergeActorConfig, +} from "./config.ts"; +import onChange from "on-change"; +import { RpcRequest } from "./rpc_request.ts"; + +const KEYS = { + SCHEDULE: { + SCHEDULE: ["actor", "schedule", "schedule"], + EVENT_PREFIX: ["actor", "schedule", "event"], + event(id: string): string[] { + return [...this.EVENT_PREFIX, id]; + }, + }, + STATE: { + INITIALIZED: ["actor", "state", "initialized"], + DATA: ["actor", "state", "data"], + }, +}; + +interface RpcRequest { + args: any[]; +} + +// TODO: Instead of doing this, use a temp var for state and attempt to write +// it. Roll back state if fails to serialize. +function isJsonSerializable(value: unknown): boolean { + // Handle primitive types directly + if (value === null || value === undefined) return true; + if (typeof value === 'number') return Number.isFinite(value); + if (typeof value === 'boolean' || typeof value === 'string') return true; + + // Handle arrays + if (Array.isArray(value)) { + return value.every(isJsonSerializable); + } + + // Handle plain objects + if (typeof value === 'object') { + // Reject if it's not a plain object + if (Object.getPrototypeOf(value) !== Object.prototype) return false; + + // Check all values recursively + return Object.values(value).every(isJsonSerializable); + } + + return false; +} + +export abstract class Actor { + #stateChanged: boolean = false; + + /** The proxied state that notifies of changes automatically. */ + #stateProxy!: State; + + /** Gets the raw state without the proxy wrapper */ + get #stateRaw(): State { + return onChange.target(this.#stateProxy); + } + + #saveStateLock = new Lock(); + + #backgroundPromises: Promise[] = []; + #config: ActorConfig; + #ready: boolean = false; + + public constructor(config?: Partial) { + this.#config = mergeActorConfig(config); + } + + public abstract initialize(): State | Promise; + + public async run() { + await this.#initializeState(); + + this.#runServer(); + + console.log("ready"); + this.#ready = true; + } + + public get state(): State { + return this.#stateProxy; + } + + public set state(value: State) { + this.#setStateWithoutChange(value); + this.#stateChanged = true; + } + + /** Updates the state and creates a new proxy. */ + #setStateWithoutChange(value: State) { + if (!isJsonSerializable(value)) { + throw new Error("State must be JSON serializable"); + } + this.#stateProxy = this.#createStateProxy(value); + } + + #createStateProxy(target: State): State { + if (target === null || typeof target !== "object") { + if (!isJsonSerializable(target)) { + throw new Error("State value must be JSON serializable"); + } + return target; + } + + // Unsubscribe from old state + if (this.#stateProxy) { + onChange.unsubscribe(this.#stateProxy); + } + + // Listen for changes to the object in order to automatically write state + return onChange( + target, + // deno-lint-ignore no-explicit-any + (path: any, value: any, _previousValue: any, _applyData: any) => { + if (!isJsonSerializable(value)) { + throw new Error( + `State value at path "${path}" must be JSON serializable`, + ); + } + this.#stateChanged = true; + }, + { + ignoreDetached: true, + }, + ); + } + + async #initializeState() { + // Read initial state + const getStateBatch = await Rivet.kv.getBatch([ + KEYS.STATE.INITIALIZED, + KEYS.STATE.DATA, + ]); + const initialized = getStateBatch.get(KEYS.STATE.INITIALIZED); + const stateData = getStateBatch.get(KEYS.STATE.DATA); + + if (!initialized) { + // Initialize + console.log("initializing"); + let stateData = this.initialize(); + if (stateData instanceof Promise) stateData = await stateData; + + // Update state + console.log("writing initial state"); + await Rivet.kv.putBatch( + new Map([ + [KEYS.STATE.INITIALIZED, true], + [KEYS.STATE.DATA, stateData], + ]), + ); + this.#setStateWithoutChange(stateData); + } else { + // Save state + console.log("found existing state"); + this.#setStateWithoutChange(stateData); + } + } + + #runServer() { + const app = new Hono(); + + app.get("/", (c) => { + // TODO: Give the metadata about this actor (ie tags) + return c.text( + "This is a Rivet Actor\n\nLearn more at https://rivet.gg", + ); + }); + + app.post("/rpc/:name", this.#handleRpc.bind(this)); + + app.get( + "/socket/:name", + upgradeWebSocket(this.#handleWebSocket.bind(this)), + ); + + app.all("*", (c) => { + return c.text("Not Found", 404); + }); + + const port = this.#getServerPort(); + console.log(`server running on ${port}`); + Deno.serve({ port, hostname: "0.0.0.0" }, app.fetch); + } + + #getServerPort(): number { + const portStr = Deno.env.get("PORT_http"); + if (!portStr) { + throw "Missing port"; + } + const port = parseInt(portStr); + if (!isFinite(port)) { + throw "Invalid port"; + } + + return port; + } + + async #handleRpc(c: HonoContext): Promise { + try { + const rpcName = c.req.param("name"); + + // Prevent calling private or reserved methods + if (!this.#isValidRpc(rpcName)) { + throw new Error(`RPC ${rpcName} is not accessible`); + } + + // Check if the method exists on this object + // deno-lint-ignore no-explicit-any + const rpcFunction = (this as any)[rpcName]; + if (typeof rpcFunction !== "function") { + throw new Error(`RPC ${rpcName} not found`); + } + + // Read the JSON body as an RpcRequest + const requestBody = await c.req.json(); + + // Build request + const request = new RpcRequest(); + + // TODO: pass abortable to the rpc to decide when to abort + // TODO: Manually call abortable for better error handling + // Call the function on this object with those arguments + let result; + try { + const resultOrPromise = rpcFunction.call(this, request, ...requestBody.args); + if (resultOrPromise instanceof Promise) { + result = await deadline( + resultOrPromise, + this.#config.rpc.timeout, + ); + } else { + result = resultOrPromise; + } + } catch (error) { + if (error instanceof DOMException && error.name == "TimeoutError") { + throw new Error(`RPC ${rpcName} timed out`); + } else { + throw new Error(`RPC ${rpcName} failed: ${String(error)}`); + } + } + + // Serialize the response as JSON + return c.json({ result }); + } catch (error) { + console.error("RPC Error:", error); + return c.json({ error: String(error) }, 500); + } finally { + await this.forceSaveState(); + } + } + + #isValidRpc(rpcName: string): boolean { + // Prevent calling private methods + if (rpcName.startsWith("#")) return false; + + // Prevent accidental leaking of private methods, since this is a common + // convention + if (rpcName.startsWith("_")) return false; + + // Prevent calling protected methods + // TODO: Are there other RPC functions that should be private? i.e. internal JS runtime functions? Should we validate the fn is part of this prototype? + const reservedMethods = ["constructor", "initialize", "run"]; + if (reservedMethods.includes(rpcName)) return false; + + return true; + } + + #handleWebSocket(c: HonoContext): WSEvents { + const socketName = c.req.param("name"); + return { + onOpen(evt, ws) { + + }, + onMessage(evt, ws) { + }, + onClose(evt, ws) { + }, + onError(evt, ws) { + }, + }; + } + + /** + * Runs a promise in the background. + * + * This allows the actor runtime to ensure that a promise completes while + * returning from an RPC request early. + */ + protected runInBackground( + promise: Promise, + ) { + this.#assertReady(); + + // TODO: Should we force save the state? + // Add logging to promise and make it non-failable + const nonfailablePromise = promise + .then(() => console.log("background promise complete")) + .catch((err) => { + console.error("background promise failed", err); + // ctx.log.error( + // "background promise failed", + // ...errorToLogEntries("error", err), + // ) + }); + this.#backgroundPromises.push(nonfailablePromise); + } + + /** + * Forces the state to get saved. + * + * This is helpful if running a long task that may fail later or a background + * job that updates the state. + */ + public async forceSaveState() { + this.#assertReady(); + + // Use a lock in order to avoid race conditions with writing to KV + await this.#saveStateLock.lock(async () => { + if (this.#stateChanged) { + console.log("saving state"); + + // There might be more changes while we're writing, so we set + // this before writing to KV in order to avoid a race + // condition. + this.#stateChanged = false; + + // Write to KV + await Rivet.kv.put(KEYS.STATE.DATA, this.#stateRaw); + } else { + console.log("skipping save, state not modified"); + } + }) + } + + #assertReady() { + if (!this.#ready) throw new Error("Actor not ready"); + } +} diff --git a/sdks/actors/actors/src/config.ts b/sdks/actors/actors/src/config.ts new file mode 100644 index 0000000000..3052a08e7d --- /dev/null +++ b/sdks/actors/actors/src/config.ts @@ -0,0 +1,21 @@ +export interface ActorConfig { + rpc: RpcConfig; +} + +export interface RpcConfig { + timeout: number; +} + +export const DEFAULT_ACTOR_CONFIG: ActorConfig = { + rpc: { + timeout: 5000, + }, +}; + +export function mergeActorConfig(partialConfig?: Partial): ActorConfig { + return { + rpc: { + timeout: partialConfig?.rpc?.timeout ?? DEFAULT_ACTOR_CONFIG.rpc.timeout, + }, + }; +} diff --git a/sdks/actors/actors/src/kv.ts b/sdks/actors/actors/src/kv.ts new file mode 100644 index 0000000000..93a4c3246a --- /dev/null +++ b/sdks/actors/actors/src/kv.ts @@ -0,0 +1,31 @@ +// import { StorageDriver } from "../../driver.ts"; +// import { __GlobalDurableObjectT } from "./global_durable_object.ts"; + +// // TODO: Re-export KV API but with prefixes for all keys +// export class Storage { +// constructor(private readonly durableObject: __GlobalDurableObjectT) {} + +// async get(key: string): Promise { +// const jsonRaw = await this.durableObject.storage.get(buildStorageKey(key)); +// if (jsonRaw) { +// return await JSON.parse(jsonRaw); +// } else { +// return undefined; +// } +// } +// async put(key: string, value: V): Promise { +// await this.durableObject.storage.put(key, JSON.stringify(value)); +// } +// async delete(key: string): Promise { +// await this.durableObject.storage.delete(buildStorageKey(key)); +// } +// } + +// /** +// * Build a key from the actor's API that's namespaced to the storage. +// * +// * This allows us to store metadata on different keys. +// */ +// function buildStorageKey(key: string): string { +// return `storage:${key}`; +// } diff --git a/sdks/actors/actors/src/mod.ts b/sdks/actors/actors/src/mod.ts new file mode 100644 index 0000000000..6a2ee5809b --- /dev/null +++ b/sdks/actors/actors/src/mod.ts @@ -0,0 +1,2 @@ +import { Actor } from "./actor.ts"; +export default Actor; diff --git a/sdks/actors/actors/src/rpc_request.ts b/sdks/actors/actors/src/rpc_request.ts new file mode 100644 index 0000000000..68b257b966 --- /dev/null +++ b/sdks/actors/actors/src/rpc_request.ts @@ -0,0 +1,3 @@ +export class RpcRequest { + +} diff --git a/sdks/actors/actors/src/schedule.ts b/sdks/actors/actors/src/schedule.ts new file mode 100644 index 0000000000..c09711b4c4 --- /dev/null +++ b/sdks/actors/actors/src/schedule.ts @@ -0,0 +1,19 @@ +// export class Schedule { +// constructor() {} + +// after(duration: number, fn: string, request: unknown): void { +// this.durableObject.publicCtx.waitUntil(this.durableObject.scheduleEvent(Date.now() + duration, fn, request)); +// } +// at(timestamp: number, fn: string, request: unknown): void { +// this.durableObject.publicCtx.waitUntil(this.durableObject.scheduleEvent(timestamp, fn, request)); +// } + +// async __inspect(): Promise { +// const keys = await this.durableObject.storage.list({ prefix: "schedule:" }); +// const alarm = await this.durableObject.storage.getAlarm(); +// return { +// keys: Object.fromEntries(keys), +// alarm, +// }; +// } +// } diff --git a/sdks/actors/actors/src/socket.ts b/sdks/actors/actors/src/socket.ts new file mode 100644 index 0000000000..fe3f905758 --- /dev/null +++ b/sdks/actors/actors/src/socket.ts @@ -0,0 +1,90 @@ +// export class ActorSocket { +// private socket: WebSocket | null = null; +// private eventHandlers: Map = new Map(); +// private preflightHandler: (() => void) | null = null; +// private closeHandler: ((error?: Error) => void) | null = null; +// +// // TODO: +// // constructor(onAuthenticated: (events) => void) +// +// constructor(url: string) { +// this.socket = new WebSocket(url); +// this.setupSocketHandlers(); +// } +// +// private setupSocketHandlers() { +// if (!this.socket) return; +// +// this.socket.onopen = (event) => this.onopen(event); +// this.socket.onmessage = (event) => this.onmessage(event); +// this.socket.onclose = (event) => this.onclose(event); +// this.socket.onerror = (event) => this.onerror(event); +// } +// +// // Internal handlers +// private onopen(event: Event) { +// if (this.preflightHandler) { +// this.preflightHandler(); +// } +// } +// +// private onmessage(event: MessageEvent) { +// try { +// const { type, data } = JSON.parse(event.data); +// const handlers = this.eventHandlers.get(type); +// if (handlers) { +// handlers.forEach(handler => handler(data)); +// } +// } catch (error) { +// console.error('Failed to parse message:', error); +// } +// } +// +// private onclose(event: CloseEvent) { +// if (this.closeHandler) { +// this.closeHandler(); +// } +// } +// +// private onerror(event: Event) { +// if (this.closeHandler) { +// this.closeHandler(new Error('WebSocket error occurred')); +// } +// } +// +// // Public API +// public onPreflight(callback: () => void) { +// this.preflightHandler = callback; +// } +// +// public onClose(callback: (error?: Error) => void) { +// this.closeHandler = callback; +// } +// +// public on(messageName: string, callback: Function) { +// if (!this.eventHandlers.has(messageName)) { +// this.eventHandlers.set(messageName, []); +// } +// this.eventHandlers.get(messageName)?.push(callback); +// } +// +// public emit(messageName: string, ...args: any[]) { +// if (!this.socket || this.socket.readyState !== WebSocket.OPEN) { +// throw new Error('Socket is not connected'); +// } +// +// const message = JSON.stringify({ +// type: messageName, +// data: args.length === 1 ? args[0] : args +// }); +// +// this.socket.send(message); +// } +// +// public close() { +// if (this.socket) { +// this.socket.close(); +// this.socket = null; +// } +// } +// } diff --git a/sdks/actors-bridge/.gitignore b/sdks/actors/bridge/.gitignore similarity index 100% rename from sdks/actors-bridge/.gitignore rename to sdks/actors/bridge/.gitignore diff --git a/sdks/actors-bridge/README.md b/sdks/actors/bridge/README.md similarity index 100% rename from sdks/actors-bridge/README.md rename to sdks/actors/bridge/README.md diff --git a/sdks/actors/bridge/deno.jsonc b/sdks/actors/bridge/deno.jsonc new file mode 100644 index 0000000000..0967ef424b --- /dev/null +++ b/sdks/actors/bridge/deno.jsonc @@ -0,0 +1 @@ +{} diff --git a/sdks/actors-bridge/src/bridge/40_rivet_kv.ts b/sdks/actors/bridge/src/bridge/40_rivet_kv.ts similarity index 100% rename from sdks/actors-bridge/src/bridge/40_rivet_kv.ts rename to sdks/actors/bridge/src/bridge/40_rivet_kv.ts diff --git a/sdks/actors-bridge/src/bridge/90_rivet_ns.ts b/sdks/actors/bridge/src/bridge/90_rivet_ns.ts similarity index 100% rename from sdks/actors-bridge/src/bridge/90_rivet_ns.ts rename to sdks/actors/bridge/src/bridge/90_rivet_ns.ts diff --git a/sdks/actors-bridge/src/bridge/99_rivet_main.ts b/sdks/actors/bridge/src/bridge/99_rivet_main.ts similarity index 100% rename from sdks/actors-bridge/src/bridge/99_rivet_main.ts rename to sdks/actors/bridge/src/bridge/99_rivet_main.ts diff --git a/sdks/actors-bridge/src/ext/core.d.ts b/sdks/actors/bridge/src/ext/core.d.ts similarity index 100% rename from sdks/actors-bridge/src/ext/core.d.ts rename to sdks/actors/bridge/src/ext/core.d.ts diff --git a/sdks/actors-bridge/src/ext/ops.d.ts b/sdks/actors/bridge/src/ext/ops.d.ts similarity index 100% rename from sdks/actors-bridge/src/ext/ops.d.ts rename to sdks/actors/bridge/src/ext/ops.d.ts diff --git a/sdks/actors-bridge/src/ext/types.d.ts b/sdks/actors/bridge/src/ext/types.d.ts similarity index 100% rename from sdks/actors-bridge/src/ext/types.d.ts rename to sdks/actors/bridge/src/ext/types.d.ts diff --git a/sdks/actors-bridge/tsconfig.bridge.json b/sdks/actors/bridge/tsconfig.bridge.json similarity index 100% rename from sdks/actors-bridge/tsconfig.bridge.json rename to sdks/actors/bridge/tsconfig.bridge.json diff --git a/sdks/actors-bridge/tsconfig.json b/sdks/actors/bridge/tsconfig.json similarity index 100% rename from sdks/actors-bridge/tsconfig.json rename to sdks/actors/bridge/tsconfig.json diff --git a/sdks/actors-bridge/tsconfig.types.json b/sdks/actors/bridge/tsconfig.types.json similarity index 100% rename from sdks/actors-bridge/tsconfig.types.json rename to sdks/actors/bridge/tsconfig.types.json diff --git a/sdks/actors/client/deno.jsonc b/sdks/actors/client/deno.jsonc new file mode 100644 index 0000000000..0967ef424b --- /dev/null +++ b/sdks/actors/client/deno.jsonc @@ -0,0 +1 @@ +{} diff --git a/sdks/actors/client/src/index.ts b/sdks/actors/client/src/index.ts new file mode 100644 index 0000000000..80bc0afea6 --- /dev/null +++ b/sdks/actors/client/src/index.ts @@ -0,0 +1,44 @@ +import { RivetClient } from "@rivet-gg/api"; + +export class ActorClient { + constructor(client: RivetClient) { + + } + + get(actorName: string, actorTags: any): ActorHandle { + // TODO: + } + + getOrCreate(actorName: string, actorTags: any): ActorHandle { + // TODO: + } + + create(actorName: string, actorTags: any): ActorHandle { + // TODO: + } +} + +export class ActorHandle { + actorId?: string; + actorTags: any; + buildTags: any; + + rpc(name: string, ...args: any): any { + + } + + socket(name: string): Socket { + + } +} + +export class Socket { + send(name: string, ...args: any) { + + } + + on(name: string, ...args: any) { + + } +} + diff --git a/sdks/actors-core/README.md b/sdks/actors/core/README.md similarity index 100% rename from sdks/actors-core/README.md rename to sdks/actors/core/README.md diff --git a/sdks/actors/core/deno.jsonc b/sdks/actors/core/deno.jsonc new file mode 100644 index 0000000000..0967ef424b --- /dev/null +++ b/sdks/actors/core/deno.jsonc @@ -0,0 +1 @@ +{} diff --git a/sdks/actors-core/jsr.jsonc b/sdks/actors/core/jsr.jsonc similarity index 100% rename from sdks/actors-core/jsr.jsonc rename to sdks/actors/core/jsr.jsonc diff --git a/sdks/actors-core/src/index.ts b/sdks/actors/core/src/index.ts similarity index 100% rename from sdks/actors-core/src/index.ts rename to sdks/actors/core/src/index.ts diff --git a/sdks/actors-core/src/types b/sdks/actors/core/src/types similarity index 100% rename from sdks/actors-core/src/types rename to sdks/actors/core/src/types diff --git a/sdks/actors/deno.jsonc b/sdks/actors/deno.jsonc new file mode 100644 index 0000000000..2901182650 --- /dev/null +++ b/sdks/actors/deno.jsonc @@ -0,0 +1,12 @@ +{ + "workspace": [ + "./actors", + "./bridge", + "./client", + "./core", + "./protocol" + ], + "fmt": { + "useTabs": true + } +} diff --git a/sdks/actors/deno.lock b/sdks/actors/deno.lock new file mode 100644 index 0000000000..42a8af0219 --- /dev/null +++ b/sdks/actors/deno.lock @@ -0,0 +1,42 @@ +{ + "version": "4", + "specifiers": { + "jsr:@core/asyncutil@^1.2.0": "1.2.0", + "jsr:@hono/hono@^4.6.12": "4.6.12", + "jsr:@rivet-gg/actors-core@0.0.1-rc.5": "0.0.1-rc.5", + "jsr:@std/async@^1.0.9": "1.0.9", + "npm:on-change@^5.0.1": "5.0.1" + }, + "jsr": { + "@core/asyncutil@1.2.0": { + "integrity": "9967f15190c60df032c13f72ce5ac73d185c34f31c53dc918d8800025854c118" + }, + "@hono/hono@4.6.12": { + "integrity": "fa0b97fa7c3292f0d9957109ac07a475fe485868795b71b8e3114c284152cdb5" + }, + "@rivet-gg/actors-core@0.0.1-rc.5": { + "integrity": "466ca05f6acd1822e3b29794065e7b6e4e404333289a7c220967aa92b8b1821c" + }, + "@std/async@1.0.9": { + "integrity": "c6472fd0623b3f3daae023cdf7ca5535e1b721dfbf376562c0c12b3fb4867f91" + } + }, + "npm": { + "on-change@5.0.1": { + "integrity": "sha512-n7THCP7RkyReRSLkJb8kUWoNsxUIBxTkIp3JKno+sEz6o/9AJ3w3P9fzQkITEkMwyTKJjZciF3v/pVoouxZZMg==" + } + }, + "workspace": { + "members": { + "actors": { + "dependencies": [ + "jsr:@core/asyncutil@^1.2.0", + "jsr:@hono/hono@^4.6.12", + "jsr:@rivet-gg/actors-core@0.0.1-rc.5", + "jsr:@std/async@^1.0.9", + "npm:on-change@^5.0.1" + ] + } + } + } +} diff --git a/sdks/actors/protocol/deno.json b/sdks/actors/protocol/deno.json new file mode 100644 index 0000000000..0967ef424b --- /dev/null +++ b/sdks/actors/protocol/deno.json @@ -0,0 +1 @@ +{} diff --git a/sdks/actors/protocol/src/index.ts b/sdks/actors/protocol/src/index.ts new file mode 100644 index 0000000000..e69de29bb2