Skip to content

Commit

Permalink
Support actor discriminator
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Dec 1, 2024
1 parent 40d8443 commit 047453b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 14 deletions.
12 changes: 9 additions & 3 deletions src/actors/proxy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// deno-lint-ignore-file no-explicit-any
import { type ActorProxy, create, createHttpInvoker } from "./proxyutil.ts";
import {
type ActorProxy,
create,
createHttpInvoker,
type ProxyFactory,
} from "./proxyutil.ts";
import type { Actor, ActorConstructor } from "./runtime.ts";
export type { ActorProxy };
export interface ActorsServer {
Expand All @@ -20,8 +25,9 @@ export const actors = {
proxy: <TInstance extends Actor>(
actor: ActorConstructor<TInstance> | string,
options?: ActorsOptions | undefined,
): { id: (id: string) => ActorProxy<TInstance> } => {
const factory = (id: string) => createHttpInvoker(id, options);
): { id: ProxyFactory<TInstance> } => {
const factory = (id: string, discriminator?: string) =>
createHttpInvoker(id, discriminator, options);
return create(actor, factory);
},
};
29 changes: 23 additions & 6 deletions src/actors/proxyutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import { retry } from "./util/retry.ts";
export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id";
export const ACTOR_ID_QS_NAME = "deno_isolate_instance_id";
export const ACTOR_CONSTRUCTOR_NAME_HEADER = "x-error-constructor-name";
export const ACTOR_DISCRIMINATOR_HEADER_NAME = "x-actor-discriminator";
export const ACTOR_DISCRIMINATOR_QS_NAME = "actor_discriminator";

export type ProxyFactory<TInstance> = (
id: string,
discriminator?: string,
) => ActorProxy<TInstance>;
/**
* Promise.prototype.then onfufilled callback type.
*/
Expand Down Expand Up @@ -325,6 +332,7 @@ export const createHttpInvoker = <
TChannel extends DuplexChannel<unknown, unknown>,
>(
actorId: string,
discriminator?: string,
options?: ActorsOptions,
): ActorInvoker<TResponse, TChannel> => {
const server = options?.server;
Expand All @@ -345,7 +353,11 @@ export const createHttpInvoker = <
}),
),
)
}&${ACTOR_ID_QS_NAME}=${actorId}`);
}&${ACTOR_ID_QS_NAME}=${actorId}${
discriminator
? `&${ACTOR_DISCRIMINATOR_QS_NAME}=${discriminator}`
: ""
}`);
url.protocol = url.protocol === "http:" ? "ws:" : "wss:";
const ws = new WebSocket(
url,
Expand All @@ -362,6 +374,9 @@ export const createHttpInvoker = <
headers: {
"Content-Type": "application/json",
[options?.actorIdHeaderName ?? ACTOR_ID_HEADER_NAME]: actorId,
...discriminator
? { [ACTOR_DISCRIMINATOR_HEADER_NAME]: discriminator }
: {},
...actorsServer.deploymentId
? { ["x-deno-deployment-id"]: actorsServer.deploymentId }
: {},
Expand Down Expand Up @@ -425,22 +440,23 @@ export const createHttpInvoker = <
export const WELL_KNOWN_RPC_MEHTOD = "_rpc";
export const create = <TInstance extends Actor>(
actor: ActorConstructor<TInstance> | string,
invokerFactory: (id: string) => ActorInvoker,
invokerFactory: (id: string, discriminator?: string) => ActorInvoker,
metadata?: unknown,
disposer?: () => void,
): { id: (id: string) => ActorProxy<TInstance> } => {
): { id: ProxyFactory<TInstance> } => {
const name = typeof actor === "string" ? actor : actor.name;
return {
id: (id: string): ActorProxy<TInstance> => {
id: (id: string, discriminator?: string): ActorProxy<TInstance> => {
return new Proxy<ActorProxy<TInstance>>({} as ActorProxy<TInstance>, {
get: (_, method) => {
if (method === "withMetadata") {
return (m: unknown) => create(actor, invokerFactory, m).id(id);
return (m: unknown) =>
create(actor, invokerFactory, m).id(id, discriminator);
}
if (method === Symbol.dispose && disposer) {
return disposer;
}
const invoker = invokerFactory(id);
const invoker = invokerFactory(id, discriminator);
if (method === "rpc") {
return () => {
const awaiter = new ActorAwaiter(
Expand All @@ -458,6 +474,7 @@ export const create = <TInstance extends Actor>(
() => awaiter.close(),
).id(
id,
discriminator,
);
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Deno.test("counter tests", async () => {
const counterProxy = actors.proxy(Counter);

const counterActor = counterProxy.id(actorId);
using rpcActor = counterProxy.id("12345").rpc();
using rpcActor = counterProxy.id("12345", "1234").rpc();

const name = `Counter Actor`;
const ch = counterActor.chan(name);
Expand Down
29 changes: 25 additions & 4 deletions src/actors/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import process from "node:process";
import { ActorError } from "./errors.ts";
import {
ACTOR_CONSTRUCTOR_NAME_HEADER,
ACTOR_DISCRIMINATOR_HEADER_NAME,
ACTOR_DISCRIMINATOR_QS_NAME,
ACTOR_ID_HEADER_NAME,
ACTOR_ID_QS_NAME,
} from "./proxyutil.ts";
Expand All @@ -10,11 +12,11 @@ import type { ActorState } from "./state.ts";
import type { ActorStorage } from "./storage.ts";
import { DenoKvActorStorage } from "./storage/denoKv.ts";
import { S3ActorStorage } from "./storage/s3.ts";
import { serializeUint8Array } from "./util/buffers.ts";
import {
EVENT_STREAM_RESPONSE_HEADER,
isEventStreamResponse,
} from "./stream.ts";
import { serializeUint8Array } from "./util/buffers.ts";
import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts";
import {
type ServerSentEventMessage,
Expand Down Expand Up @@ -88,13 +90,16 @@ export class ActorRuntime {
});
}

private getOrCreateSilo(actorId: string): ActorSilo {
let silo = this.silos.get(actorId);
private getOrCreateSilo(actorId: string, discriminator?: string): ActorSilo {
let silo = this.silos.get(
discriminator ? `${actorId}#${discriminator}` : actorId,
);
if (!silo) {
silo = new ActorSilo(
this.actorsConstructors,
actorId,
this.getActorStorage.bind(this),
discriminator,
);
this.silos.set(actorId, silo);
}
Expand All @@ -119,6 +124,19 @@ export class ActorRuntime {
return null;
}

actorDiscriminator(req: Request): string | null;
actorDiscriminator(url: URL, req: Request): string | null;
actorDiscriminator(reqOrUrl: URL | Request, req?: Request): string | null {
if (reqOrUrl instanceof Request) {
return this.actorDiscriminator(new URL(reqOrUrl.url), reqOrUrl);
}
if (reqOrUrl instanceof URL && req instanceof Request) {
return req.headers.get(ACTOR_DISCRIMINATOR_HEADER_NAME) ??
reqOrUrl.searchParams.get(ACTOR_DISCRIMINATOR_QS_NAME);
}
return null;
}

/**
* Handles an incoming request and invokes the corresponding actor method.
* @param req - The incoming request.
Expand All @@ -133,7 +151,10 @@ export class ActorRuntime {
});
}

const silo = this.getOrCreateSilo(actorId);
const silo = this.getOrCreateSilo(
actorId,
this.actorDiscriminator(url, req) || undefined,
);

const result = parseActorInvokeApi(url.pathname);
if (!result) {
Expand Down
2 changes: 2 additions & 0 deletions src/actors/silo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export class ActorSilo {
actorId: string,
actorName: string,
) => ActorStorage,
private discriminator?: string,
) {
this.invoker = {
invoke: this.invoke.bind(this),
Expand All @@ -47,6 +48,7 @@ export class ActorSilo {
const storage = this.getActorStorage(this.actorId, Actor.name);
const state = new ActorState({
id: this.actorId,
discriminator: this.discriminator,
storage,
proxy: (actor) => {
const invoker = (id: string) => {
Expand Down
3 changes: 3 additions & 0 deletions src/actors/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { ActorStorage } from "./storage.ts";

export interface ActorStateOptions {
id: string;
discriminator?: string;
storage: ActorStorage;
proxy: <TInstance extends Actor>(
actor: ActorConstructor<TInstance>,
Expand All @@ -14,6 +15,7 @@ export interface ActorStateOptions {
*/
export class ActorState {
public id: string;
public discriminator?: string;
public storage: ActorStorage;
public proxy: <TInstance extends Actor>(
actor: ActorConstructor<TInstance>,
Expand All @@ -23,6 +25,7 @@ export class ActorState {
this.storage = options.storage;
this.proxy = options.proxy;
this.id = options.id;
this.discriminator = options.discriminator;
}

blockConcurrencyWhile<T>(callback: () => Promise<T>): Promise<T> {
Expand Down

0 comments on commit 047453b

Please sign in to comment.