diff --git a/lib/async.ts b/lib/async.ts index b8dbdd4..fa0381a 100644 --- a/lib/async.ts +++ b/lib/async.ts @@ -5,6 +5,7 @@ import { ResonateOptions, Options, PartialOptions } from "./core/options"; import { DurablePromise } from "./core/promises/promises"; import * as retryPolicy from "./core/retry"; import * as schedules from "./core/schedules/schedules"; +import * as utils from "./core/utils"; import { ResonateBase } from "./resonate"; ///////////////////////////////////////////////////////////////////// @@ -41,10 +42,9 @@ export class Resonate extends ResonateBase { func: F, args: Params, opts: Options, - defaults: Options, durablePromise?: DurablePromise, ): ResonatePromise> { - return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise); + return this.scheduler.add(name, id, func, args, opts, durablePromise); } /** @@ -210,22 +210,36 @@ export class Context { // the id is either: // 1. a provided string in the case of a deferred execution // 2. a generated string in the case of an ordinary execution - const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}`; + const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}.${func.name}`; // human readable name of the function const name = typeof func === "string" ? func : func.name; // opts are optional and can be provided as the last arg - const { args, opts } = this.invocation.split(argsWithOpts); + const { args, opts: givenOpts } = utils.split(argsWithOpts); + const registeredOptions = this.resonate.registeredOptions( + this.invocation.root.name, + this.invocation.root.opts.version, + ); + const resonateOptions = this.resonate.defaults(); + + const opts = { + ...resonateOptions, + ...registeredOptions, + ...givenOpts, + }; + + // Merge the tags + opts.tags = { ...resonateOptions.tags, ...registeredOptions.tags, ...givenOpts.tags }; - // default opts never change - const defaults = this.invocation.defaults; + // Default lock is false for children execution + opts.lock = opts.lock ?? false; // param is only required for deferred executions const param = typeof func === "string" ? args[0] : undefined; // create a new invocation - const invocation = new Invocation(name, id, undefined, param, opts, defaults, parent); + const invocation = new Invocation(name, id, undefined, param, opts, parent); let execution: Execution; if (typeof func === "string") { @@ -434,7 +448,6 @@ class Scheduler { func: F, args: Params, opts: Options, - defaults: Options, durablePromise?: DurablePromise, ): ResonatePromise> { // if the execution is already running, and not killed, @@ -453,7 +466,7 @@ class Scheduler { }; // create a new invocation - const invocation = new Invocation>(name, id, undefined, param, opts, defaults); + const invocation = new Invocation>(name, id, undefined, param, opts); // create a new execution const ctx = new Context(this.resonate, invocation); diff --git a/lib/core/execution.ts b/lib/core/execution.ts index 3389573..0858bc7 100644 --- a/lib/core/execution.ts +++ b/lib/core/execution.ts @@ -91,23 +91,21 @@ export class OrdinaryExecution extends Execution { } protected async fork() { - if (this.invocation.opts.durable) { + if (this.invocation.opts.durable && !this.durablePromise) { // if durable, create a durable promise try { - this.durablePromise = - this.durablePromise ?? - (await DurablePromise.create( - this.resonate.store.promises, - this.invocation.opts.encoder, - this.invocation.id, - this.invocation.timeout, - { - idempotencyKey: this.invocation.idempotencyKey, - headers: this.invocation.headers, - param: this.invocation.param, - tags: this.invocation.opts.tags, - }, - )); + this.durablePromise = await DurablePromise.create( + this.resonate.store.promises, + this.invocation.opts.encoder, + this.invocation.id, + this.invocation.timeout, + { + idempotencyKey: this.invocation.idempotencyKey, + headers: this.invocation.headers, + param: this.invocation.param, + tags: this.invocation.opts.tags, + }, + ); } catch (e) { // if an error occurs, kill the execution this.kill(e); @@ -265,22 +263,20 @@ export class GeneratorExecution extends Execution { async create() { try { - if (this.invocation.opts.durable) { + if (this.invocation.opts.durable && !this.durablePromise) { // create a durable promise - this.durablePromise = - this.durablePromise ?? - (await DurablePromise.create( - this.resonate.store.promises, - this.invocation.opts.encoder, - this.invocation.id, - this.invocation.timeout, - { - idempotencyKey: this.invocation.idempotencyKey, - headers: this.invocation.headers, - param: this.invocation.param, - tags: this.invocation.opts.tags, - }, - )); + this.durablePromise = await DurablePromise.create( + this.resonate.store.promises, + this.invocation.opts.encoder, + this.invocation.id, + this.invocation.timeout, + { + idempotencyKey: this.invocation.idempotencyKey, + headers: this.invocation.headers, + param: this.invocation.param, + tags: this.invocation.opts.tags, + }, + ); // resolve/reject the invocation if already completed if (this.durablePromise.resolved) { diff --git a/lib/core/invocation.ts b/lib/core/invocation.ts index f98b273..73c3acb 100644 --- a/lib/core/invocation.ts +++ b/lib/core/invocation.ts @@ -1,5 +1,5 @@ import { Future } from "./future"; -import { Options, PartialOptions, isOptions } from "./options"; +import { Options } from "./options"; import { RetryPolicy, exponential } from "./retry"; ///////////////////////////////////////////////////////////////////// @@ -44,7 +44,6 @@ export class Invocation { public readonly headers: Record | undefined, public readonly param: unknown, public readonly opts: Options, - public readonly defaults: Options, parent?: Invocation, ) { // create a future and hold on to its resolvers @@ -59,13 +58,12 @@ export class Invocation { // get the execution id from either: // - a hard coded string // - a function that returns a string given the invocation id - this.eid = typeof this.opts.eid === "function" ? this.opts.eid(this.id) : this.opts.eid; + this.eid = this.opts.eidFn(this.id); // get the idempotency key from either: // - a hard coded string // - a function that returns a string given the invocation id - this.idempotencyKey = - typeof this.opts.idempotencyKey === "function" ? this.opts.idempotencyKey(this.id) : this.opts.idempotencyKey; + this.idempotencyKey = this.opts.idempotencyKeyFn(this.id); // the timeout is the minimum of: // - the current time plus the user provided relative time @@ -89,29 +87,4 @@ export class Invocation { unblock() { this.blocked = null; } - - split(args: [...any, PartialOptions?]): { args: any[]; opts: Options } { - let opts = args[args.length - 1]; - - // merge opts - if (isOptions(opts)) { - args = args.slice(0, -1); - opts = { - ...this.defaults, - ...opts, - tags: { ...this.defaults.tags, ...opts.tags }, // tags are merged - }; - } else { - // copy defaults - opts = { ...this.defaults }; - } - - // lock is false by default - opts.lock = opts.lock ?? false; - - // version cannot be overridden - opts.version = this.defaults.version; - - return { args, opts }; - } } diff --git a/lib/core/options.ts b/lib/core/options.ts index d1689f9..9fc4426 100644 --- a/lib/core/options.ts +++ b/lib/core/options.ts @@ -83,9 +83,10 @@ export type Options = { durable: boolean; /** - * A unique id for this execution, defaults to a random id. + * A function that calculates the id for this execution + * defaults to a random funciton. */ - eid: string | ((id: string) => string); + eidFn: (id: string) => string; /** * Overrides the default encoder. @@ -93,9 +94,10 @@ export type Options = { encoder: IEncoder; /** - * Overrides the default idempotency key. + * Overrides the default funciton to calculate the idempotency key. + * defaults to a variation fnv-1a the hash funciton. */ - idempotencyKey: string | ((id: string) => string); + idempotencyKeyFn: (id: string) => string; /** * Acquire a lock for the execution. diff --git a/lib/core/promises/promises.ts b/lib/core/promises/promises.ts index 57dac45..ee23095 100644 --- a/lib/core/promises/promises.ts +++ b/lib/core/promises/promises.ts @@ -202,19 +202,16 @@ export class DurablePromise { timeout: number, opts: Partial = {}, ) { - return new DurablePromise( - store, - encoder, - await store.create( - id, - opts.idempotencyKey, - opts.strict ?? false, - opts.headers, - encoder.encode(opts.param), - timeout, - opts.tags, - ), + const storedPromise = await store.create( + id, + opts.idempotencyKey, + opts.strict ?? false, + opts.headers, + encoder.encode(opts.param), + timeout, + opts.tags, ); + return new DurablePromise(store, encoder, storedPromise); } /** diff --git a/lib/core/utils.ts b/lib/core/utils.ts index 1e1471a..ac4dc41 100644 --- a/lib/core/utils.ts +++ b/lib/core/utils.ts @@ -1,3 +1,5 @@ +import { Options, isOptions } from "./options"; + export function randomId(): string { return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16); } @@ -13,3 +15,10 @@ export function hash(s: string): string { const maxLength = 8; return "0".repeat(Math.max(0, maxLength - hashString.length)) + hashString; } + +export function split(argsWithOpts: any[]): { args: any[]; opts: Partial } { + const possibleOpts = argsWithOpts.at(-1); + return isOptions(possibleOpts) + ? { args: argsWithOpts.slice(0, -1), opts: possibleOpts } + : { args: argsWithOpts, opts: {} }; +} diff --git a/lib/generator.ts b/lib/generator.ts index 2270ce7..802436a 100644 --- a/lib/generator.ts +++ b/lib/generator.ts @@ -4,6 +4,7 @@ import { Invocation } from "./core/invocation"; import { ResonateOptions, Options, PartialOptions } from "./core/options"; import { DurablePromise } from "./core/promises/promises"; import * as schedules from "./core/schedules/schedules"; +import * as utils from "./core/utils"; import { ResonateBase } from "./resonate"; ///////////////////////////////////////////////////////////////////// @@ -78,10 +79,9 @@ export class Resonate extends ResonateBase { func: F, args: Params, opts: Options, - defaults: Options, durablePromise?: DurablePromise, ): ResonatePromise> { - return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise); + return this.scheduler.add(name, id, func, args, opts, durablePromise); } /** @@ -195,7 +195,10 @@ export class Info { } export class Context { - constructor(private invocation: Invocation) {} + constructor( + private resonate: Resonate, + private invocation: Invocation, + ) {} /** * The running count of child function invocations. @@ -313,7 +316,9 @@ export class Context { } private _call(func: string | ((...args: any[]) => any), argsWithOpts: any[], yieldFuture: boolean): Call { - const { args, opts } = this.invocation.split(argsWithOpts); + const { args, opts: givenOpts } = utils.split(argsWithOpts); + + const opts = this.resonate.defaults(givenOpts); if (typeof func === "string") { return { kind: "call", value: { kind: "deferred", func, args, opts }, yieldFuture }; @@ -366,7 +371,6 @@ class Scheduler { func: F, args: Params, opts: Options, - defaults: Options, durablePromise?: DurablePromise, ): ResonatePromise> { // if the execution is already running, and not killed, @@ -383,10 +387,10 @@ class Scheduler { }; // create a new invocation - const invocation = new Invocation(name, id, undefined, param, opts, defaults); + const invocation = new Invocation(name, id, undefined, param, opts); // create a new execution - const generator = func(new Context(invocation), ...args); + const generator = func(new Context(this.resonate, invocation), ...args); const execution = new GeneratorExecution(this.resonate, invocation, generator, durablePromise); // once the durable promise has been created, @@ -504,14 +508,11 @@ class Scheduler { // human readable name of the function const name = value.kind === "deferred" ? value.func : value.func.name; - // default opts never change - const defaults = parent.defaults; - // param is only required for deferred executions const param = value.kind === "deferred" ? value.args[0] : undefined; // create a new invocation - const invocation = new Invocation(name, id, undefined, param, value.opts, defaults, parent); + const invocation = new Invocation(name, id, undefined, param, value.opts, parent); // add child and increment counter parent.addChild(invocation); @@ -521,7 +522,7 @@ class Scheduler { if (value.kind === "resonate") { // create a generator execution - const ctx = new Context(invocation); + const ctx = new Context(this.resonate, invocation); execution = new GeneratorExecution(this.resonate, invocation, value.func(ctx, ...value.args)); await execution.create(); diff --git a/lib/resonate.ts b/lib/resonate.ts index 26157cb..a76e4e5 100644 --- a/lib/resonate.ts +++ b/lib/resonate.ts @@ -3,7 +3,7 @@ import { JSONEncoder } from "./core/encoders/json"; import { ResonatePromise } from "./core/future"; import { ILogger } from "./core/logger"; import { Logger } from "./core/loggers/logger"; -import { ResonateOptions, Options, PartialOptions, isOptions } from "./core/options"; +import { ResonateOptions, Options, PartialOptions } from "./core/options"; import * as promises from "./core/promises/promises"; import * as retryPolicy from "./core/retry"; import * as schedules from "./core/schedules/schedules"; @@ -121,7 +121,6 @@ export abstract class ResonateBase { func: Func, args: any[], opts: Options, - defaults: Options, durablePromise?: promises.DurablePromise, ): ResonatePromise; @@ -170,18 +169,15 @@ export abstract class ResonateBase { * @returns A promise that resolve to the function return value. */ run(name: string, id: string, ...argsWithOpts: [...any, PartialOptions?]): ResonatePromise { - const { - args, - opts: { version }, - part: { durable, idempotencyKey, tags, timeout }, - } = this.split(argsWithOpts); + const { args, opts: givenOpts } = utils.split(argsWithOpts); + const { version, durable, idempotencyKeyFn, eidFn, tags, timeout } = givenOpts; - if (!this.functions[name] || !this.functions[name][version]) { + if (!this.functions[name] || !this.functions[name][version || 0]) { throw new Error(`Function ${name} version ${version} not registered`); } // the options registered with the function are the defaults - const { func, opts: defaults } = this.functions[name][version]; + const { func, opts: registeredOpts } = this.functions[name][version || 0]; // only the following options can be overridden, this information is persisted // in the durable promise and therefore not required on the recovery path @@ -191,14 +187,18 @@ export abstract class ResonateBase { override.durable = durable; } - if (idempotencyKey !== undefined) { - override.idempotencyKey = idempotencyKey; + if (idempotencyKeyFn !== undefined) { + override.idempotencyKeyFn = idempotencyKeyFn; + } + + if (eidFn !== undefined) { + override.eidFn = eidFn; } if (tags !== undefined) { - override.tags = { ...defaults.tags, ...tags, "resonate:invocation": "true" }; + override.tags = { ...registeredOpts.tags, ...tags, "resonate:invocation": "true" }; } else { - override.tags = { ...defaults.tags, "resonate:invocation": "true" }; + override.tags = { ...registeredOpts.tags, "resonate:invocation": "true" }; } if (timeout !== undefined) { @@ -207,14 +207,25 @@ export abstract class ResonateBase { // merge defaults with override to get opts const opts = { - ...defaults, + ...registeredOpts, ...override, }; // lock on top level is true by default opts.lock = opts.lock ?? true; - return this.execute(name, id, func, args, opts, defaults); + return this.execute(name, id, func, args, opts); + } + + // Gets the registered options for a specific function and version + // that has been previously registered. + registeredOptions(name: string, version: number): Options { + if (!this.functions[name] || !this.functions[name][version]) { + throw new Error(`Function ${name} version ${version} not registered`); + } + + const { opts } = this.functions[name][version]; + return opts; } schedule( @@ -223,7 +234,9 @@ export abstract class ResonateBase { func: Func | string, ...argsWithOpts: [...any, PartialOptions?] ): Promise { - const { args, opts } = this.split(argsWithOpts); + const { args, opts: givenOpts } = utils.split(argsWithOpts); + + const opts = this.defaults(givenOpts); if (typeof func === "function") { // if function is provided, the default version is 1 @@ -242,8 +255,7 @@ export abstract class ResonateBase { opts: { retry, version, timeout, tags: promiseTags }, } = this.functions[funcName][opts.version]; - const idempotencyKey = - typeof opts.idempotencyKey === "function" ? opts.idempotencyKey(funcName) : opts.idempotencyKey; + const idempotencyKey = opts.idempotencyKeyFn(funcName); const promiseParam = { func: funcName, @@ -288,11 +300,11 @@ export abstract class ResonateBase { clearInterval(this.interval); } - private defaults({ + public defaults({ durable = true, - eid = utils.randomId, + eidFn = utils.randomId, encoder = this.encoder, - idempotencyKey = utils.hash, + idempotencyKeyFn = utils.hash, lock = undefined, poll = this.poll, retry = this.retry, @@ -305,10 +317,10 @@ export abstract class ResonateBase { return { __resonate: true, - eid, + eidFn, durable, encoder, - idempotencyKey, + idempotencyKeyFn, lock, poll, retry, @@ -337,7 +349,7 @@ export abstract class ResonateBase { ) { const { func, opts } = this.functions[param.func][param.version]; opts.retry = param.retryPolicy; - this.execute(param.func, promise.id, func, param.args, opts, opts, promise); + this.execute(param.func, promise.id, func, param.args, opts, promise); } } } @@ -347,13 +359,6 @@ export abstract class ResonateBase { this.logger.error(e); } } - - private split(args: [...any, PartialOptions?]): { args: any[]; opts: Options; part: Partial } { - const part = args[args.length - 1]; - return isOptions(part) - ? { args: args.slice(0, -1), opts: this.defaults(part), part } - : { args, opts: this.defaults(), part: {} }; - } } export interface ResonatePromises { diff --git a/test/options.test.ts b/test/options.test.ts index ef23480..341db2f 100644 --- a/test/options.test.ts +++ b/test/options.test.ts @@ -43,9 +43,9 @@ describe("Options", () => { const overrides = { durable: false, - eid: "eid", + eidFn: () => "eid", encoder: new Base64Encoder(), - idempotencyKey: "idempotencyKey", + idempotencyKeyFn: (_: string) => "idempotencyKey", lock: false, poll: 2000, retry: retry.linear(), @@ -79,11 +79,12 @@ describe("Options", () => { resonate.options({ version: 1 }), ); + // Most options defaults are set when created a resonate instance for (const opts of [top, middle, bottom]) { expect(opts.durable).toBe(false); - expect(opts.eid).toBe(utils.randomId); + expect(opts.eidFn).toBe(utils.randomId); expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKey).toBe(utils.hash); + expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.poll).toBe(resonateOpts.poll); expect(opts.retry).toBe(resonateOpts.retry); expect(opts.timeout).toBe(resonateOpts.timeout); @@ -104,9 +105,9 @@ describe("Options", () => { for (const opts of [top, middle, bottom]) { expect(opts.durable).toBe(overrides.durable); - expect(opts.eid).toBe(overrides.eid); + expect(opts.eidFn).toBe(overrides.eidFn); expect(opts.encoder).toBe(overrides.encoder); - expect(opts.idempotencyKey).toBe(overrides.idempotencyKey); + expect(opts.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(opts.lock).toBe(overrides.lock); expect(opts.poll).toBe(overrides.poll); expect(opts.retry).toBe(overrides.retry); @@ -131,9 +132,9 @@ describe("Options", () => { // top level options expect(top.durable).toBe(false); - expect(top.eid).toBe(utils.randomId); + expect(top.eidFn).toBe(overrides.eidFn); expect(top.encoder).toBe(resonateOpts.encoder); - expect(top.idempotencyKey).toBe(overrides.idempotencyKey); + expect(top.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(top.lock).toBe(true); expect(top.poll).toBe(resonateOpts.poll); expect(top.retry).toBe(resonateOpts.retry); @@ -144,9 +145,9 @@ describe("Options", () => { // bottom level options for (const opts of bottom) { expect(opts.durable).toBe(false); - expect(opts.eid).toBe(utils.randomId); + expect(opts.eidFn).toBe(utils.randomId); expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKey).toBe(utils.hash); + expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.lock).toBe(false); expect(opts.poll).toBe(resonateOpts.poll); expect(opts.retry).toBe(resonateOpts.retry); @@ -166,28 +167,30 @@ describe("Options", () => { // middle options (overriden) expect(middle.durable).toBe(overrides.durable); - expect(middle.eid).toBe(overrides.eid); + expect(middle.eidFn).toBe(overrides.eidFn); expect(middle.encoder).toBe(overrides.encoder); - expect(middle.idempotencyKey).toBe(overrides.idempotencyKey); + expect(middle.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(middle.lock).toBe(overrides.lock); expect(middle.poll).toBe(overrides.poll); expect(middle.retry).toBe(overrides.retry); expect(middle.tags).toEqual({ ...resonateOpts.tags, ...overrides.tags }); expect(middle.timeout).toBe(overrides.timeout); - expect(middle.version).toBe(1); // top and bottom options for (const opts of [top, bottom]) { expect(opts.durable).toBe(false); - expect(opts.eid).toBe(utils.randomId); + expect(opts.eidFn).toBe(utils.randomId); expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKey).toBe(utils.hash); + expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.poll).toBe(resonateOpts.poll); expect(opts.retry).toBe(resonateOpts.retry); expect(opts.timeout).toBe(resonateOpts.timeout); - expect(opts.version).toBe(1); } + expect(top.version).toBe(1); + expect(middle.version).toBeDefined(); + expect(bottom.version).toBeDefined(); + expect(top.lock).toBe(true); expect(bottom.lock).toBe(false);