From d344145ec847b45c36e9975ac805b6d4fc9343fe Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 30 Mar 2021 23:41:45 -0400 Subject: [PATCH] retire in-house pubsub we don't need a pubsub to subscribe only to the first published valuee, we can use an expectant store that returns a promise when the store does not yet contain the value, where all the relevant promises resolve on a future set. this change means we can retire the in-house pubsub. repeaters probably make in-house pubsub unnecessary anyway, and were not yet used elsewhere in the codebase also somewhat relevant, see https://github.com/repeaterjs/repeater/issues/67#issue-692118936 --- packages/batch-execute/package.json | 1 - .../{pubsub => batch-execute}/src/split.ts | 2 +- packages/delegate/package.json | 1 - packages/delegate/src/Receiver.ts | 114 ++++++++---------- packages/delegate/src/expectantStore.ts | 53 ++++++++ packages/pubsub/package.json | 31 ----- packages/pubsub/src/in-memory-channel.ts | 48 -------- packages/pubsub/src/in-memory-pubsub.ts | 52 -------- packages/pubsub/src/index.ts | 4 - packages/pubsub/src/types.ts | 16 --- 10 files changed, 102 insertions(+), 220 deletions(-) rename packages/{pubsub => batch-execute}/src/split.ts (98%) create mode 100644 packages/delegate/src/expectantStore.ts delete mode 100644 packages/pubsub/package.json delete mode 100644 packages/pubsub/src/in-memory-channel.ts delete mode 100644 packages/pubsub/src/in-memory-pubsub.ts delete mode 100644 packages/pubsub/src/index.ts delete mode 100644 packages/pubsub/src/types.ts diff --git a/packages/batch-execute/package.json b/packages/batch-execute/package.json index 2092f2ffd27..f5f7b138319 100644 --- a/packages/batch-execute/package.json +++ b/packages/batch-execute/package.json @@ -22,7 +22,6 @@ "input": "./src/index.ts" }, "dependencies": { - "@graphql-tools/pubsub": "^7.0.0", "@graphql-tools/utils": "^7.0.0", "dataloader": "2.0.0", "is-promise": "4.0.0", diff --git a/packages/pubsub/src/split.ts b/packages/batch-execute/src/split.ts similarity index 98% rename from packages/pubsub/src/split.ts rename to packages/batch-execute/src/split.ts index 4a4a37931f9..ede6e83b873 100644 --- a/packages/pubsub/src/split.ts +++ b/packages/batch-execute/src/split.ts @@ -4,7 +4,7 @@ import { Push, Repeater } from '@repeaterjs/repeater'; -import { Splitter } from './types'; +type Splitter = (item: T) => [number, T]; export function split(asyncIterable: AsyncIterableIterator, n: number, splitter: Splitter>) { const iterator = asyncIterable[Symbol.asyncIterator](); diff --git a/packages/delegate/package.json b/packages/delegate/package.json index 2bff6700c2f..c09cab11e83 100644 --- a/packages/delegate/package.json +++ b/packages/delegate/package.json @@ -24,7 +24,6 @@ "dependencies": { "@ardatan/aggregate-error": "0.0.6", "@graphql-tools/batch-execute": "^7.0.0", - "@graphql-tools/pubsub": "^7.0.0", "@graphql-tools/schema": "^7.0.0", "@graphql-tools/utils": "^7.1.6", "@repeaterjs/repeater": "^3.0.4", diff --git a/packages/delegate/src/Receiver.ts b/packages/delegate/src/Receiver.ts index 61bbb9cf7ee..7ddc3708f19 100644 --- a/packages/delegate/src/Receiver.ts +++ b/packages/delegate/src/Receiver.ts @@ -13,15 +13,15 @@ import { import DataLoader from 'dataloader'; -import { Repeater } from '@repeaterjs/repeater'; +import { Repeater, Stop } from '@repeaterjs/repeater'; import { AsyncExecutionResult } from '@graphql-tools/utils'; -import { InMemoryPubSub } from '@graphql-tools/pubsub'; import { DelegationContext, ExternalObject } from './types'; import { getReceiver, getSubschema, getUnpathedErrors, mergeExternalObjects } from './externalObjects'; import { resolveExternalValue } from './resolveExternalValue'; import { externalValueFromResult, externalValueFromPatchResult } from './externalValues'; +import { ExpectantStore } from './expectantStore'; export class Receiver { private readonly asyncIterable: AsyncIterable; @@ -31,10 +31,10 @@ export class Receiver { private readonly asyncSelectionSets: Record; private readonly resultTransformer: (originalResult: ExecutionResult) => any; private readonly initialResultDepth: number; - private readonly pubsub: InMemoryPubSub; private deferredPatches: Record>; private streamedPatches: Record>>; - private cache: Record; + private cache: ExpectantStore; + private stoppers: Array; private loaders: Record>; private infos: Record>; @@ -54,11 +54,11 @@ export class Receiver { this.resultTransformer = resultTransformer; this.initialResultDepth = info ? responsePathAsArray(info.path).length - 1 : 0; - this.pubsub = new InMemoryPubSub(); this.deferredPatches = Object.create(null); this.streamedPatches = Object.create(null); - this.cache = Object.create(null); + this.cache = new ExpectantStore(); + this.stoppers = []; this.loaders = Object.create(null); this.infos = Object.create(null); } @@ -67,7 +67,7 @@ export class Receiver { const asyncIterator = this.asyncIterable[Symbol.asyncIterator](); const payload = await asyncIterator.next(); const initialResult = externalValueFromResult(this.resultTransformer(payload.value), this.delegationContext, this); - this.cache[this.fieldName] = initialResult; + this.cache.set(this.fieldName, initialResult); this._iterate(); @@ -110,73 +110,52 @@ export class Receiver { this.onNewInfo(pathKey, combinedInfo); } - const parent = this.cache[parentKey]; - if (parent !== undefined) { - const data = parent[responseKey]; - if (data !== undefined) { - const unpathedErrors = getUnpathedErrors(parent); - const subschema = getSubschema(parent, responseKey); - const receiver = getReceiver(parent, subschema); - this.onNewExternalValue( - pathKey, - resolveExternalValue(data, unpathedErrors, subschema, this.context, combinedInfo, receiver), - isCompositeType(getNamedType(combinedInfo.returnType)) - ? { - kind: Kind.SELECTION_SET, - selections: [].concat(...combinedInfo.fieldNodes.map(fieldNode => fieldNode.selectionSet.selections)), - } - : undefined - ); - } + const parent = await this.cache.request(parentKey); + + const data = parent[responseKey]; + if (data !== undefined) { + const unpathedErrors = getUnpathedErrors(parent); + const subschema = getSubschema(parent, responseKey); + const receiver = getReceiver(parent, subschema); + this.onNewExternalValue( + pathKey, + resolveExternalValue(data, unpathedErrors, subschema, this.context, combinedInfo, receiver), + isCompositeType(getNamedType(combinedInfo.returnType)) + ? { + kind: Kind.SELECTION_SET, + selections: [].concat(...combinedInfo.fieldNodes.map(fieldNode => fieldNode.selectionSet.selections)), + } + : undefined + ); } if (fieldShouldStream(combinedInfo)) { return infos.map( () => new Repeater(async (push, stop) => { - let initialValues: Array = this.cache[pathKey]; - if (initialValues !== undefined) { - initialValues.forEach(async value => push(value)); - } else { - const payload = await this.pubsub.subscribe(pathKey).next(); - initialValues = payload.value; - if (initialValues === undefined) { - return; - } - } + const initialValues = ((await this.cache.request(pathKey)) as unknown) as Array; + initialValues.forEach(async value => push(value)); let index = initialValues.length; - while (true) { - const listMemberKey = `${pathKey}.${index++}`; - - let listMember = this.cache[listMemberKey]; - if (listMember !== undefined) { - await push(listMember); - continue; - } - - const listMemberPayload = await this.pubsub.subscribe(listMemberKey).next(); - listMember = listMemberPayload.value; - if (listMember === undefined) { - break; - } - - await push(listMember); - } - stop(); + let stopped = false; + stop.then(() => (stopped = true)); + + this.stoppers.push(stop); + + const next = () => this.cache.request(`${pathKey}.${index++}`); + + /* eslint-disable no-unmodified-loop-condition */ + while (!stopped) { + await push(next()); + } + /* eslint-disable no-unmodified-loop-condition */ }) ); } - const externalValue = this.cache[pathKey]; - if (externalValue !== undefined) { - return new Array(infos.length).fill(externalValue); - } - - const payload = await this.pubsub.subscribe(pathKey).next(); - - return new Array(infos.length).fill(payload.value); + const externalValue = await this.cache.request(pathKey); + return new Array(infos.length).fill(externalValue); } private async _iterate(): Promise { @@ -266,13 +245,15 @@ export class Receiver { } setTimeout(() => { - this.pubsub.close(); + this.cache.clear(); + this.stoppers.forEach(stop => stop()); }); } private onNewExternalValue(pathKey: string, newExternalValue: any, selectionSet: SelectionSetNode): void { - const externalValue = this.cache[pathKey]; - this.cache[pathKey] = + const externalValue = this.cache.get(pathKey); + this.cache.set( + pathKey, externalValue === undefined ? newExternalValue : mergeExternalObjects( @@ -282,7 +263,8 @@ export class Receiver { externalValue, [newExternalValue], [selectionSet] - ); + ) + ); const infosByParentKey = this.infos[pathKey]; if (infosByParentKey !== undefined) { @@ -309,7 +291,7 @@ export class Receiver { }); } - this.pubsub.publish(pathKey, newExternalValue); + this.cache.set(pathKey, newExternalValue); } private onNewInfo(pathKey: string, info: GraphQLResolveInfo): void { diff --git a/packages/delegate/src/expectantStore.ts b/packages/delegate/src/expectantStore.ts new file mode 100644 index 00000000000..3d52d606f2b --- /dev/null +++ b/packages/delegate/src/expectantStore.ts @@ -0,0 +1,53 @@ +interface Settler { + resolve(value: T): void; + reject(reason?: any): void; +} + +export class ExpectantStore { + protected settlers: Record>> = {}; + protected cache: Record = {}; + + set(key: string, value: T): void { + this.cache[key] = value; + const settlers = this.settlers[key]; + if (settlers != null) { + for (const { resolve } of settlers) { + resolve(value); + } + settlers.clear(); + delete this.settlers[key]; + } + } + + get(key: string): T { + return this.cache[key]; + } + + request(key: string): Promise | T { + const value = this.cache[key]; + + if (value !== undefined) { + return value; + } + + let settlers = this.settlers[key]; + if (settlers != null) { + settlers = this.settlers[key] = new Set(); + } + + return new Promise((resolve, reject) => { + settlers.add({ resolve, reject }); + }); + } + + clear(reason?: any): void { + for (const settlers of Object.values(this.settlers)) { + for (const { reject } of settlers) { + reject(reason); + } + } + + this.settlers = {}; + this.cache = {}; + } +} diff --git a/packages/pubsub/package.json b/packages/pubsub/package.json deleted file mode 100644 index 198d61fe711..00000000000 --- a/packages/pubsub/package.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "name": "@graphql-tools/pubsub", - "version": "7.0.0", - "description": "A set of utils for faster development of GraphQL tools", - "repository": { - "type": "git", - "url": "ardatan/graphql-tools", - "directory": "packages/batch-delegate" - }, - "license": "MIT", - "sideEffects": false, - "main": "dist/index.cjs.js", - "module": "dist/index.esm.js", - "typings": "dist/index.d.ts", - "typescript": { - "definition": "dist/index.d.ts" - }, - "peerDependencies": { - "graphql": "^14.0.0 || ^15.0.0" - }, - "buildOptions": { - "input": "./src/index.ts" - }, - "dependencies": { - "@repeaterjs/repeater": "^3.0.4" - }, - "publishConfig": { - "access": "public", - "directory": "dist" - } -} diff --git a/packages/pubsub/src/in-memory-channel.ts b/packages/pubsub/src/in-memory-channel.ts deleted file mode 100644 index 8e386a67ce8..00000000000 --- a/packages/pubsub/src/in-memory-channel.ts +++ /dev/null @@ -1,48 +0,0 @@ -// adapted from https://github.com/repeaterjs/repeater/blob/7b294acfa7e2c21721ff77018cf77c452c51dad9/packages/pubsub/src/pubsub.ts -// adapted rather than importing @repeaterjs/pubsub -// because of https://github.com/repeaterjs/repeater/issues/67 in which pubsub will be killed! - -import { Repeater, RepeaterBuffer } from '@repeaterjs/repeater'; - -import { Channel } from './types'; - -interface Hooks { - push(value: T): Promise; - stop(reason?: any): unknown; -} - -export class InMemoryChannel implements Channel { - protected hooks: Set> = new Set(); - - publish(value: T): void { - const hooks = this.hooks; - - for (const { push, stop } of hooks) { - try { - push(value).catch(stop); - } catch (err) { - // push queue is full - stop(err); - } - } - } - - unpublish(reason?: any): void { - const hooks = this.hooks; - - for (const { stop } of hooks) { - stop(reason); - } - - hooks.clear(); - } - - subscribe(buffer?: RepeaterBuffer): Repeater { - return new Repeater(async (push, stop) => { - const publisher = { push, stop }; - this.hooks.add(publisher); - await stop; - this.hooks.delete(publisher); - }, buffer); - } -} diff --git a/packages/pubsub/src/in-memory-pubsub.ts b/packages/pubsub/src/in-memory-pubsub.ts deleted file mode 100644 index 54275519c24..00000000000 --- a/packages/pubsub/src/in-memory-pubsub.ts +++ /dev/null @@ -1,52 +0,0 @@ -// adapted from https://github.com/repeaterjs/repeater/blob/7b294acfa7e2c21721ff77018cf77c452c51dad9/packages/pubsub/src/pubsub.ts -// adapted rather than importing @repeaterjs/pubsub -// because of https://github.com/repeaterjs/repeater/issues/67 in which pubsub will be killed! - -import { Repeater, RepeaterBuffer } from '@repeaterjs/repeater'; -import { InMemoryChannel } from './in-memory-channel'; - -import { PubSub } from './types'; - -export class InMemoryPubSub implements PubSub { - protected channels: Record> = Object.create(null); - - publish(topic: string, value: T): void { - let channel = this.channels[topic]; - - if (channel === undefined) { - channel = this.channels[topic] = new InMemoryChannel(); - } - - channel.publish(value); - } - - unpublish(topic: string, reason?: any): void { - const channel = this.channels[topic]; - - if (channel === undefined) { - return; - } - - channel.unpublish(reason); - - delete this.channels[topic]; - } - - subscribe(topic: string, buffer?: RepeaterBuffer): Repeater { - let channel = this.channels[topic]; - - if (this.channels[topic] === undefined) { - channel = this.channels[topic] = new InMemoryChannel(); - } - - return channel.subscribe(buffer); - } - - close(reason?: any): void { - for (const channel of Object.values(this.channels)) { - channel.unpublish(reason); - } - - this.channels = Object.create(null); - } -} diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts deleted file mode 100644 index ffdbbff8412..00000000000 --- a/packages/pubsub/src/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -export * from './in-memory-channel'; -export * from './in-memory-pubsub'; -export * from './split'; -export * from './types'; diff --git a/packages/pubsub/src/types.ts b/packages/pubsub/src/types.ts deleted file mode 100644 index 72ddce3e91c..00000000000 --- a/packages/pubsub/src/types.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { RepeaterBuffer } from '@repeaterjs/repeater'; - -export interface Channel { - publish(value: T): Promise | unknown; - unpublish(reason?: any): Promise | unknown; - subscribe(buffer?: RepeaterBuffer): AsyncIterableIterator; -} - -export interface PubSub { - publish(topic: string, value: T): Promise | unknown; - unpublish(topic: string, reason?: any): Promise | unknown; - subscribe(topic: string, buffer?: RepeaterBuffer): AsyncIterableIterator; - close(reason?: any): Promise | unknown; -} - -export type Splitter = (item: T) => [number, T];