Skip to content

Commit

Permalink
code refactor v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomasz Wiaderek authored and Tomasz Wiaderek committed Nov 26, 2023
1 parent 20f370d commit a67ace0
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 221 deletions.
15 changes: 5 additions & 10 deletions apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ describe('TldrawBoardRepo', () => {
it('should check if repo and its properties are set correctly', () => {
expect(repo).toBeDefined();
expect(repo.mdb).toBeDefined();
expect(repo.configService).toBeDefined();
expect(repo.flushSize).toBeDefined();
expect(repo.multipleCollections).toBeDefined();
expect(repo.connectionString).toBeDefined();
expect(repo.collectionName).toBeDefined();
});

describe('updateDocument', () => {
Expand All @@ -75,7 +70,7 @@ describe('TldrawBoardRepo', () => {
const storeGetYDocSpy = jest
.spyOn(repo.mdb, 'getYDoc')
.mockImplementation(() => Promise.resolve(new WsSharedDocDo('TEST', service)));
const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdate').mockImplementation(() => Promise.resolve(1));
const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockImplementation(() => Promise.resolve(1));

Check failure on line 73 in apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts

View workflow job for this annotation

GitHub Actions / nest_lint

Replace `.spyOn(repo.mdb,·'storeUpdateTransactional')` with `⏎↹↹↹↹↹.spyOn(repo.mdb,·'storeUpdateTransactional')⏎↹↹↹↹↹`

return {
doc,
Expand Down Expand Up @@ -105,7 +100,7 @@ describe('TldrawBoardRepo', () => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
doc.conns.set(ws, wsSet);
const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdate').mockImplementation(() => Promise.resolve(1));
const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockImplementation(() => Promise.resolve(1));

Check failure on line 103 in apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts

View workflow job for this annotation

GitHub Actions / nest_lint

Replace `.spyOn(repo.mdb,·'storeUpdateTransactional')` with `⏎↹↹↹↹↹.spyOn(repo.mdb,·'storeUpdateTransactional')⏎↹↹↹↹↹`
const storeGetYDocSpy = jest
.spyOn(repo.mdb, 'getYDoc')
.mockImplementation(() => Promise.resolve(new WsSharedDocDo('TEST', service)));
Expand Down Expand Up @@ -158,7 +153,7 @@ describe('TldrawBoardRepo', () => {
describe('when the difference between update and current drawing is more than 0', () => {
const setup = () => {
const calculateDiffSpy = jest.spyOn(YjsUtils, 'calculateDiff').mockImplementationOnce(() => 1);
const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdate').mockResolvedValueOnce(Promise.resolve(1));
const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockResolvedValueOnce(Promise.resolve(1));

Check failure on line 156 in apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts

View workflow job for this annotation

GitHub Actions / nest_lint

Replace `.spyOn(repo.mdb,·'storeUpdateTransactional')` with `⏎↹↹↹↹↹.spyOn(repo.mdb,·'storeUpdateTransactional')⏎↹↹↹↹↹`

return {
calculateDiffSpy,
Expand All @@ -181,7 +176,7 @@ describe('TldrawBoardRepo', () => {
describe('when the difference between update and current drawing is 0', () => {
const setup = () => {
const calculateDiffSpy = jest.spyOn(YjsUtils, 'calculateDiff').mockImplementationOnce(() => 0);
const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdate');
const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional');

return {
calculateDiffSpy,
Expand All @@ -204,7 +199,7 @@ describe('TldrawBoardRepo', () => {

describe('flushDocument', () => {
const setup = () => {
const flushDocumentSpy = jest.spyOn(repo.mdb, 'flushDocument').mockResolvedValueOnce(Promise.resolve());
const flushDocumentSpy = jest.spyOn(repo.mdb, 'flushDocumentTransactional').mockResolvedValueOnce(Promise.resolve());

Check failure on line 202 in apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts

View workflow job for this annotation

GitHub Actions / nest_lint

Replace `.spyOn(repo.mdb,·'flushDocumentTransactional')` with `⏎↹↹↹↹.spyOn(repo.mdb,·'flushDocumentTransactional')⏎↹↹↹↹`

return { flushDocumentSpy };
};
Expand Down
6 changes: 3 additions & 3 deletions apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class TldrawBoardRepo {
public updateStoredDocWithDiff(docName: string, diff: Uint8Array): void {
const calc = calculateDiff(diff);
if (calc > 0) {
this.mdb.storeUpdate(docName, diff).catch((err) => this.logger.error(err));
this.mdb.storeUpdateTransactional(docName, diff).catch((err) => this.logger.error(err));
}
}

Expand All @@ -36,13 +36,13 @@ export class TldrawBoardRepo {
applyUpdate(ydoc, encodeStateAsUpdate(persistedYdoc));

ydoc.on('update', (update: Uint8Array) => {
this.mdb.storeUpdate(docName, update).catch((err) => this.logger.error(err));
this.mdb.storeUpdateTransactional(docName, update).catch((err) => this.logger.error(err));
});

persistedYdoc.destroy();
}

public async flushDocument(docName: string): Promise<void> {
await this.mdb.flushDocument(docName);
await this.mdb.flushDocumentTransactional(docName);
}
}
219 changes: 203 additions & 16 deletions apps/server/src/modules/tldraw/repo/y-mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,27 @@ import * as promise from 'lib0/promise';
import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import { Injectable } from '@nestjs/common';
import { TldrawRepo } from '@modules/tldraw/repo/tldraw.repo';
import { storeUpdate, flushDocument, getMongoUpdates, mergeUpdates } from '../utils';
import { TldrawDrawing } from '@modules/tldraw/entities';
import { Buffer } from 'buffer';
import * as binary from 'lib0/binary';
import * as encoding from 'lib0/encoding';
import { LegacyLogger } from '@src/core/logger';

@Injectable()
export class YMongodb {
private flushSize: number;
private MAX_DOCUMENT_SIZE = 15000000;

private readonly flushSize: number;

private tr = { string: Promise<never> };

private _transact;
private readonly _transact;

constructor(private readonly configService: ConfigService<TldrawConfig, true>, private readonly repo: TldrawRepo) {
constructor(
private readonly configService: ConfigService<TldrawConfig, true>,
private readonly repo: TldrawRepo,
private readonly logger: LegacyLogger
) {
this.flushSize = this.configService.get<number>('TLDRAW_DB_FLUSH_SIZE') ?? 400;

this._transact = <T>(docName: string, f: (TldrawRepo) => Promise<T>) => {
Expand All @@ -35,8 +45,7 @@ export class YMongodb {
try {
res = await f(this.repo);
} catch (err) {
// eslint-disable-next-line no-console
console.warn('Error during saving transaction', err);
this.logger.error('Error during saving transaction', err);
}

// once the last transaction for a given docName resolves, remove it from the queue
Expand Down Expand Up @@ -69,8 +78,8 @@ export class YMongodb {

getYDoc(docName: string): Promise<Doc> {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call,@typescript-eslint/no-unsafe-return
return this._transact(docName, async (db: TldrawRepo): Promise<Doc> => {
const updates = await getMongoUpdates(db, docName);
return this._transact(docName, async (): Promise<Doc> => {
const updates = await this.getMongoUpdates(docName);
const ydoc = new Doc();
ydoc.transact(() => {
for (let i = 0; i < updates.length; i++) {

Check failure on line 85 in apps/server/src/modules/tldraw/repo/y-mongodb.ts

View workflow job for this annotation

GitHub Actions / nest_lint

Unary operator '++' used
Expand All @@ -81,23 +90,201 @@ export class YMongodb {
}
});
if (updates.length > this.flushSize) {
await flushDocument(db, docName, encodeStateAsUpdate(ydoc), encodeStateVector(ydoc));
await this.flushDocument(docName, encodeStateAsUpdate(ydoc), encodeStateVector(ydoc));
}
return ydoc;
});
}

storeUpdate(docName: string, update: Uint8Array): Promise<number> {
storeUpdateTransactional(docName: string, update: Uint8Array): Promise<number> {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return,@typescript-eslint/no-unsafe-call
return this._transact(docName, (db: TldrawRepo) => storeUpdate(db, docName, update));
return this._transact(docName, () => this.storeUpdate(docName, update));
}

flushDocument(docName: string) {
flushDocumentTransactional(docName: string) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return,@typescript-eslint/no-unsafe-call
return this._transact(docName, async (db: TldrawRepo) => {
const updates = await getMongoUpdates(db, docName);
const { update, sv } = mergeUpdates(updates);
await flushDocument(db, docName, update, sv);
return this._transact(docName, async () => {
const updates = await this.getMongoUpdates(docName);
const { update, sv } = this.mergeUpdates(updates);
await this.flushDocument(docName, update, sv);
});
}

private async clearUpdatesRange(docName: string, from: number, to: number) {
return this.repo.del({
docName,
clock: {
$gte: from,
$lt: to,
},
});
}

private getMongoBulkData(query: object, opts: object) {
return this.repo.readAsCursor(query, opts);
}

private mergeDocsTogether(doc: TldrawDrawing, docs: TldrawDrawing[], docIndex: number) {
const parts = [Buffer.from(doc.value.buffer)];
let currentPartId: number | undefined = doc.part;
for (let i = docIndex + 1; i < docs.length; i++) {

Check failure on line 130 in apps/server/src/modules/tldraw/repo/y-mongodb.ts

View workflow job for this annotation

GitHub Actions / nest_lint

Unary operator '++' used
const part = docs[i];
if (part.clock === doc.clock) {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
if (currentPartId !== part.part - 1) {
throw new Error('Couldnt merge updates together because a part is missing!');
}
parts.push(Buffer.from(part.value.buffer));
currentPartId = part.part;
} else {
break;
}
}

return parts;
}

/**
* Convert the mongo document array to an array of values (as buffers)
*/
private convertMongoUpdates(docs: TldrawDrawing[]) {
if (!Array.isArray(docs) || !docs.length) return [];

const updates: Buffer[] = [];
for (let i = 0; i < docs.length; i++) {

Check failure on line 155 in apps/server/src/modules/tldraw/repo/y-mongodb.ts

View workflow job for this annotation

GitHub Actions / nest_lint

Unary operator '++' used
const doc = docs[i];
if (!doc.part) {
updates.push(doc.value);
} else if (doc.part === 1) {
// merge the docs together that got split because of mongodb size limits
const parts = this.mergeDocsTogether(doc, docs, i);
updates.push(Buffer.concat(parts));
}
}
return updates;
}

/**
* Get all document updates for a specific document.
*/
private async getMongoUpdates(docName: string, opts = {}) {
const docs = await this.getMongoBulkData(this.createDocumentUpdateKey(docName), opts);
return this.convertMongoUpdates(docs);
}

private getCurrentUpdateClock(docName: string) {
return this.getMongoBulkData(
{
...this.createDocumentUpdateKey(docName, 0),
clock: {
$gte: 0,
$lt: binary.BITS32,
},
},
{ reverse: true, limit: 1 }
).then((updates) => {
if (updates.length === 0) {
return -1;
}
return updates[0].clock;
});
}

private async writeStateVector(docName: string, sv: Uint8Array, clock: number) {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, clock);
encoding.writeVarUint8Array(encoder, sv);
await this.repo.put(this.createDocumentStateVectorKey(docName), {
value: Buffer.from(encoding.toUint8Array(encoder)),
});
}

private async storeUpdate(docName: string, update: Uint8Array) {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const clock: number = await this.getCurrentUpdateClock(docName);
if (clock === -1) {
// make sure that a state vector is always written, so we can search for available documents
const ydoc = new Doc();
applyUpdate(ydoc, update);
const sv = encodeStateVector(ydoc);
await this.writeStateVector(docName, sv, 0);
}

const value = Buffer.from(update);
// if our buffer exceeds it, we store the update in multiple documents
if (value.length <= this.MAX_DOCUMENT_SIZE) {
await this.repo.put(this.createDocumentUpdateKey(docName, clock + 1), {
value,
});
} else {
const totalChunks = Math.ceil(value.length / this.MAX_DOCUMENT_SIZE);

const putPromises: Promise<any>[] = [];
for (let i = 0; i < totalChunks; i++) {

Check failure on line 225 in apps/server/src/modules/tldraw/repo/y-mongodb.ts

View workflow job for this annotation

GitHub Actions / nest_lint

Unary operator '++' used
const start = i * this.MAX_DOCUMENT_SIZE;
const end = Math.min(start + this.MAX_DOCUMENT_SIZE, value.length);
const chunk = value.subarray(start, end);

putPromises.push(
this.repo.put({ ...this.createDocumentUpdateKey(docName, clock + 1), part: i + 1 }, { value: chunk })
);
}

await Promise.all(putPromises);
}

return clock + 1;
}

/**
* For now this is a helper method that creates a Y.Doc and then re-encodes a document update.
* In the future this will be handled by Yjs without creating a Y.Doc (constant memory consumption).
*/
private mergeUpdates(updates: Array<Uint8Array>) {
const ydoc = new Doc();
ydoc.transact(() => {
for (const element of updates) {
applyUpdate(ydoc, element);
}
});
return { update: encodeStateAsUpdate(ydoc), sv: encodeStateVector(ydoc) };
}

/**
* Merge all MongoDB documents of the same yjs document together.
*/
private async flushDocument(docName: string, stateAsUpdate: Uint8Array, stateVector: Uint8Array) {
const clock = await this.storeUpdate(docName, stateAsUpdate);
await this.writeStateVector(docName, stateVector, clock);
await this.clearUpdatesRange(docName, 0, clock);
return clock;
}

/**
* Create a unique key for a update message.
*/
private createDocumentUpdateKey(docName: string, clock?: number) {
if (clock !== undefined) {
return {
version: 'v1',
action: 'update',
docName,
clock,
};
}
return {
version: 'v1',
action: 'update',
docName,
};
}

private createDocumentStateVectorKey(docName: string) {
return {
docName,
version: 'v1_sv',
};
}
}
1 change: 0 additions & 1 deletion apps/server/src/modules/tldraw/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './ydoc-utils';
export * from './y-mongodb-utils';
Loading

0 comments on commit a67ace0

Please sign in to comment.