Skip to content

Commit

Permalink
Add incontext invocation
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Oct 15, 2024
1 parent 5eea728 commit 8c27aad
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 116 deletions.
9 changes: 9 additions & 0 deletions src/actors/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export type ErrorCode =
| "NOT_FOUND"
| "METHOD_NOT_FOUND"
| "METHOD_NOT_INVOCABLE";
export class ActorError extends Error {
constructor(msg: string, public code: ErrorCode, options?: ErrorOptions) {
super(msg, options);
}
}
174 changes: 117 additions & 57 deletions src/actors/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@ export type Fulfilled<R, T> = ((result: R) => T | PromiseLike<T>) | null;
// deno-lint-ignore no-explicit-any
export type Rejected<E> = ((reason: any) => E | PromiseLike<E>) | null;

export interface ActorInvoker<
// deno-lint-ignore no-explicit-any
TResponse = any,
TChannel extends DuplexChannel<unknown, unknown> = DuplexChannel<
unknown,
unknown
>,
> {
invoke(
name: string,
method: string,
methodArgs: unknown[],
): Promise<TResponse>;

connect(
name: string,
method: string,
methodArgs: unknown[],
): Promise<TChannel>;
}
export class ActorAwaiter<
TResponse,
TChannel extends DuplexChannel<unknown, unknown>,
Expand All @@ -29,10 +49,10 @@ export class ActorAwaiter<
DuplexChannel<unknown, unknown> {
ch: Promise<TChannel> | null = null;
constructor(
protected fetcher: () => Promise<
TResponse
>,
protected ws: () => Promise<TChannel>,
protected actorName: string,
protected actorMethod: string,
protected methodArgs: unknown[],
protected invoker: ActorInvoker<TResponse, TChannel>,
) {
}
async close() {
Expand All @@ -46,7 +66,11 @@ export class ActorAwaiter<
}

private get channel(): Promise<TChannel> {
return this.ch ??= this.ws();
return this.ch ??= this.invoker.connect(
this.actorName,
this.actorMethod,
this.methodArgs,
);
}

async send(value: unknown): Promise<void> {
Expand All @@ -55,7 +79,12 @@ export class ActorAwaiter<
}

catch<TResult>(onrejected: Rejected<TResult>): Promise<TResponse | TResult> {
return this.fetcher().catch(onrejected);
return this.invoker.invoke(
this.actorName,
this.actorMethod,
this.methodArgs,
)
.catch(onrejected);
}

then<TResult1, TResult2 = TResult1>(
Expand All @@ -65,7 +94,11 @@ export class ActorAwaiter<
>,
onrejected?: Rejected<TResult2>,
): Promise<TResult1 | TResult2> {
return this.fetcher().then(onfufilled).catch(
return this.invoker.invoke(
this.actorName,
this.actorMethod,
this.methodArgs,
).then(onfufilled).catch(
onrejected,
);
}
Expand Down Expand Up @@ -122,6 +155,74 @@ const initServer = (): ActorsServer => {
};
};

const urlFor = (
serverUrl: string,
actor: ActorConstructor | string,
actorMethod: string,
): string => {
return `${serverUrl}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${actorMethod}`;
};

const createHttpInvoker = <
TResponse,
TChannel extends DuplexChannel<unknown, unknown>,
>(
actorId: string,
server: ActorsServer,
): ActorInvoker<TResponse, TChannel> => {
return {
connect: (name, method, methodArgs) => {
const endpoint = urlFor(server.url, name, method);
const ws = new WebSocket(
`${endpoint}?args=${
encodeURIComponent(
btoa(JSON.stringify({ args: methodArgs ?? [] })),
)
}&${ACTOR_ID_QS_NAME}=${actorId}`,
);
return makeWebSocket(ws) as Promise<TChannel>;
},
invoke: async (name, method, methodArgs) => {
const endpoint = urlFor(server.url, name, method);
const abortCtrl = new AbortController();
const resp = await fetch(
endpoint,
{
method: "POST",
signal: abortCtrl.signal,
headers: {
"Content-Type": "application/json",
[ACTOR_ID_HEADER_NAME]: actorId,
...server.deploymentId
? { ["x-deno-deployment-id"]: server.deploymentId }
: {},
},
body: JSON.stringify({
args: methodArgs ?? [],
}),
},
);
if (
resp.headers.get("content-type") ===
EVENT_STREAM_RESPONSE_HEADER
) {
const iterator = readFromStream(resp);
const retn = iterator.return;
iterator.return = function (val) {
abortCtrl.abort();
return retn?.call(iterator, val) ?? val;
};
return iterator;
}
if (resp.status === 204) {
return;
}
return resp.json();
},
};
};
/**
* utilities to create and manage actors.
*/
Expand All @@ -134,60 +235,19 @@ export const actors = {
_server ??= initServer();
}
const actorsServer = server ?? _server!;
const name = typeof actor === "string" ? actor : actor.name;
return {
id: (id: string): Promisify<TInstance> => {
return new Proxy<Promisify<TInstance>>({} as Promisify<TInstance>, {
get: (_, prop) => {
const endpoint = `${actorsServer.url}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${String(prop)}`;
const fetcher = async (...args: unknown[]) => {
const abortCtrl = new AbortController();
const resp = await fetch(
endpoint,
{
method: "POST",
signal: abortCtrl.signal,
headers: {
"Content-Type": "application/json",
[ACTOR_ID_HEADER_NAME]: id,
...actorsServer.deploymentId
? { ["x-deno-deployment-id"]: actorsServer.deploymentId }
: {},
},
body: JSON.stringify({
args: args ?? [],
}),
},
);
if (
resp.headers.get("content-type") ===
EVENT_STREAM_RESPONSE_HEADER
) {
const iterator = readFromStream(resp);
const retn = iterator.return;
iterator.return = function (val) {
abortCtrl.abort();
return retn?.call(iterator, val) ?? val;
};
return iterator;
}
if (resp.status === 204) {
return;
}
return resp.json();
};
get: (_, method) => {
const invoker = createHttpInvoker(id, actorsServer);
return (...args: unknown[]) => {
const awaiter = new ActorAwaiter(() => fetcher(...args), () => {
const ws = new WebSocket(
`${endpoint}?args=${
encodeURIComponent(
btoa(JSON.stringify({ args: args ?? [] })),
)
}&${ACTOR_ID_QS_NAME}=${id}`,
);
return makeWebSocket(ws);
});
const awaiter = new ActorAwaiter(
name,
String(method),
args,
invoker,
);
return awaiter;
};
},
Expand Down
Loading

0 comments on commit 8c27aad

Please sign in to comment.