From 7bf23efdc8a0c52762742fe4895be59768012361 Mon Sep 17 00:00:00 2001 From: Andres Villegas Date: Mon, 24 Jun 2024 11:43:03 -0700 Subject: [PATCH] Removes "default" options from the invocation (#125) Default options in the invocation were in reality the registered options from the top level function call. This commit changes the code to express that more clearly and removes the split functions that were a source of confusion. --- lib/async.ts | 31 +++++++++++----- lib/core/execution.ts | 56 ++++++++++++++--------------- lib/core/invocation.ts | 33 ++--------------- lib/core/options.ts | 10 +++--- lib/core/promises/promises.ts | 21 +++++------ lib/core/utils.ts | 9 +++++ lib/generator.ts | 25 ++++++------- lib/resonate.ts | 67 +++++++++++++++++++---------------- test/options.test.ts | 35 +++++++++--------- 9 files changed, 143 insertions(+), 144 deletions(-) diff --git a/lib/async.ts b/lib/async.ts index b8dbdd4..fa0381a 100644 --- a/lib/async.ts +++ b/lib/async.ts @@ -5,6 +5,7 @@ import { ResonateOptions, Options, PartialOptions } from "./core/options"; import { DurablePromise } from "./core/promises/promises"; import * as retryPolicy from "./core/retry"; import * as schedules from "./core/schedules/schedules"; +import * as utils from "./core/utils"; import { ResonateBase } from "./resonate"; ///////////////////////////////////////////////////////////////////// @@ -41,10 +42,9 @@ export class Resonate extends ResonateBase { func: F, args: Params, opts: Options, - defaults: Options, durablePromise?: DurablePromise, ): ResonatePromise> { - return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise); + return this.scheduler.add(name, id, func, args, opts, durablePromise); } /** @@ -210,22 +210,36 @@ export class Context { // the id is either: // 1. a provided string in the case of a deferred execution // 2. a generated string in the case of an ordinary execution - const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}`; + const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}.${func.name}`; // human readable name of the function const name = typeof func === "string" ? func : func.name; // opts are optional and can be provided as the last arg - const { args, opts } = this.invocation.split(argsWithOpts); + const { args, opts: givenOpts } = utils.split(argsWithOpts); + const registeredOptions = this.resonate.registeredOptions( + this.invocation.root.name, + this.invocation.root.opts.version, + ); + const resonateOptions = this.resonate.defaults(); + + const opts = { + ...resonateOptions, + ...registeredOptions, + ...givenOpts, + }; + + // Merge the tags + opts.tags = { ...resonateOptions.tags, ...registeredOptions.tags, ...givenOpts.tags }; - // default opts never change - const defaults = this.invocation.defaults; + // Default lock is false for children execution + opts.lock = opts.lock ?? false; // param is only required for deferred executions const param = typeof func === "string" ? args[0] : undefined; // create a new invocation - const invocation = new Invocation(name, id, undefined, param, opts, defaults, parent); + const invocation = new Invocation(name, id, undefined, param, opts, parent); let execution: Execution; if (typeof func === "string") { @@ -434,7 +448,6 @@ class Scheduler { func: F, args: Params, opts: Options, - defaults: Options, durablePromise?: DurablePromise, ): ResonatePromise> { // if the execution is already running, and not killed, @@ -453,7 +466,7 @@ class Scheduler { }; // create a new invocation - const invocation = new Invocation>(name, id, undefined, param, opts, defaults); + const invocation = new Invocation>(name, id, undefined, param, opts); // create a new execution const ctx = new Context(this.resonate, invocation); diff --git a/lib/core/execution.ts b/lib/core/execution.ts index 3389573..0858bc7 100644 --- a/lib/core/execution.ts +++ b/lib/core/execution.ts @@ -91,23 +91,21 @@ export class OrdinaryExecution extends Execution { } protected async fork() { - if (this.invocation.opts.durable) { + if (this.invocation.opts.durable && !this.durablePromise) { // if durable, create a durable promise try { - this.durablePromise = - this.durablePromise ?? - (await DurablePromise.create( - this.resonate.store.promises, - this.invocation.opts.encoder, - this.invocation.id, - this.invocation.timeout, - { - idempotencyKey: this.invocation.idempotencyKey, - headers: this.invocation.headers, - param: this.invocation.param, - tags: this.invocation.opts.tags, - }, - )); + this.durablePromise = await DurablePromise.create( + this.resonate.store.promises, + this.invocation.opts.encoder, + this.invocation.id, + this.invocation.timeout, + { + idempotencyKey: this.invocation.idempotencyKey, + headers: this.invocation.headers, + param: this.invocation.param, + tags: this.invocation.opts.tags, + }, + ); } catch (e) { // if an error occurs, kill the execution this.kill(e); @@ -265,22 +263,20 @@ export class GeneratorExecution extends Execution { async create() { try { - if (this.invocation.opts.durable) { + if (this.invocation.opts.durable && !this.durablePromise) { // create a durable promise - this.durablePromise = - this.durablePromise ?? - (await DurablePromise.create( - this.resonate.store.promises, - this.invocation.opts.encoder, - this.invocation.id, - this.invocation.timeout, - { - idempotencyKey: this.invocation.idempotencyKey, - headers: this.invocation.headers, - param: this.invocation.param, - tags: this.invocation.opts.tags, - }, - )); + this.durablePromise = await DurablePromise.create( + this.resonate.store.promises, + this.invocation.opts.encoder, + this.invocation.id, + this.invocation.timeout, + { + idempotencyKey: this.invocation.idempotencyKey, + headers: this.invocation.headers, + param: this.invocation.param, + tags: this.invocation.opts.tags, + }, + ); // resolve/reject the invocation if already completed if (this.durablePromise.resolved) { diff --git a/lib/core/invocation.ts b/lib/core/invocation.ts index f98b273..73c3acb 100644 --- a/lib/core/invocation.ts +++ b/lib/core/invocation.ts @@ -1,5 +1,5 @@ import { Future } from "./future"; -import { Options, PartialOptions, isOptions } from "./options"; +import { Options } from "./options"; import { RetryPolicy, exponential } from "./retry"; ///////////////////////////////////////////////////////////////////// @@ -44,7 +44,6 @@ export class Invocation { public readonly headers: Record | undefined, public readonly param: unknown, public readonly opts: Options, - public readonly defaults: Options, parent?: Invocation, ) { // create a future and hold on to its resolvers @@ -59,13 +58,12 @@ export class Invocation { // get the execution id from either: // - a hard coded string // - a function that returns a string given the invocation id - this.eid = typeof this.opts.eid === "function" ? this.opts.eid(this.id) : this.opts.eid; + this.eid = this.opts.eidFn(this.id); // get the idempotency key from either: // - a hard coded string // - a function that returns a string given the invocation id - this.idempotencyKey = - typeof this.opts.idempotencyKey === "function" ? this.opts.idempotencyKey(this.id) : this.opts.idempotencyKey; + this.idempotencyKey = this.opts.idempotencyKeyFn(this.id); // the timeout is the minimum of: // - the current time plus the user provided relative time @@ -89,29 +87,4 @@ export class Invocation { unblock() { this.blocked = null; } - - split(args: [...any, PartialOptions?]): { args: any[]; opts: Options } { - let opts = args[args.length - 1]; - - // merge opts - if (isOptions(opts)) { - args = args.slice(0, -1); - opts = { - ...this.defaults, - ...opts, - tags: { ...this.defaults.tags, ...opts.tags }, // tags are merged - }; - } else { - // copy defaults - opts = { ...this.defaults }; - } - - // lock is false by default - opts.lock = opts.lock ?? false; - - // version cannot be overridden - opts.version = this.defaults.version; - - return { args, opts }; - } } diff --git a/lib/core/options.ts b/lib/core/options.ts index d1689f9..9fc4426 100644 --- a/lib/core/options.ts +++ b/lib/core/options.ts @@ -83,9 +83,10 @@ export type Options = { durable: boolean; /** - * A unique id for this execution, defaults to a random id. + * A function that calculates the id for this execution + * defaults to a random funciton. */ - eid: string | ((id: string) => string); + eidFn: (id: string) => string; /** * Overrides the default encoder. @@ -93,9 +94,10 @@ export type Options = { encoder: IEncoder; /** - * Overrides the default idempotency key. + * Overrides the default funciton to calculate the idempotency key. + * defaults to a variation fnv-1a the hash funciton. */ - idempotencyKey: string | ((id: string) => string); + idempotencyKeyFn: (id: string) => string; /** * Acquire a lock for the execution. diff --git a/lib/core/promises/promises.ts b/lib/core/promises/promises.ts index 57dac45..ee23095 100644 --- a/lib/core/promises/promises.ts +++ b/lib/core/promises/promises.ts @@ -202,19 +202,16 @@ export class DurablePromise { timeout: number, opts: Partial = {}, ) { - return new DurablePromise( - store, - encoder, - await store.create( - id, - opts.idempotencyKey, - opts.strict ?? false, - opts.headers, - encoder.encode(opts.param), - timeout, - opts.tags, - ), + const storedPromise = await store.create( + id, + opts.idempotencyKey, + opts.strict ?? false, + opts.headers, + encoder.encode(opts.param), + timeout, + opts.tags, ); + return new DurablePromise(store, encoder, storedPromise); } /** diff --git a/lib/core/utils.ts b/lib/core/utils.ts index 1e1471a..ac4dc41 100644 --- a/lib/core/utils.ts +++ b/lib/core/utils.ts @@ -1,3 +1,5 @@ +import { Options, isOptions } from "./options"; + export function randomId(): string { return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16); } @@ -13,3 +15,10 @@ export function hash(s: string): string { const maxLength = 8; return "0".repeat(Math.max(0, maxLength - hashString.length)) + hashString; } + +export function split(argsWithOpts: any[]): { args: any[]; opts: Partial } { + const possibleOpts = argsWithOpts.at(-1); + return isOptions(possibleOpts) + ? { args: argsWithOpts.slice(0, -1), opts: possibleOpts } + : { args: argsWithOpts, opts: {} }; +} diff --git a/lib/generator.ts b/lib/generator.ts index 2270ce7..802436a 100644 --- a/lib/generator.ts +++ b/lib/generator.ts @@ -4,6 +4,7 @@ import { Invocation } from "./core/invocation"; import { ResonateOptions, Options, PartialOptions } from "./core/options"; import { DurablePromise } from "./core/promises/promises"; import * as schedules from "./core/schedules/schedules"; +import * as utils from "./core/utils"; import { ResonateBase } from "./resonate"; ///////////////////////////////////////////////////////////////////// @@ -78,10 +79,9 @@ export class Resonate extends ResonateBase { func: F, args: Params, opts: Options, - defaults: Options, durablePromise?: DurablePromise, ): ResonatePromise> { - return this.scheduler.add(name, id, func, args, opts, defaults, durablePromise); + return this.scheduler.add(name, id, func, args, opts, durablePromise); } /** @@ -195,7 +195,10 @@ export class Info { } export class Context { - constructor(private invocation: Invocation) {} + constructor( + private resonate: Resonate, + private invocation: Invocation, + ) {} /** * The running count of child function invocations. @@ -313,7 +316,9 @@ export class Context { } private _call(func: string | ((...args: any[]) => any), argsWithOpts: any[], yieldFuture: boolean): Call { - const { args, opts } = this.invocation.split(argsWithOpts); + const { args, opts: givenOpts } = utils.split(argsWithOpts); + + const opts = this.resonate.defaults(givenOpts); if (typeof func === "string") { return { kind: "call", value: { kind: "deferred", func, args, opts }, yieldFuture }; @@ -366,7 +371,6 @@ class Scheduler { func: F, args: Params, opts: Options, - defaults: Options, durablePromise?: DurablePromise, ): ResonatePromise> { // if the execution is already running, and not killed, @@ -383,10 +387,10 @@ class Scheduler { }; // create a new invocation - const invocation = new Invocation(name, id, undefined, param, opts, defaults); + const invocation = new Invocation(name, id, undefined, param, opts); // create a new execution - const generator = func(new Context(invocation), ...args); + const generator = func(new Context(this.resonate, invocation), ...args); const execution = new GeneratorExecution(this.resonate, invocation, generator, durablePromise); // once the durable promise has been created, @@ -504,14 +508,11 @@ class Scheduler { // human readable name of the function const name = value.kind === "deferred" ? value.func : value.func.name; - // default opts never change - const defaults = parent.defaults; - // param is only required for deferred executions const param = value.kind === "deferred" ? value.args[0] : undefined; // create a new invocation - const invocation = new Invocation(name, id, undefined, param, value.opts, defaults, parent); + const invocation = new Invocation(name, id, undefined, param, value.opts, parent); // add child and increment counter parent.addChild(invocation); @@ -521,7 +522,7 @@ class Scheduler { if (value.kind === "resonate") { // create a generator execution - const ctx = new Context(invocation); + const ctx = new Context(this.resonate, invocation); execution = new GeneratorExecution(this.resonate, invocation, value.func(ctx, ...value.args)); await execution.create(); diff --git a/lib/resonate.ts b/lib/resonate.ts index 26157cb..a76e4e5 100644 --- a/lib/resonate.ts +++ b/lib/resonate.ts @@ -3,7 +3,7 @@ import { JSONEncoder } from "./core/encoders/json"; import { ResonatePromise } from "./core/future"; import { ILogger } from "./core/logger"; import { Logger } from "./core/loggers/logger"; -import { ResonateOptions, Options, PartialOptions, isOptions } from "./core/options"; +import { ResonateOptions, Options, PartialOptions } from "./core/options"; import * as promises from "./core/promises/promises"; import * as retryPolicy from "./core/retry"; import * as schedules from "./core/schedules/schedules"; @@ -121,7 +121,6 @@ export abstract class ResonateBase { func: Func, args: any[], opts: Options, - defaults: Options, durablePromise?: promises.DurablePromise, ): ResonatePromise; @@ -170,18 +169,15 @@ export abstract class ResonateBase { * @returns A promise that resolve to the function return value. */ run(name: string, id: string, ...argsWithOpts: [...any, PartialOptions?]): ResonatePromise { - const { - args, - opts: { version }, - part: { durable, idempotencyKey, tags, timeout }, - } = this.split(argsWithOpts); + const { args, opts: givenOpts } = utils.split(argsWithOpts); + const { version, durable, idempotencyKeyFn, eidFn, tags, timeout } = givenOpts; - if (!this.functions[name] || !this.functions[name][version]) { + if (!this.functions[name] || !this.functions[name][version || 0]) { throw new Error(`Function ${name} version ${version} not registered`); } // the options registered with the function are the defaults - const { func, opts: defaults } = this.functions[name][version]; + const { func, opts: registeredOpts } = this.functions[name][version || 0]; // only the following options can be overridden, this information is persisted // in the durable promise and therefore not required on the recovery path @@ -191,14 +187,18 @@ export abstract class ResonateBase { override.durable = durable; } - if (idempotencyKey !== undefined) { - override.idempotencyKey = idempotencyKey; + if (idempotencyKeyFn !== undefined) { + override.idempotencyKeyFn = idempotencyKeyFn; + } + + if (eidFn !== undefined) { + override.eidFn = eidFn; } if (tags !== undefined) { - override.tags = { ...defaults.tags, ...tags, "resonate:invocation": "true" }; + override.tags = { ...registeredOpts.tags, ...tags, "resonate:invocation": "true" }; } else { - override.tags = { ...defaults.tags, "resonate:invocation": "true" }; + override.tags = { ...registeredOpts.tags, "resonate:invocation": "true" }; } if (timeout !== undefined) { @@ -207,14 +207,25 @@ export abstract class ResonateBase { // merge defaults with override to get opts const opts = { - ...defaults, + ...registeredOpts, ...override, }; // lock on top level is true by default opts.lock = opts.lock ?? true; - return this.execute(name, id, func, args, opts, defaults); + return this.execute(name, id, func, args, opts); + } + + // Gets the registered options for a specific function and version + // that has been previously registered. + registeredOptions(name: string, version: number): Options { + if (!this.functions[name] || !this.functions[name][version]) { + throw new Error(`Function ${name} version ${version} not registered`); + } + + const { opts } = this.functions[name][version]; + return opts; } schedule( @@ -223,7 +234,9 @@ export abstract class ResonateBase { func: Func | string, ...argsWithOpts: [...any, PartialOptions?] ): Promise { - const { args, opts } = this.split(argsWithOpts); + const { args, opts: givenOpts } = utils.split(argsWithOpts); + + const opts = this.defaults(givenOpts); if (typeof func === "function") { // if function is provided, the default version is 1 @@ -242,8 +255,7 @@ export abstract class ResonateBase { opts: { retry, version, timeout, tags: promiseTags }, } = this.functions[funcName][opts.version]; - const idempotencyKey = - typeof opts.idempotencyKey === "function" ? opts.idempotencyKey(funcName) : opts.idempotencyKey; + const idempotencyKey = opts.idempotencyKeyFn(funcName); const promiseParam = { func: funcName, @@ -288,11 +300,11 @@ export abstract class ResonateBase { clearInterval(this.interval); } - private defaults({ + public defaults({ durable = true, - eid = utils.randomId, + eidFn = utils.randomId, encoder = this.encoder, - idempotencyKey = utils.hash, + idempotencyKeyFn = utils.hash, lock = undefined, poll = this.poll, retry = this.retry, @@ -305,10 +317,10 @@ export abstract class ResonateBase { return { __resonate: true, - eid, + eidFn, durable, encoder, - idempotencyKey, + idempotencyKeyFn, lock, poll, retry, @@ -337,7 +349,7 @@ export abstract class ResonateBase { ) { const { func, opts } = this.functions[param.func][param.version]; opts.retry = param.retryPolicy; - this.execute(param.func, promise.id, func, param.args, opts, opts, promise); + this.execute(param.func, promise.id, func, param.args, opts, promise); } } } @@ -347,13 +359,6 @@ export abstract class ResonateBase { this.logger.error(e); } } - - private split(args: [...any, PartialOptions?]): { args: any[]; opts: Options; part: Partial } { - const part = args[args.length - 1]; - return isOptions(part) - ? { args: args.slice(0, -1), opts: this.defaults(part), part } - : { args, opts: this.defaults(), part: {} }; - } } export interface ResonatePromises { diff --git a/test/options.test.ts b/test/options.test.ts index ef23480..341db2f 100644 --- a/test/options.test.ts +++ b/test/options.test.ts @@ -43,9 +43,9 @@ describe("Options", () => { const overrides = { durable: false, - eid: "eid", + eidFn: () => "eid", encoder: new Base64Encoder(), - idempotencyKey: "idempotencyKey", + idempotencyKeyFn: (_: string) => "idempotencyKey", lock: false, poll: 2000, retry: retry.linear(), @@ -79,11 +79,12 @@ describe("Options", () => { resonate.options({ version: 1 }), ); + // Most options defaults are set when created a resonate instance for (const opts of [top, middle, bottom]) { expect(opts.durable).toBe(false); - expect(opts.eid).toBe(utils.randomId); + expect(opts.eidFn).toBe(utils.randomId); expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKey).toBe(utils.hash); + expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.poll).toBe(resonateOpts.poll); expect(opts.retry).toBe(resonateOpts.retry); expect(opts.timeout).toBe(resonateOpts.timeout); @@ -104,9 +105,9 @@ describe("Options", () => { for (const opts of [top, middle, bottom]) { expect(opts.durable).toBe(overrides.durable); - expect(opts.eid).toBe(overrides.eid); + expect(opts.eidFn).toBe(overrides.eidFn); expect(opts.encoder).toBe(overrides.encoder); - expect(opts.idempotencyKey).toBe(overrides.idempotencyKey); + expect(opts.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(opts.lock).toBe(overrides.lock); expect(opts.poll).toBe(overrides.poll); expect(opts.retry).toBe(overrides.retry); @@ -131,9 +132,9 @@ describe("Options", () => { // top level options expect(top.durable).toBe(false); - expect(top.eid).toBe(utils.randomId); + expect(top.eidFn).toBe(overrides.eidFn); expect(top.encoder).toBe(resonateOpts.encoder); - expect(top.idempotencyKey).toBe(overrides.idempotencyKey); + expect(top.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(top.lock).toBe(true); expect(top.poll).toBe(resonateOpts.poll); expect(top.retry).toBe(resonateOpts.retry); @@ -144,9 +145,9 @@ describe("Options", () => { // bottom level options for (const opts of bottom) { expect(opts.durable).toBe(false); - expect(opts.eid).toBe(utils.randomId); + expect(opts.eidFn).toBe(utils.randomId); expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKey).toBe(utils.hash); + expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.lock).toBe(false); expect(opts.poll).toBe(resonateOpts.poll); expect(opts.retry).toBe(resonateOpts.retry); @@ -166,28 +167,30 @@ describe("Options", () => { // middle options (overriden) expect(middle.durable).toBe(overrides.durable); - expect(middle.eid).toBe(overrides.eid); + expect(middle.eidFn).toBe(overrides.eidFn); expect(middle.encoder).toBe(overrides.encoder); - expect(middle.idempotencyKey).toBe(overrides.idempotencyKey); + expect(middle.idempotencyKeyFn).toBe(overrides.idempotencyKeyFn); expect(middle.lock).toBe(overrides.lock); expect(middle.poll).toBe(overrides.poll); expect(middle.retry).toBe(overrides.retry); expect(middle.tags).toEqual({ ...resonateOpts.tags, ...overrides.tags }); expect(middle.timeout).toBe(overrides.timeout); - expect(middle.version).toBe(1); // top and bottom options for (const opts of [top, bottom]) { expect(opts.durable).toBe(false); - expect(opts.eid).toBe(utils.randomId); + expect(opts.eidFn).toBe(utils.randomId); expect(opts.encoder).toBe(resonateOpts.encoder); - expect(opts.idempotencyKey).toBe(utils.hash); + expect(opts.idempotencyKeyFn).toBe(utils.hash); expect(opts.poll).toBe(resonateOpts.poll); expect(opts.retry).toBe(resonateOpts.retry); expect(opts.timeout).toBe(resonateOpts.timeout); - expect(opts.version).toBe(1); } + expect(top.version).toBe(1); + expect(middle.version).toBeDefined(); + expect(bottom.version).toBeDefined(); + expect(top.lock).toBe(true); expect(bottom.lock).toBe(false);