Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions packages/core/src/client/worker-common.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import type { AnyWorkerDefinition, WorkerDefinition } from "@/worker/definition";
import type {
AnyWorkerDefinition,
WorkerDefinition,
} from "@/worker/definition";
import type * as protoHttpResolve from "@/worker/protocol/http/resolve";
import type { Encoding } from "@/worker/protocol/serde";
import type { WorkerQuery } from "@/manager/protocol/query";
import { logger } from "./log";
import * as errors from "./errors";
import { sendHttpRequest } from "./utils";
import { HEADER_WORKER_QUERY, HEADER_ENCODING } from "@/worker/router-endpoints";
import {
HEADER_WORKER_QUERY,
HEADER_ENCODING,
} from "@/worker/router-endpoints";

/**
* Action function returned by Worker connections and handles.
Expand All @@ -27,11 +33,10 @@ export type WorkerActionFunction<
* Maps action methods from worker definition to typed function signatures.
*/
export type WorkerDefinitionActions<AD extends AnyWorkerDefinition> =
AD extends WorkerDefinition<any, any, any, any, infer R>
AD extends WorkerDefinition<any, any, any, any, any, any, infer R>
? {
[K in keyof R]: R[K] extends (...args: infer Args) => infer Return
? WorkerActionFunction<Args, Return>
: never;
}
: never;

2 changes: 1 addition & 1 deletion packages/core/src/registry/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export type WorkerPeerConfig = z.infer<typeof WorkerPeerConfigSchema>;

export const WorkersSchema = z.record(
z.string(),
z.custom<WorkerDefinition<any, any, any, any, any>>(),
z.custom<WorkerDefinition<any, any, any, any, any, any, any>>(),
);
export type Workers = z.infer<typeof WorkersSchema>;

Expand Down
10 changes: 5 additions & 5 deletions packages/core/src/worker/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import { WorkerContext } from "./context";
*
* @typeParam A Worker this action belongs to
*/
export class ActionContext<S, CP, CS, V> {
#workerContext: WorkerContext<S, CP, CS, V>;
export class ActionContext<S, CP, CS, V, I, AD> {
#workerContext: WorkerContext<S, CP, CS, V, I, AD>;

/**
* Should not be called directly.
Expand All @@ -23,8 +23,8 @@ export class ActionContext<S, CP, CS, V> {
* @param conn - The connection associated with the action
*/
constructor(
workerContext: WorkerContext<S, CP, CS, V>,
public readonly conn: Conn<S, CP, CS, V>,
workerContext: WorkerContext<S, CP, CS, V, I, AD>,
public readonly conn: Conn<S, CP, CS, V, I, AD>,
) {
this.#workerContext = workerContext;
}
Expand Down Expand Up @@ -95,7 +95,7 @@ export class ActionContext<S, CP, CS, V> {
/**
* Gets the map of connections.
*/
get conns(): Map<ConnId, Conn<S, CP, CS, V>> {
get conns(): Map<ConnId, Conn<S, CP, CS, V, I, AD>> {
return this.#workerContext.conns;
}

Expand Down
114 changes: 71 additions & 43 deletions packages/core/src/worker/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ export const WorkerConfigSchema = z
},
);

export interface OnCreateOptions {
input?: unknown;
export interface OnCreateOptions<I> {
input?: I;
}

export interface CreateStateOptions {
input?: unknown;
export interface CreateStateOptions<I> {
input?: I;
}

export interface OnConnectOptions<CP> {
Expand All @@ -98,12 +98,12 @@ export interface OnConnectOptions<CP> {
// This must have only one or the other or else S will not be able to be inferred
//
// Data returned from this handler will be available on `c.state`.
type CreateState<S, CP, CS, V> =
type CreateState<S, CP, CS, V, I, AD> =
| { state: S }
| {
createState: (
c: WorkerContext<undefined, undefined, undefined, undefined>,
opts: CreateStateOptions,
c: WorkerContext<undefined, undefined, undefined, undefined, undefined, undefined>,
opts: CreateStateOptions<I>,
) => S | Promise<S>;
}
| Record<never, never>;
Expand All @@ -113,11 +113,11 @@ type CreateState<S, CP, CS, V> =
// This must have only one or the other or else S will not be able to be inferred
//
// Data returned from this handler will be available on `c.conn.state`.
type CreateConnState<S, CP, CS, V> =
type CreateConnState<S, CP, CS, V, I, AD> =
| { connState: CS }
| {
createConnState: (
c: WorkerContext<undefined, undefined, undefined, undefined>,
c: WorkerContext<undefined, undefined, undefined, undefined, undefined, undefined>,
opts: OnConnectOptions<CP>,
) => CS | Promise<CS>;
}
Expand All @@ -129,7 +129,7 @@ type CreateConnState<S, CP, CS, V> =
/**
* @experimental
*/
type CreateVars<S, CP, CS, V> =
type CreateVars<S, CP, CS, V, I, AD> =
| {
/**
* @experimental
Expand All @@ -141,20 +141,23 @@ type CreateVars<S, CP, CS, V> =
* @experimental
*/
createVars: (
c: WorkerContext<undefined, undefined, undefined, undefined>,
c: WorkerContext<undefined, undefined, undefined, undefined, undefined, undefined>,
driverCtx: unknown,
) => V | Promise<V>;
}
| Record<never, never>;

export interface Actions<S, CP, CS, V> {
[Action: string]: (c: ActionContext<S, CP, CS, V>, ...args: any[]) => any;
export interface Actions<S, CP, CS, V, I, AD> {
[Action: string]: (
c: ActionContext<S, CP, CS, V, I, AD>,
...args: any[]
) => any;
}

//export type WorkerConfig<S, CP, CS, V> = BaseWorkerConfig<S, CP, CS, V> &
// WorkerConfigLifecycle<S, CP, CS, V> &
// CreateState<S, CP, CS, V> &
// CreateConnState<S, CP, CS, V>;
//export type WorkerConfig<S, CP, CS, V, I, AD> = BaseWorkerConfig<S, CP, CS, V, I, AD> &
// WorkerConfigLifecycle<S, CP, CS, V, I, AD> &
// CreateState<S, CP, CS, V, I, AD> &
// CreateConnState<S, CP, CS, V, I, AD>;

/**
* @experimental
Expand All @@ -170,7 +173,15 @@ interface OnAuthOptions<CP> {
params: CP;
}

interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
interface BaseWorkerConfig<
S,
CP,
CS,
V,
I,
AD,
R extends Actions<S, CP, CS, V, I, AD>,
> {
/**
* Called on the HTTP server before clients can interact with the worker.
*
Expand All @@ -196,7 +207,7 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
* @returns Authentication data to attach to connections (must be serializable)
* @throws Throw an error to deny access to the worker
*/
onAuth?: (opts: OnAuthOptions<CP>) => unknown | Promise<unknown>;
onAuth?: (opts: OnAuthOptions<CP>) => AD | Promise<AD>;

/**
* Called when the worker is first initialized.
Expand All @@ -205,8 +216,8 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
* This is called before any other lifecycle hooks.
*/
onCreate?: (
c: WorkerContext<S, CP, CS, V>,
opts: OnCreateOptions,
c: WorkerContext<S, CP, CS, V, I, AD>,
opts: OnCreateOptions<I>,
) => void | Promise<void>;

/**
Expand All @@ -217,7 +228,7 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
*
* @returns Void or a Promise that resolves when startup is complete
*/
onStart?: (c: WorkerContext<S, CP, CS, V>) => void | Promise<void>;
onStart?: (c: WorkerContext<S, CP, CS, V, I, AD>) => void | Promise<void>;

/**
* Called when the worker's state changes.
Expand All @@ -227,7 +238,7 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
*
* @param newState The updated state
*/
onStateChange?: (c: WorkerContext<S, CP, CS, V>, newState: S) => void;
onStateChange?: (c: WorkerContext<S, CP, CS, V, I, AD>, newState: S) => void;

/**
* Called before a client connects to the worker.
Expand All @@ -250,7 +261,7 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
* @throws Throw an error to reject the connection
*/
onBeforeConnect?: (
c: WorkerContext<S, CP, CS, V>,
c: WorkerContext<S, CP, CS, V, I, AD>,
opts: OnConnectOptions<CP>,
) => void | Promise<void>;

Expand All @@ -264,8 +275,8 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
* @returns Void or a Promise that resolves when connection handling is complete
*/
onConnect?: (
c: WorkerContext<S, CP, CS, V>,
conn: Conn<S, CP, CS, V>,
c: WorkerContext<S, CP, CS, V, I, AD>,
conn: Conn<S, CP, CS, V, I, AD>,
) => void | Promise<void>;

/**
Expand All @@ -278,8 +289,8 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
* @returns Void or a Promise that resolves when disconnect handling is complete
*/
onDisconnect?: (
c: WorkerContext<S, CP, CS, V>,
conn: Conn<S, CP, CS, V>,
c: WorkerContext<S, CP, CS, V, I, AD>,
conn: Conn<S, CP, CS, V, I, AD>,
) => void | Promise<void>;

/**
Expand All @@ -295,7 +306,7 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
* @returns The modified output to send to the client
*/
onBeforeActionResponse?: <Out>(
c: WorkerContext<S, CP, CS, V>,
c: WorkerContext<S, CP, CS, V, I, AD>,
name: string,
args: unknown[],
output: Out,
Expand All @@ -307,7 +318,7 @@ interface BaseWorkerConfig<S, CP, CS, V, R extends Actions<S, CP, CS, V>> {
// 1. Infer schema
// 2. Omit keys that we'll manually define (because of generics)
// 3. Define our own types that have generic constraints
export type WorkerConfig<S, CP, CS, V> = Omit<
export type WorkerConfig<S, CP, CS, V, I, AD> = Omit<
z.infer<typeof WorkerConfigSchema>,
| "actions"
| "onAuth"
Expand All @@ -325,18 +336,20 @@ export type WorkerConfig<S, CP, CS, V> = Omit<
| "vars"
| "createVars"
> &
BaseWorkerConfig<S, CP, CS, V, Actions<S, CP, CS, V>> &
CreateState<S, CP, CS, V> &
CreateConnState<S, CP, CS, V> &
CreateVars<S, CP, CS, V>;
BaseWorkerConfig<S, CP, CS, V, I, AD, Actions<S, CP, CS, V, I, AD>> &
CreateState<S, CP, CS, V, I, AD> &
CreateConnState<S, CP, CS, V, I, AD> &
CreateVars<S, CP, CS, V, I, AD>;

// See description on `WorkerConfig`
export type WorkerConfigInput<
S,
CP,
CS,
V,
R extends Actions<S, CP, CS, V>,
I,
AD,
R extends Actions<S, CP, CS, V, I, AD>,
> = Omit<
z.input<typeof WorkerConfigSchema>,
| "actions"
Expand All @@ -355,16 +368,31 @@ export type WorkerConfigInput<
| "vars"
| "createVars"
> &
BaseWorkerConfig<S, CP, CS, V, R> &
CreateState<S, CP, CS, V> &
CreateConnState<S, CP, CS, V> &
CreateVars<S, CP, CS, V>;
BaseWorkerConfig<S, CP, CS, V, I, AD, R> &
CreateState<S, CP, CS, V, I, AD> &
CreateConnState<S, CP, CS, V, I, AD> &
CreateVars<S, CP, CS, V, I, AD>;

// For testing type definitions:
export function test<S, CP, CS, V, R extends Actions<S, CP, CS, V>>(
input: WorkerConfigInput<S, CP, CS, V, R>,
): WorkerConfig<S, CP, CS, V> {
const config = WorkerConfigSchema.parse(input) as WorkerConfig<S, CP, CS, V>;
export function test<
S,
CP,
CS,
V,
I,
AD,
R extends Actions<S, CP, CS, V, I, AD>,
>(
input: WorkerConfigInput<S, CP, CS, V, I, AD, R>,
): WorkerConfig<S, CP, CS, V, I, AD> {
const config = WorkerConfigSchema.parse(input) as WorkerConfig<
S,
CP,
CS,
V,
I,
AD
>;
return config;
}

Expand Down
15 changes: 10 additions & 5 deletions packages/core/src/worker/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export function generateConnToken(): string {

export type ConnId = string;

export type AnyConn = Conn<any, any, any, any>;
export type AnyConn = Conn<any, any, any, any, any, any>;

/**
* Represents a client connection to a worker.
Expand All @@ -26,13 +26,13 @@ export type AnyConn = Conn<any, any, any, any>;
*
* @see {@link https://rivet.gg/docs/connections|Connection Documentation}
*/
export class Conn<S, CP, CS, V> {
export class Conn<S, CP, CS, V, I, AD> {
subscriptions: Set<string> = new Set<string>();

#stateEnabled: boolean;

// TODO: Remove this cyclical reference
#worker: WorkerInstance<S, CP, CS, V>;
#worker: WorkerInstance<S, CP, CS, V, I, AD>;

/**
* The proxied state that notifies of changes automatically.
Expand Down Expand Up @@ -103,7 +103,7 @@ export class Conn<S, CP, CS, V> {
* @protected
*/
public constructor(
worker: WorkerInstance<S, CP, CS, V>,
worker: WorkerInstance<S, CP, CS, V, I, AD>,
persist: PersistedConn<CP, CS>,
driver: ConnDriver,
stateEnabled: boolean,
Expand Down Expand Up @@ -157,6 +157,11 @@ export class Conn<S, CP, CS, V> {
* @param reason - The reason for disconnection.
*/
public async disconnect(reason?: string) {
await this.#driver.disconnect(this.#worker, this, this.__persist.ds, reason);
await this.#driver.disconnect(
this.#worker,
this,
this.__persist.ds,
reason,
);
}
}
8 changes: 4 additions & 4 deletions packages/core/src/worker/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import { Schedule } from "./schedule";
/**
* WorkerContext class that provides access to worker methods and state
*/
export class WorkerContext<S, CP, CS, V> {
#worker: WorkerInstance<S, CP, CS, V>;
export class WorkerContext<S, CP, CS, V, I, AD> {
#worker: WorkerInstance<S, CP, CS, V, I, AD>;

constructor(worker: WorkerInstance<S, CP, CS, V>) {
constructor(worker: WorkerInstance<S, CP, CS, V, I, AD>) {
this.#worker = worker;
}

Expand Down Expand Up @@ -84,7 +84,7 @@ export class WorkerContext<S, CP, CS, V> {
/**
* Gets the map of connections.
*/
get conns(): Map<ConnId, Conn<S, CP, CS, V>> {
get conns(): Map<ConnId, Conn<S, CP, CS, V, I, AD>> {
return this.#worker.conns;
}

Expand Down
Loading
Loading