From fcc49aaf13eea34fc49c1bc30d9d42974b5c2451 Mon Sep 17 00:00:00 2001 From: daffl Date: Mon, 19 Dec 2022 21:24:21 -0800 Subject: [PATCH] feat(databases): Add support for fast bulk operations --- docs/api/databases/common.md | 12 +++ packages/adapter-commons/src/declarations.ts | 1 + packages/adapter-commons/src/service.ts | 4 + packages/adapter-tests/src/declarations.ts | 3 + packages/adapter-tests/src/methods.ts | 85 ++++++++++++++++++++ packages/knex/src/adapter.ts | 20 +++++ packages/knex/test/index.test.ts | 3 + packages/memory/src/index.ts | 32 +++++--- packages/memory/test/index.test.ts | 3 + packages/mongodb/src/adapter.ts | 31 ++++--- packages/mongodb/test/index.test.ts | 3 + 11 files changed, 172 insertions(+), 25 deletions(-) diff --git a/docs/api/databases/common.md b/docs/api/databases/common.md index d09ce6bf0c..6dc8cbb2e4 100644 --- a/docs/api/databases/common.md +++ b/docs/api/databases/common.md @@ -102,6 +102,18 @@ Disabling or changing the default pagination is not available in the client. Onl +## Bulk updates + +Some database adapters allow to set the `params.bulk` option to perform fast `create`, `patch` or `remove` operations for a large amount of data. Setting `params.bulk = true` will always return no data (an empty array `[]`) and not send any real-time events. + +```ts +const manyTodos = await readCSVFile('todos.csv') + +await app.service('todos').create(manyTodos, { + bulk: true +}) // -> [] +``` + ## Extending Adapters There are two ways to extend existing database adapters. Either by extending the base class or by adding functionality through hooks. diff --git a/packages/adapter-commons/src/declarations.ts b/packages/adapter-commons/src/declarations.ts index ffa90bd163..0f450b0148 100644 --- a/packages/adapter-commons/src/declarations.ts +++ b/packages/adapter-commons/src/declarations.ts @@ -70,6 +70,7 @@ export interface AdapterParams< > extends Params { adapter?: A paginate?: PaginationParams + bulk?: boolean } /** diff --git a/packages/adapter-commons/src/service.ts b/packages/adapter-commons/src/service.ts index c007a2e685..d6620a05bc 100644 --- a/packages/adapter-commons/src/service.ts +++ b/packages/adapter-commons/src/service.ts @@ -57,6 +57,10 @@ export abstract class AdapterBase< * @returns Wether or not multiple updates are allowed. */ allowsMulti(method: string, params: ServiceParams = {} as ServiceParams) { + if (params.bulk) { + return true + } + const always = alwaysMulti[method] if (typeof always !== 'undefined') { diff --git a/packages/adapter-tests/src/declarations.ts b/packages/adapter-tests/src/declarations.ts index d3d4f7cf65..8ec8f4ef46 100644 --- a/packages/adapter-tests/src/declarations.ts +++ b/packages/adapter-tests/src/declarations.ts @@ -33,6 +33,7 @@ export type AdapterMethodsTestName = | '.remove' | '.remove + $select' | '.remove + id + query' + | '.remove bulk' | '.remove + multi' | '.remove + multi no pagination' | '.remove + id + query id' @@ -49,12 +50,14 @@ export type AdapterMethodsTestName = | '.patch multiple no pagination' | '.patch multi query same' | '.patch multi query changed' + | '.patch bulk' | '.patch + NotFound' | '.patch + query + NotFound' | '.patch + id + query id' | '.create' | '.create + $select' | '.create multi' + | '.create bulk' | 'internal .find' | 'internal .get' | 'internal .create' diff --git a/packages/adapter-tests/src/methods.ts b/packages/adapter-tests/src/methods.ts index 9c1c72bff9..299d83bdcf 100644 --- a/packages/adapter-tests/src/methods.ts +++ b/packages/adapter-tests/src/methods.ts @@ -114,6 +114,28 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s } }) + test('.remove bulk', async () => { + await service.create({ name: 'Dave', age: 29, created: true }) + await service.create({ + name: 'David', + age: 3, + created: true + }) + + const data = await service.remove(null, { + query: { created: true }, + bulk: true + }) + + assert.deepStrictEqual(data, []) + + const found = await service.find({ + query: { created: true } + }) + + assert.strictEqual(found.length, 0) + }) + test('.remove + multi', async () => { try { await service.remove(null) @@ -398,6 +420,39 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s await service.remove(david[idProp]) }) + test('.patch bulk', async () => { + const dave = await service.create({ + name: 'Dave', + age: 29, + created: true + }) + const david = await service.create({ + name: 'David', + age: 3, + created: true + }) + + const data = await service.patch( + null, + { + age: 2 + }, + { + query: { created: true }, + bulk: true + } + ) + + assert.deepStrictEqual(data, []) + + const daveAfter = await service.get(dave[idProp]) + + assert.strictEqual(daveAfter.age, 2, 'Dave age was updated') + + await service.remove(dave[idProp]) + await service.remove(david[idProp]) + }) + test('.patch multiple no pagination', async () => { try { await service.remove(doug[idProp]) @@ -643,6 +698,36 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s await service.remove(data[0][idProp]) await service.remove(data[1][idProp]) }) + + test('.create bulk', async () => { + const items = [ + { + name: 'Gerald', + age: 18 + }, + { + name: 'Herald', + age: 18 + } + ] + + const data = await service.create(items, { + bulk: true + }) + + assert.deepStrictEqual(data, []) + + const foundItems = await service.find({ + query: { age: 18 } + }) + + assert.strictEqual(foundItems.length, 2) + + await service.remove(null, { + query: { age: 18 }, + bulk: true + }) + }) }) describe("doesn't call public methods internally", () => { diff --git a/packages/knex/src/adapter.ts b/packages/knex/src/adapter.ts index 9c5d901417..ea351a0251 100644 --- a/packages/knex/src/adapter.ts +++ b/packages/knex/src/adapter.ts @@ -223,6 +223,15 @@ export class KnexAdapter< ): Promise { const data = _data as any + if (params.bulk) { + const res = await this.db(params) + .insert(data) + .then(() => []) + .catch(errorHandler) + + return res as Result[] + } + if (Array.isArray(data)) { return Promise.all(data.map((current) => this._create(current, params))) } @@ -252,6 +261,12 @@ export class KnexAdapter< } const data = _.omit(raw, this.id) + + if (params.bulk) { + await this.createQuery(params).update(data) + return [] + } + const results = await this._findOrGet(id, { ...params, query: { @@ -313,6 +328,11 @@ export class KnexAdapter< throw new MethodNotAllowed('Can not remove multiple entries') } + if (params.bulk) { + await this.createQuery(params).del().catch(errorHandler) + return [] + } + const items = await this._findOrGet(id, params) const { query } = this.filterQuery(params) const q = this.db(params) diff --git a/packages/knex/test/index.test.ts b/packages/knex/test/index.test.ts index 715adde7ad..08795ba57a 100644 --- a/packages/knex/test/index.test.ts +++ b/packages/knex/test/index.test.ts @@ -27,6 +27,7 @@ const testSuite = adapterTests([ '.remove + id + query', '.remove + multi', '.remove + multi no pagination', + '.remove bulk', '.remove + id + query id', '.update', '.update + $select', @@ -38,6 +39,7 @@ const testSuite = adapterTests([ '.patch + $select', '.patch + id + query', '.patch multiple', + '.patch bulk', '.patch multiple no pagination', '.patch multi query same', '.patch multi query changed', @@ -47,6 +49,7 @@ const testSuite = adapterTests([ '.create', '.create + $select', '.create multi', + '.create bulk', 'internal .find', 'internal .get', 'internal .create', diff --git a/packages/memory/src/index.ts b/packages/memory/src/index.ts index f05de9ce23..9b6234d9e4 100644 --- a/packages/memory/src/index.ts +++ b/packages/memory/src/index.ts @@ -6,7 +6,8 @@ import { AdapterBase, AdapterServiceOptions, PaginationOptions, - AdapterParams + AdapterParams, + AdapterQuery } from '@feathersjs/adapter-commons' import sift from 'sift' import { NullableId, Id, Params, Paginated } from '@feathersjs/feathers' @@ -28,10 +29,12 @@ const _select = (data: any, params: any, ...args: any[]) => { return base(JSON.parse(JSON.stringify(data))) } +export type MemoryAdapterParams = AdapterParams> + export class MemoryAdapter< Result = any, Data = Partial, - ServiceParams extends Params = Params, + ServiceParams extends MemoryAdapterParams = MemoryAdapterParams, PatchData = Partial > extends AdapterBase> { store: MemoryServiceStore @@ -145,18 +148,18 @@ export class MemoryAdapter< async _create(data: Partial[], params?: ServiceParams): Promise async _create(data: Partial | Partial[], _params?: ServiceParams): Promise async _create( - data: Partial | Partial[], + _data: Partial | Partial[], params: ServiceParams = {} as ServiceParams ): Promise { - if (Array.isArray(data)) { - return Promise.all(data.map((current) => this._create(current, params))) - } + const payload = Array.isArray(_data) ? _data : [_data] + const results = payload.map((value) => { + const id = (value as any)[this.id] || this._uId++ + const current = _.extend({}, value, { [this.id]: id }) - const id = (data as any)[this.id] || this._uId++ - const current = _.extend({}, data, { [this.id]: id }) - const result = (this.store[id] = current) + return _select((this.store[id] = current), params, this.id) + }) - return _select(result, params, this.id) as Result + return params.bulk ? [] : Array.isArray(_data) ? results : results[0] } async _update(id: Id, data: Data, params: ServiceParams = {} as ServiceParams): Promise { @@ -202,11 +205,12 @@ export class MemoryAdapter< ...params, query }) + const results = entries.map(patchEntry) - return entries.map(patchEntry) + return params.bulk ? [] : results } - return patchEntry(await this._get(id, params)) // Will throw an error if not found + return params.bulk ? [] : patchEntry(await this._get(id, params)) // Will throw an error if not found } async _remove(id: null, params?: ServiceParams): Promise @@ -225,7 +229,9 @@ export class MemoryAdapter< query }) - return Promise.all(entries.map((current: any) => this._remove(current[this.id] as Id, params))) + entries.forEach((current: any) => delete this.store[(current as any)[this.id]]) + + return params.bulk ? [] : entries } const entry = await this._get(id, params) diff --git a/packages/memory/test/index.test.ts b/packages/memory/test/index.test.ts index 3afc43914f..e6b4a85aea 100644 --- a/packages/memory/test/index.test.ts +++ b/packages/memory/test/index.test.ts @@ -26,6 +26,7 @@ const testSuite = adapterTests([ '.remove + id + query', '.remove + multi', '.remove + multi no pagination', + '.remove bulk', '.remove + id + query id', '.update', '.update + $select', @@ -40,12 +41,14 @@ const testSuite = adapterTests([ '.patch multiple no pagination', '.patch multi query same', '.patch multi query changed', + '.patch bulk', '.patch + query + NotFound', '.patch + NotFound', '.patch + id + query id', '.create', '.create + $select', '.create multi', + '.create bulk', 'internal .find', 'internal .get', 'internal .create', diff --git a/packages/mongodb/src/adapter.ts b/packages/mongodb/src/adapter.ts index b41c5125ba..a49b39c1c8 100644 --- a/packages/mongodb/src/adapter.ts +++ b/packages/mongodb/src/adapter.ts @@ -261,7 +261,6 @@ export class MongoDbAdapter< data: Data | Data[], params: ServiceParams = {} as ServiceParams ): Promise { - const writeOptions = params.mongodb const model = await this.getModel(params) const setId = (item: any) => { const entry = Object.assign({}, item) @@ -279,14 +278,14 @@ export class MongoDbAdapter< const promise = Array.isArray(data) ? model - .insertMany(data.map(setId), writeOptions) + .insertMany(data.map(setId), params.mongodb) .then(async (result) => - Promise.all( - Object.values(result.insertedIds).map(async (_id) => model.findOne({ _id }, params.mongodb)) - ) + params.bulk + ? [] + : model.find({ _id: { $in: Object.values(result.insertedIds) } }, params.mongodb).toArray() ) : model - .insertOne(setId(data), writeOptions) + .insertOne(setId(data), params.mongodb) .then(async (result) => model.findOne({ _id: result.insertedId }, params.mongodb)) return promise.then(select(params, this.id)).catch(errorHandler) @@ -325,6 +324,12 @@ export class MongoDbAdapter< return current }, {} as any) + + if (params.bulk) { + await model.updateMany(query, modifier, updateOptions) + return [] + } + const originalIds = await this._findOrGet(id, { ...params, query: { @@ -389,11 +394,13 @@ export class MongoDbAdapter< } } - return this._findOrGet(id, findParams) - .then(async (items) => { - await model.deleteMany(query, deleteOptions) - return items - }) - .catch(errorHandler) + return params.bulk + ? model.deleteMany(query, deleteOptions).then(() => []) + : this._findOrGet(id, findParams) + .then(async (items) => { + await model.deleteMany(query, deleteOptions) + return items + }) + .catch(errorHandler) } } diff --git a/packages/mongodb/test/index.test.ts b/packages/mongodb/test/index.test.ts index ddc1455e4c..51adaf512e 100644 --- a/packages/mongodb/test/index.test.ts +++ b/packages/mongodb/test/index.test.ts @@ -26,6 +26,7 @@ const testSuite = adapterTests([ '.remove', '.remove + $select', '.remove + id + query', + '.remove bulk', '.remove + multi', '.remove + multi no pagination', '.remove + id + query id', @@ -38,6 +39,7 @@ const testSuite = adapterTests([ '.patch', '.patch + $select', '.patch + id + query', + '.patch bulk', '.patch multiple', '.patch multiple no pagination', '.patch multi query same', @@ -47,6 +49,7 @@ const testSuite = adapterTests([ '.patch + id + query id', '.create', '.create + $select', + '.create bulk', '.create multi', 'internal .find', 'internal .get',