From 95adff514375cbe1489b0fa290b9dcb97fd81129 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 15 Jan 2025 17:40:49 -0500 Subject: [PATCH 1/8] feat(NODE-6633): MongoClient.close closes active cursors --- src/cursor/abstract_cursor.ts | 29 ++++++-- src/mongo_client.ts | 8 +++ .../node-specific/abstract_cursor.test.ts | 43 ++++++++++++ .../node-specific/mongo_client.test.ts | 68 ++++++++++++++++++- 4 files changed, 138 insertions(+), 10 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index feaf07347c..517a2778eb 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -67,6 +67,10 @@ export interface CursorStreamOptions { /** @public */ export type CursorFlag = (typeof CURSOR_FLAGS)[number]; +function removeActiveCursor(this: AbstractCursor) { + this.client.s.activeCursors.delete(this); +} + /** * @public * @experimental @@ -268,6 +272,10 @@ export abstract class AbstractCursor< throw new MongoRuntimeError('Cursor must be constructed with MongoClient'); } this.cursorClient = client; + + this.cursorClient.s.activeCursors.add(this); + this.once('close', removeActiveCursor); + this.cursorNamespace = namespace; this.cursorId = null; this.initialized = false; @@ -859,6 +867,11 @@ export abstract class AbstractCursor< this.isKilled = false; this.initialized = false; + this.cursorClient.s.activeCursors.add(this); + if (!this.listeners('close').includes(removeActiveCursor)) { + this.once('close', removeActiveCursor); + } + const session = this.cursorSession; if (session) { // We only want to end this session if we created it, and it hasn't ended yet @@ -1043,14 +1056,16 @@ export abstract class AbstractCursor< } catch (error) { squashError(error); } finally { - if (session?.owner === this) { - await session.endSession({ error }); - } - if (!session?.inTransaction()) { - maybeClearPinnedConnection(session, { error }); + try { + if (session?.owner === this) { + await session.endSession({ error }); + } + if (!session?.inTransaction()) { + maybeClearPinnedConnection(session, { error }); + } + } finally { + this.emitClose(); } - - this.emitClose(); } } diff --git a/src/mongo_client.ts b/src/mongo_client.ts index ab5ee69139..c0e0017a64 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -18,6 +18,7 @@ import type { ClientMetadata } from './cmap/handshake/client_metadata'; import type { CompressorName } from './cmap/wire_protocol/compression'; import { parseOptions, resolveSRVRecord } from './connection_string'; import { MONGO_CLIENT_EVENTS } from './constants'; +import { type AbstractCursor } from './cursor/abstract_cursor'; import { Db, type DbOptions } from './db'; import type { Encrypter } from './encrypter'; import { MongoInvalidArgumentError } from './error'; @@ -323,6 +324,7 @@ export interface MongoClientPrivate { * - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions */ readonly activeSessions: Set; + readonly activeCursors: Set; readonly sessionPool: ServerSessionPool; readonly options: MongoOptions; readonly readConcern?: ReadConcern; @@ -398,6 +400,7 @@ export class MongoClient extends TypedEventEmitter implements hasBeenClosed: false, sessionPool: new ServerSessionPool(this), activeSessions: new Set(), + activeCursors: new Set(), authProviders: new MongoClientAuthProviders(), get options() { @@ -650,6 +653,11 @@ export class MongoClient extends TypedEventEmitter implements writable: false }); + const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close()); + this.s.activeCursors.clear(); + + await Promise.all(activeCursorCloses); + const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession()); this.s.activeSessions.clear(); diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index ac060c9d45..2ca0459419 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -556,4 +556,47 @@ describe('class AbstractCursor', function () { ); }); }); + + describe('cursor tracking', () => { + let client: MongoClient; + let collection: Collection; + + beforeEach(async function () { + client = this.configuration.newClient(); + collection = client.db('activeCursors').collection('activeCursors'); + await collection.drop().catch(() => null); + await collection.insertMany(Array.from({ length: 50 }, (_, i) => ({ i }))); + }); + + afterEach(async function () { + await client.close(); + }); + + it('adds itself to a set upon construction', () => { + collection.find({}, { batchSize: 1 }); + expect(client.s.activeCursors).to.have.lengthOf(1); + }); + + it('adds itself to a set upon rewind', async () => { + const cursor = collection.find({}, { batchSize: 1 }); + await cursor.next(); + expect(client.s.activeCursors).to.have.lengthOf(1); + await cursor.close(); + expect(client.s.activeCursors).to.have.lengthOf(0); + cursor.rewind(); + expect(client.s.activeCursors).to.have.lengthOf(1); + }); + + it('does not add more than one close listener', async () => { + const cursor = collection.find({}, { batchSize: 1 }); + await cursor.next(); + expect(cursor.listeners('close')).to.have.lengthOf(1); + await cursor.close(); + expect(cursor.listeners('close')).to.have.lengthOf(0); + cursor.rewind(); + cursor.rewind(); + cursor.rewind(); + expect(cursor.listeners('close')).to.have.lengthOf(1); + }); + }); }); diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 059d9d82c2..744c221597 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -4,6 +4,7 @@ import * as net from 'net'; import * as sinon from 'sinon'; import { + type Collection, type CommandFailedEvent, type CommandStartedEvent, type CommandSucceededEvent, @@ -31,7 +32,6 @@ describe('class MongoClient', function () { afterEach(async () => { sinon.restore(); await client?.close(); - // @ts-expect-error: Put this variable back to undefined to force tests to make their own client client = undefined; }); @@ -567,7 +567,44 @@ describe('class MongoClient', function () { }); }); - context('#close()', () => { + describe('active cursors', function () { + let client: MongoClient; + let collection: Collection<{ _id: number }>; + const kills = []; + + beforeEach(async function () { + client = this.configuration.newClient(); + collection = client.db('activeCursors').collection('activeCursors'); + await collection.drop().catch(() => null); + await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id }))); + + kills.length = 0; + client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev)); + }); + + afterEach(async function () { + await client.close(); + }); + + it('are tracked upon creation and removed upon exhaustion', async () => { + const cursors = Array.from({ length: 30 }, (_, skip) => + collection.find({}, { skip, batchSize: 1 }) + ); + expect(client.s.activeCursors).to.have.lengthOf(30); + await Promise.all(cursors.map(c => c.toArray())); + expect(client.s.activeCursors).to.have.lengthOf(0); + expect(kills).to.have.lengthOf(0); + }); + + it('are removed from tracking if exhausted in first batch', async () => { + const cursors = Array.from({ length: 30 }, () => collection.find()); + expect(client.s.activeCursors).to.have.lengthOf(30); + await Promise.all(cursors.map(c => c.next())); // only one document pulled from each. + expect(client.s.activeCursors).to.have.lengthOf(0); + }); + }); + + describe('#close()', () => { let client: MongoClient; let db: Db; @@ -702,7 +739,7 @@ describe('class MongoClient', function () { expect(endEvents[0]).to.have.property('reply', undefined); // noReponse: true }); - context('when server selection would return no servers', () => { + describe('when server selection would return no servers', () => { const serverDescription = new ServerDescription('a:1'); it('short circuits and does not end sessions', async () => { @@ -722,6 +759,31 @@ describe('class MongoClient', function () { expect(client.s.sessionPool.sessions).to.have.lengthOf(1); }); }); + + describe('active cursors', function () { + let collection: Collection<{ _id: number }>; + const kills = []; + + beforeEach(async () => { + collection = client.db('test').collection('activeCursors'); + await collection.drop().catch(() => null); + await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id }))); + + kills.length = 0; + client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev)); + }); + + it('are all closed', async () => { + const cursors = Array.from({ length: 30 }, (_, skip) => + collection.find({}, { skip, batchSize: 1 }) + ); + await Promise.all(cursors.map(c => c.next())); + expect(client.s.activeCursors).to.have.lengthOf(30); + await client.close(); + expect(client.s.activeCursors).to.have.lengthOf(0); + expect(kills).to.have.lengthOf(30); + }); + }); }); context('when connecting', function () { From 69c4a1a918ad767c4ac37de2f9fa0ef30a44a790 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Jan 2025 16:23:50 -0500 Subject: [PATCH 2/8] fix: track position, helper, tailable interrupt --- src/cursor/abstract_cursor.ts | 45 ++++++++++------------ test/integration/crud/misc_cursors.test.js | 4 +- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 517a2778eb..f504c33479 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -272,10 +272,6 @@ export abstract class AbstractCursor< throw new MongoRuntimeError('Cursor must be constructed with MongoClient'); } this.cursorClient = client; - - this.cursorClient.s.activeCursors.add(this); - this.once('close', removeActiveCursor); - this.cursorNamespace = namespace; this.cursorId = null; this.initialized = false; @@ -373,6 +369,7 @@ export abstract class AbstractCursor< this.signal, () => void this.close().then(undefined, squashError) ); + this.trackCursor(); } /** @@ -452,6 +449,14 @@ export abstract class AbstractCursor< await this.close(); } + /** Adds cursor to client's tracking so it will be closed by MongoClient.close() */ + private trackCursor() { + this.cursorClient.s.activeCursors.add(this); + if (!this.listeners('close').includes(removeActiveCursor)) { + this.once('close', removeActiveCursor); + } + } + /** Returns current buffered documents length */ bufferedCount(): number { return this.documents?.length ?? 0; @@ -866,21 +871,14 @@ export abstract class AbstractCursor< this.isClosed = false; this.isKilled = false; this.initialized = false; + this.trackCursor(); - this.cursorClient.s.activeCursors.add(this); - if (!this.listeners('close').includes(removeActiveCursor)) { - this.once('close', removeActiveCursor); - } - - const session = this.cursorSession; - if (session) { - // We only want to end this session if we created it, and it hasn't ended yet - if (session.explicit === false) { - if (!session.hasEnded) { - session.endSession().then(undefined, squashError); - } - this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false }); + // We only want to end this session if we created it, and it hasn't ended yet + if (this.cursorSession.explicit === false) { + if (!this.cursorSession.hasEnded) { + this.cursorSession.endSession().then(undefined, squashError); } + this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false }); } } @@ -1017,7 +1015,6 @@ export abstract class AbstractCursor< private async cleanup(timeoutMS?: number, error?: Error) { this.abortListener?.[kDispose](); this.isClosed = true; - const session = this.cursorSession; const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => { if (timeoutMS != null) { this.timeoutContext?.clear(); @@ -1039,7 +1036,7 @@ export abstract class AbstractCursor< !this.cursorId.isZero() && this.cursorNamespace && this.selectedServer && - !session.hasEnded + !this.cursorSession.hasEnded ) { this.isKilled = true; const cursorId = this.cursorId; @@ -1048,7 +1045,7 @@ export abstract class AbstractCursor< await executeOperation( this.cursorClient, new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, { - session + session: this.cursorSession }), timeoutContextForKillCursors() ); @@ -1057,11 +1054,11 @@ export abstract class AbstractCursor< squashError(error); } finally { try { - if (session?.owner === this) { - await session.endSession({ error }); + if (this.cursorSession?.owner === this) { + await this.cursorSession.endSession({ error }); } - if (!session?.inTransaction()) { - maybeClearPinnedConnection(session, { error }); + if (!this.cursorSession?.inTransaction()) { + maybeClearPinnedConnection(this.cursorSession, { error }); } } finally { this.emitClose(); diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index b8de060b6b..add879e96f 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -10,7 +10,7 @@ const sinon = require('sinon'); const { Writable } = require('stream'); const { once, on } = require('events'); const { setTimeout } = require('timers'); -const { ReadPreference, MongoExpiredSessionError } = require('../../mongodb'); +const { ReadPreference } = require('../../mongodb'); const { ServerType } = require('../../mongodb'); const { formatSort } = require('../../mongodb'); @@ -1872,7 +1872,7 @@ describe('Cursor', function () { expect(cursor).to.have.property('closed', true); const error = await rejectedEarlyBecauseClientClosed; - expect(error).to.be.instanceOf(MongoExpiredSessionError); + expect(error).to.be.null; // TODO: is this API or just "how it behaved at the time" }); it('shouldAwaitData', { From d644bd258d19cc8892ca51c34432e8ba923829e2 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 21 Jan 2025 16:30:41 -0500 Subject: [PATCH 3/8] test: rewind and creation after close --- .../node-specific/mongo_client.test.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 744c221597..f1a095d79e 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -783,6 +783,24 @@ describe('class MongoClient', function () { expect(client.s.activeCursors).to.have.lengthOf(0); expect(kills).to.have.lengthOf(30); }); + + it('creating cursors after close adds to activeCursors', async () => { + expect(client.s.activeCursors).to.have.lengthOf(0); + await client.close(); + collection.find({}); + expect(client.s.activeCursors).to.have.lengthOf(1); + }); + + it('rewinding cursors after close adds to activeCursors', async () => { + expect(client.s.activeCursors).to.have.lengthOf(0); + const cursor = collection.find({}, { batchSize: 1 }); + await cursor.next(); + expect(client.s.activeCursors).to.have.lengthOf(1); + await client.close(); + expect(client.s.activeCursors).to.have.lengthOf(0); + cursor.rewind(); + expect(client.s.activeCursors).to.have.lengthOf(1); + }); }); }); From d88e0d3061004f3d10354cecf0927526e734b240 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 21 Jan 2025 16:33:23 -0500 Subject: [PATCH 4/8] chore: update todo --- test/integration/crud/misc_cursors.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index add879e96f..efe873c2b7 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1872,7 +1872,7 @@ describe('Cursor', function () { expect(cursor).to.have.property('closed', true); const error = await rejectedEarlyBecauseClientClosed; - expect(error).to.be.null; // TODO: is this API or just "how it behaved at the time" + expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call }); it('shouldAwaitData', { From 303beed3ffd9f5c30bf2aee279bbf35ece46a246 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 21 Jan 2025 20:25:58 -0500 Subject: [PATCH 5/8] fix rewind not emitting close --- src/cursor/abstract_cursor.ts | 1 + .../crud/find_cursor_methods.test.js | 21 ++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index f504c33479..1758e80c24 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -871,6 +871,7 @@ export abstract class AbstractCursor< this.isClosed = false; this.isKilled = false; this.initialized = false; + this.hasEmittedClose = false; this.trackCursor(); // We only want to end this session if we created it, and it hasn't ended yet diff --git a/test/integration/crud/find_cursor_methods.test.js b/test/integration/crud/find_cursor_methods.test.js index 21a6649bf0..1b37cdc00b 100644 --- a/test/integration/crud/find_cursor_methods.test.js +++ b/test/integration/crud/find_cursor_methods.test.js @@ -251,7 +251,7 @@ describe('Find Cursor', function () { }); }); - context('#rewind', function () { + describe('#rewind', function () { it('should rewind a cursor', async function () { const coll = client.db().collection('abstract_cursor'); const cursor = coll.find({}); @@ -335,6 +335,25 @@ describe('Find Cursor', function () { }); } }); + + it('emits close after rewind', async () => { + let cursor; + try { + const coll = client.db().collection('abstract_cursor'); + cursor = coll.find({}, { batchSize: 1 }); + const closes = []; + cursor.on('close', () => closes.push('close')); + const doc0 = await cursor.next(); + await cursor.close(); + cursor.rewind(); + const doc1 = await cursor.next(); + await cursor.close(); + expect(doc0).to.deep.equal(doc1); // make sure rewind happened + expect(closes).to.have.lengthOf(2); + } finally { + await cursor.close(); + } + }); }); context('#allowDiskUse', function () { From 76ef67c84135ac0293daf007cedc24dc9e2531e0 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 23 Jan 2025 10:36:34 -0500 Subject: [PATCH 6/8] docs --- src/mongo_client.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/mongo_client.ts b/src/mongo_client.ts index c0e0017a64..e85f6c0a4a 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -324,6 +324,11 @@ export interface MongoClientPrivate { * - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions */ readonly activeSessions: Set; + /** + * We keep a reference to the cursors that are created from this client. + * - used to track and close all cursors in client.close(). + * Cursor's in this set are ones that still need to have their close method invoked (no other conditions are considered) + */ readonly activeCursors: Set; readonly sessionPool: ServerSessionPool; readonly options: MongoOptions; From 73d5e28fc23d31f37ae5c511664fc145c266c6a4 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 23 Jan 2025 11:57:36 -0500 Subject: [PATCH 7/8] docs --- src/mongo_client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mongo_client.ts b/src/mongo_client.ts index e85f6c0a4a..b6e5fcc931 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -327,7 +327,7 @@ export interface MongoClientPrivate { /** * We keep a reference to the cursors that are created from this client. * - used to track and close all cursors in client.close(). - * Cursor's in this set are ones that still need to have their close method invoked (no other conditions are considered) + * Cursors in this set are ones that still need to have their close method invoked (no other conditions are considered) */ readonly activeCursors: Set; readonly sessionPool: ServerSessionPool; From 3ff151eef00facf4087256fc74de1895887dcf8b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 27 Jan 2025 12:40:03 -0500 Subject: [PATCH 8/8] test: bump threshold --- test/integration/node-specific/abort_signal.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/node-specific/abort_signal.test.ts b/test/integration/node-specific/abort_signal.test.ts index d2bb089933..8508aee0aa 100644 --- a/test/integration/node-specific/abort_signal.test.ts +++ b/test/integration/node-specific/abort_signal.test.ts @@ -603,7 +603,7 @@ describe('AbortSignal support', () => { const start = performance.now(); const result = await cursor.toArray().catch(error => error); const end = performance.now(); - expect(end - start).to.be.lessThan(15); + expect(end - start).to.be.lessThan(50); expect(result).to.be.instanceOf(DOMException); });