diff --git a/apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts b/apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts index 6d9b3c799bb..d5b3451d1a7 100644 --- a/apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts +++ b/apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts @@ -55,11 +55,6 @@ describe('TldrawBoardRepo', () => { it('should check if repo and its properties are set correctly', () => { expect(repo).toBeDefined(); expect(repo.mdb).toBeDefined(); - expect(repo.configService).toBeDefined(); - expect(repo.flushSize).toBeDefined(); - expect(repo.multipleCollections).toBeDefined(); - expect(repo.connectionString).toBeDefined(); - expect(repo.collectionName).toBeDefined(); }); describe('updateDocument', () => { @@ -75,7 +70,7 @@ describe('TldrawBoardRepo', () => { const storeGetYDocSpy = jest .spyOn(repo.mdb, 'getYDoc') .mockImplementation(() => Promise.resolve(new WsSharedDocDo('TEST', service))); - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdate').mockImplementation(() => Promise.resolve(1)); + const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockImplementation(() => Promise.resolve(1)); return { doc, @@ -105,7 +100,7 @@ describe('TldrawBoardRepo', () => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore doc.conns.set(ws, wsSet); - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdate').mockImplementation(() => Promise.resolve(1)); + const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockImplementation(() => Promise.resolve(1)); const storeGetYDocSpy = jest .spyOn(repo.mdb, 'getYDoc') .mockImplementation(() => Promise.resolve(new WsSharedDocDo('TEST', service))); @@ -158,7 +153,7 @@ describe('TldrawBoardRepo', () => { describe('when the difference between update and current drawing is more than 0', () => { const setup = () => { const calculateDiffSpy = jest.spyOn(YjsUtils, 'calculateDiff').mockImplementationOnce(() => 1); - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdate').mockResolvedValueOnce(Promise.resolve(1)); + const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockResolvedValueOnce(Promise.resolve(1)); return { calculateDiffSpy, @@ -181,7 +176,7 @@ describe('TldrawBoardRepo', () => { describe('when the difference between update and current drawing is 0', () => { const setup = () => { const calculateDiffSpy = jest.spyOn(YjsUtils, 'calculateDiff').mockImplementationOnce(() => 0); - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdate'); + const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional'); return { calculateDiffSpy, @@ -204,7 +199,7 @@ describe('TldrawBoardRepo', () => { describe('flushDocument', () => { const setup = () => { - const flushDocumentSpy = jest.spyOn(repo.mdb, 'flushDocument').mockResolvedValueOnce(Promise.resolve()); + const flushDocumentSpy = jest.spyOn(repo.mdb, 'flushDocumentTransactional').mockResolvedValueOnce(Promise.resolve()); return { flushDocumentSpy }; }; diff --git a/apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts b/apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts index 23fc3bd607d..1eadcf61340 100644 --- a/apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts +++ b/apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts @@ -23,7 +23,7 @@ export class TldrawBoardRepo { public updateStoredDocWithDiff(docName: string, diff: Uint8Array): void { const calc = calculateDiff(diff); if (calc > 0) { - this.mdb.storeUpdate(docName, diff).catch((err) => this.logger.error(err)); + this.mdb.storeUpdateTransactional(docName, diff).catch((err) => this.logger.error(err)); } } @@ -36,13 +36,13 @@ export class TldrawBoardRepo { applyUpdate(ydoc, encodeStateAsUpdate(persistedYdoc)); ydoc.on('update', (update: Uint8Array) => { - this.mdb.storeUpdate(docName, update).catch((err) => this.logger.error(err)); + this.mdb.storeUpdateTransactional(docName, update).catch((err) => this.logger.error(err)); }); persistedYdoc.destroy(); } public async flushDocument(docName: string): Promise { - await this.mdb.flushDocument(docName); + await this.mdb.flushDocumentTransactional(docName); } } diff --git a/apps/server/src/modules/tldraw/repo/y-mongodb.ts b/apps/server/src/modules/tldraw/repo/y-mongodb.ts index ffa8a515ae0..89bff5902c2 100644 --- a/apps/server/src/modules/tldraw/repo/y-mongodb.ts +++ b/apps/server/src/modules/tldraw/repo/y-mongodb.ts @@ -4,17 +4,27 @@ import * as promise from 'lib0/promise'; import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs'; import { Injectable } from '@nestjs/common'; import { TldrawRepo } from '@modules/tldraw/repo/tldraw.repo'; -import { storeUpdate, flushDocument, getMongoUpdates, mergeUpdates } from '../utils'; +import { TldrawDrawing } from '@modules/tldraw/entities'; +import { Buffer } from 'buffer'; +import * as binary from 'lib0/binary'; +import * as encoding from 'lib0/encoding'; +import { LegacyLogger } from '@src/core/logger'; @Injectable() export class YMongodb { - private flushSize: number; + private MAX_DOCUMENT_SIZE = 15000000; + + private readonly flushSize: number; private tr = { string: Promise }; - private _transact; + private readonly _transact; - constructor(private readonly configService: ConfigService, private readonly repo: TldrawRepo) { + constructor( + private readonly configService: ConfigService, + private readonly repo: TldrawRepo, + private readonly logger: LegacyLogger + ) { this.flushSize = this.configService.get('TLDRAW_DB_FLUSH_SIZE') ?? 400; this._transact = (docName: string, f: (TldrawRepo) => Promise) => { @@ -35,8 +45,7 @@ export class YMongodb { try { res = await f(this.repo); } catch (err) { - // eslint-disable-next-line no-console - console.warn('Error during saving transaction', err); + this.logger.error('Error during saving transaction', err); } // once the last transaction for a given docName resolves, remove it from the queue @@ -69,8 +78,8 @@ export class YMongodb { getYDoc(docName: string): Promise { // eslint-disable-next-line @typescript-eslint/no-unsafe-call,@typescript-eslint/no-unsafe-return - return this._transact(docName, async (db: TldrawRepo): Promise => { - const updates = await getMongoUpdates(db, docName); + return this._transact(docName, async (): Promise => { + const updates = await this.getMongoUpdates(docName); const ydoc = new Doc(); ydoc.transact(() => { for (let i = 0; i < updates.length; i++) { @@ -81,23 +90,201 @@ export class YMongodb { } }); if (updates.length > this.flushSize) { - await flushDocument(db, docName, encodeStateAsUpdate(ydoc), encodeStateVector(ydoc)); + await this.flushDocument(docName, encodeStateAsUpdate(ydoc), encodeStateVector(ydoc)); } return ydoc; }); } - storeUpdate(docName: string, update: Uint8Array): Promise { + storeUpdateTransactional(docName: string, update: Uint8Array): Promise { // eslint-disable-next-line @typescript-eslint/no-unsafe-return,@typescript-eslint/no-unsafe-call - return this._transact(docName, (db: TldrawRepo) => storeUpdate(db, docName, update)); + return this._transact(docName, () => this.storeUpdate(docName, update)); } - flushDocument(docName: string) { + flushDocumentTransactional(docName: string) { // eslint-disable-next-line @typescript-eslint/no-unsafe-return,@typescript-eslint/no-unsafe-call - return this._transact(docName, async (db: TldrawRepo) => { - const updates = await getMongoUpdates(db, docName); - const { update, sv } = mergeUpdates(updates); - await flushDocument(db, docName, update, sv); + return this._transact(docName, async () => { + const updates = await this.getMongoUpdates(docName); + const { update, sv } = this.mergeUpdates(updates); + await this.flushDocument(docName, update, sv); + }); + } + + private async clearUpdatesRange(docName: string, from: number, to: number) { + return this.repo.del({ + docName, + clock: { + $gte: from, + $lt: to, + }, + }); + } + + private getMongoBulkData(query: object, opts: object) { + return this.repo.readAsCursor(query, opts); + } + + private mergeDocsTogether(doc: TldrawDrawing, docs: TldrawDrawing[], docIndex: number) { + const parts = [Buffer.from(doc.value.buffer)]; + let currentPartId: number | undefined = doc.part; + for (let i = docIndex + 1; i < docs.length; i++) { + const part = docs[i]; + if (part.clock === doc.clock) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + if (currentPartId !== part.part - 1) { + throw new Error('Couldnt merge updates together because a part is missing!'); + } + parts.push(Buffer.from(part.value.buffer)); + currentPartId = part.part; + } else { + break; + } + } + + return parts; + } + + /** + * Convert the mongo document array to an array of values (as buffers) + */ + private convertMongoUpdates(docs: TldrawDrawing[]) { + if (!Array.isArray(docs) || !docs.length) return []; + + const updates: Buffer[] = []; + for (let i = 0; i < docs.length; i++) { + const doc = docs[i]; + if (!doc.part) { + updates.push(doc.value); + } else if (doc.part === 1) { + // merge the docs together that got split because of mongodb size limits + const parts = this.mergeDocsTogether(doc, docs, i); + updates.push(Buffer.concat(parts)); + } + } + return updates; + } + + /** + * Get all document updates for a specific document. + */ + private async getMongoUpdates(docName: string, opts = {}) { + const docs = await this.getMongoBulkData(this.createDocumentUpdateKey(docName), opts); + return this.convertMongoUpdates(docs); + } + + private getCurrentUpdateClock(docName: string) { + return this.getMongoBulkData( + { + ...this.createDocumentUpdateKey(docName, 0), + clock: { + $gte: 0, + $lt: binary.BITS32, + }, + }, + { reverse: true, limit: 1 } + ).then((updates) => { + if (updates.length === 0) { + return -1; + } + return updates[0].clock; + }); + } + + private async writeStateVector(docName: string, sv: Uint8Array, clock: number) { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, clock); + encoding.writeVarUint8Array(encoder, sv); + await this.repo.put(this.createDocumentStateVectorKey(docName), { + value: Buffer.from(encoding.toUint8Array(encoder)), }); } + + private async storeUpdate(docName: string, update: Uint8Array) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + const clock: number = await this.getCurrentUpdateClock(docName); + if (clock === -1) { + // make sure that a state vector is always written, so we can search for available documents + const ydoc = new Doc(); + applyUpdate(ydoc, update); + const sv = encodeStateVector(ydoc); + await this.writeStateVector(docName, sv, 0); + } + + const value = Buffer.from(update); + // if our buffer exceeds it, we store the update in multiple documents + if (value.length <= this.MAX_DOCUMENT_SIZE) { + await this.repo.put(this.createDocumentUpdateKey(docName, clock + 1), { + value, + }); + } else { + const totalChunks = Math.ceil(value.length / this.MAX_DOCUMENT_SIZE); + + const putPromises: Promise[] = []; + for (let i = 0; i < totalChunks; i++) { + const start = i * this.MAX_DOCUMENT_SIZE; + const end = Math.min(start + this.MAX_DOCUMENT_SIZE, value.length); + const chunk = value.subarray(start, end); + + putPromises.push( + this.repo.put({ ...this.createDocumentUpdateKey(docName, clock + 1), part: i + 1 }, { value: chunk }) + ); + } + + await Promise.all(putPromises); + } + + return clock + 1; + } + + /** + * For now this is a helper method that creates a Y.Doc and then re-encodes a document update. + * In the future this will be handled by Yjs without creating a Y.Doc (constant memory consumption). + */ + private mergeUpdates(updates: Array) { + const ydoc = new Doc(); + ydoc.transact(() => { + for (const element of updates) { + applyUpdate(ydoc, element); + } + }); + return { update: encodeStateAsUpdate(ydoc), sv: encodeStateVector(ydoc) }; + } + + /** + * Merge all MongoDB documents of the same yjs document together. + */ + private async flushDocument(docName: string, stateAsUpdate: Uint8Array, stateVector: Uint8Array) { + const clock = await this.storeUpdate(docName, stateAsUpdate); + await this.writeStateVector(docName, stateVector, clock); + await this.clearUpdatesRange(docName, 0, clock); + return clock; + } + + /** + * Create a unique key for a update message. + */ + private createDocumentUpdateKey(docName: string, clock?: number) { + if (clock !== undefined) { + return { + version: 'v1', + action: 'update', + docName, + clock, + }; + } + return { + version: 'v1', + action: 'update', + docName, + }; + } + + private createDocumentStateVectorKey(docName: string) { + return { + docName, + version: 'v1_sv', + }; + } } diff --git a/apps/server/src/modules/tldraw/utils/index.ts b/apps/server/src/modules/tldraw/utils/index.ts index 0f9fa053010..a51b9059bc1 100644 --- a/apps/server/src/modules/tldraw/utils/index.ts +++ b/apps/server/src/modules/tldraw/utils/index.ts @@ -1,2 +1 @@ export * from './ydoc-utils'; -export * from './y-mongodb-utils'; diff --git a/apps/server/src/modules/tldraw/utils/y-mongodb-utils.ts b/apps/server/src/modules/tldraw/utils/y-mongodb-utils.ts deleted file mode 100644 index 2e710497823..00000000000 --- a/apps/server/src/modules/tldraw/utils/y-mongodb-utils.ts +++ /dev/null @@ -1,191 +0,0 @@ -import { TldrawDrawing } from '@modules/tldraw/entities'; -import * as binary from 'lib0/binary'; -import * as encoding from 'lib0/encoding'; -import { Buffer } from 'buffer'; -import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs'; -import { TldrawRepo } from '@modules/tldraw/repo/tldraw.repo'; - -const MAX_DOCUMENT_SIZE = 15000000; - -/** - * Remove all documents from db with Clock between $from and $to - */ -export const clearUpdatesRange = async (db: TldrawRepo, docName: string, from: number, to: number) => - db.del({ - docName, - clock: { - $gte: from, - $lt: to, - }, - }); - -/** - * Create a unique key for a update message. - */ -export const createDocumentUpdateKey = (docName: string, clock?: number) => { - if (clock !== undefined) { - return { - version: 'v1', - action: 'update', - docName, - clock, - }; - } - return { - version: 'v1', - action: 'update', - docName, - }; -}; - -export const createDocumentStateVectorKey = (docName: string) => { - return { - docName, - version: 'v1_sv', - }; -}; - -export const getMongoBulkData = (db: TldrawRepo, query: object, opts: object) => db.readAsCursor(query, opts); - -const mergeDocsTogether = (doc: TldrawDrawing, docs: TldrawDrawing[], docIndex: number) => { - const parts = [Buffer.from(doc.value.buffer)]; - let currentPartId: number | undefined = doc.part; - for (let i = docIndex + 1; i < docs.length; i++) { - const part = docs[i]; - if (part.clock === doc.clock) { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - if (currentPartId !== part.part - 1) { - throw new Error('Couldnt merge updates together because a part is missing!'); - } - parts.push(Buffer.from(part.value.buffer)); - currentPartId = part.part; - } else { - break; - } - } - - return parts; -}; - -/** - * Convert the mongo document array to an array of values (as buffers) - */ -const convertMongoUpdates = (docs: TldrawDrawing[]) => { - if (!Array.isArray(docs) || !docs.length) return []; - - const updates: Buffer[] = []; - for (let i = 0; i < docs.length; i++) { - const doc = docs[i]; - if (!doc.part) { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - updates.push(doc.value.buffer); - } else if (doc.part === 1) { - // merge the docs together that got split because of mongodb size limits - const parts = mergeDocsTogether(doc, docs, i); - updates.push(Buffer.concat(parts)); - } - } - return updates; -}; - -/** - * Get all document updates for a specific document. - */ -export const getMongoUpdates = async (db: TldrawRepo, docName: string, opts = {}) => { - const docs = await getMongoBulkData(db, createDocumentUpdateKey(docName), opts); - return convertMongoUpdates(docs); -}; - -export const getCurrentUpdateClock = (db: TldrawRepo, docName: string) => - getMongoBulkData( - db, - { - ...createDocumentUpdateKey(docName, 0), - clock: { - $gte: 0, - $lt: binary.BITS32, - }, - }, - { reverse: true, limit: 1 } - ).then((updates) => { - if (updates.length === 0) { - return -1; - } - return updates[0].clock; - }); - -export const writeStateVector = async (db: TldrawRepo, docName: string, sv: Uint8Array, clock: number) => { - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, clock); - encoding.writeVarUint8Array(encoder, sv); - await db.put(createDocumentStateVectorKey(docName), { - value: Buffer.from(encoding.toUint8Array(encoder)), - }); -}; - -export const storeUpdate = async (db: TldrawRepo, docName: string, update: Uint8Array) => { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - const clock: number = await getCurrentUpdateClock(db, docName); - if (clock === -1) { - // make sure that a state vector is always written, so we can search for available documents - const ydoc = new Doc(); - applyUpdate(ydoc, update); - const sv = encodeStateVector(ydoc); - await writeStateVector(db, docName, sv, 0); - } - - const value = Buffer.from(update); - // if our buffer exceeds it, we store the update in multiple documents - if (value.length <= MAX_DOCUMENT_SIZE) { - await db.put(createDocumentUpdateKey(docName, clock + 1), { - value, - }); - } else { - const totalChunks = Math.ceil(value.length / MAX_DOCUMENT_SIZE); - - const putPromises: Promise[] = []; - for (let i = 0; i < totalChunks; i++) { - const start = i * MAX_DOCUMENT_SIZE; - const end = Math.min(start + MAX_DOCUMENT_SIZE, value.length); - const chunk = value.subarray(start, end); - - putPromises.push(db.put({ ...createDocumentUpdateKey(docName, clock + 1), part: i + 1 }, { value: chunk })); - } - - await Promise.all(putPromises); - } - - return clock + 1; -}; - -/** - * For now this is a helper method that creates a Y.Doc and then re-encodes a document update. - * In the future this will be handled by Yjs without creating a Y.Doc (constant memory consumption). - */ -export const mergeUpdates = (updates: Array) => { - const ydoc = new Doc(); - ydoc.transact(() => { - for (const element of updates) { - applyUpdate(ydoc, element); - } - }); - return { update: encodeStateAsUpdate(ydoc), sv: encodeStateVector(ydoc) }; -}; - -/** - * Merge all MongoDB documents of the same yjs document together. - */ -export const flushDocument = async ( - db: TldrawRepo, - docName: string, - stateAsUpdate: Uint8Array, - stateVector: Uint8Array -) => { - const clock = await storeUpdate(db, docName, stateAsUpdate); - await writeStateVector(db, docName, stateVector, clock); - await clearUpdatesRange(db, docName, 0, clock); - return clock; -};