Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6633): MongoClient.close closes active cursors #4372

Merged
merged 8 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ export interface CursorStreamOptions {
/** @public */
export type CursorFlag = (typeof CURSOR_FLAGS)[number];

function removeActiveCursor(this: AbstractCursor) {
this.client.s.activeCursors.delete(this);
}

/**
* @public
* @experimental
Expand Down Expand Up @@ -365,6 +369,7 @@ export abstract class AbstractCursor<
this.signal,
() => void this.close().then(undefined, squashError)
);
this.trackCursor();
}

/**
Expand Down Expand Up @@ -444,6 +449,14 @@ export abstract class AbstractCursor<
await this.close();
}

/** Adds cursor to client's tracking so it will be closed by MongoClient.close() */
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
private trackCursor() {
this.cursorClient.s.activeCursors.add(this);
if (!this.listeners('close').includes(removeActiveCursor)) {
this.once('close', removeActiveCursor);
}
}

/** Returns current buffered documents length */
bufferedCount(): number {
return this.documents?.length ?? 0;
Expand Down Expand Up @@ -858,16 +871,15 @@ export abstract class AbstractCursor<
this.isClosed = false;
this.isKilled = false;
this.initialized = false;
this.hasEmittedClose = false;
this.trackCursor();
aditi-khare-mongoDB marked this conversation as resolved.
Show resolved Hide resolved

const session = this.cursorSession;
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
if (session.explicit === false) {
if (!session.hasEnded) {
session.endSession().then(undefined, squashError);
}
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
// We only want to end this session if we created it, and it hasn't ended yet
if (this.cursorSession.explicit === false) {
if (!this.cursorSession.hasEnded) {
this.cursorSession.endSession().then(undefined, squashError);
}
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
}
}

Expand Down Expand Up @@ -1004,7 +1016,6 @@ export abstract class AbstractCursor<
private async cleanup(timeoutMS?: number, error?: Error) {
this.abortListener?.[kDispose]();
this.isClosed = true;
const session = this.cursorSession;
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
if (timeoutMS != null) {
this.timeoutContext?.clear();
Expand All @@ -1026,7 +1037,7 @@ export abstract class AbstractCursor<
!this.cursorId.isZero() &&
this.cursorNamespace &&
this.selectedServer &&
!session.hasEnded
!this.cursorSession.hasEnded
) {
this.isKilled = true;
const cursorId = this.cursorId;
Expand All @@ -1035,22 +1046,24 @@ export abstract class AbstractCursor<
await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
session: this.cursorSession
}),
timeoutContextForKillCursors()
);
}
} catch (error) {
squashError(error);
} finally {
if (session?.owner === this) {
await session.endSession({ error });
}
if (!session?.inTransaction()) {
maybeClearPinnedConnection(session, { error });
try {
if (this.cursorSession?.owner === this) {
await this.cursorSession.endSession({ error });
}
if (!this.cursorSession?.inTransaction()) {
maybeClearPinnedConnection(this.cursorSession, { error });
}
} finally {
this.emitClose();
}

this.emitClose();
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type { ClientMetadata } from './cmap/handshake/client_metadata';
import type { CompressorName } from './cmap/wire_protocol/compression';
import { parseOptions, resolveSRVRecord } from './connection_string';
import { MONGO_CLIENT_EVENTS } from './constants';
import { type AbstractCursor } from './cursor/abstract_cursor';
import { Db, type DbOptions } from './db';
import type { Encrypter } from './encrypter';
import { MongoInvalidArgumentError } from './error';
Expand Down Expand Up @@ -323,6 +324,12 @@ export interface MongoClientPrivate {
* - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions
*/
readonly activeSessions: Set<ClientSession>;
/**
* We keep a reference to the cursors that are created from this client.
* - used to track and close all cursors in client.close().
* Cursors in this set are ones that still need to have their close method invoked (no other conditions are considered)
*/
readonly activeCursors: Set<AbstractCursor>;
readonly sessionPool: ServerSessionPool;
readonly options: MongoOptions;
readonly readConcern?: ReadConcern;
Expand Down Expand Up @@ -398,6 +405,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
hasBeenClosed: false,
sessionPool: new ServerSessionPool(this),
activeSessions: new Set(),
activeCursors: new Set(),
authProviders: new MongoClientAuthProviders(),

get options() {
Expand Down Expand Up @@ -650,6 +658,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
writable: false
});

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();
aditi-khare-mongoDB marked this conversation as resolved.
Show resolved Hide resolved

await Promise.all(activeCursorCloses);

const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
this.s.activeSessions.clear();

Expand Down
21 changes: 20 additions & 1 deletion test/integration/crud/find_cursor_methods.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ describe('Find Cursor', function () {
});
});

context('#rewind', function () {
describe('#rewind', function () {
it('should rewind a cursor', async function () {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({});
Expand Down Expand Up @@ -335,6 +335,25 @@ describe('Find Cursor', function () {
});
}
});

it('emits close after rewind', async () => {
let cursor;
try {
const coll = client.db().collection('abstract_cursor');
cursor = coll.find({}, { batchSize: 1 });
const closes = [];
cursor.on('close', () => closes.push('close'));
const doc0 = await cursor.next();
await cursor.close();
cursor.rewind();
const doc1 = await cursor.next();
await cursor.close();
expect(doc0).to.deep.equal(doc1); // make sure rewind happened
expect(closes).to.have.lengthOf(2);
} finally {
await cursor.close();
}
});
});

context('#allowDiskUse', function () {
Expand Down
4 changes: 2 additions & 2 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const sinon = require('sinon');
const { Writable } = require('stream');
const { once, on } = require('events');
const { setTimeout } = require('timers');
const { ReadPreference, MongoExpiredSessionError } = require('../../mongodb');
const { ReadPreference } = require('../../mongodb');
const { ServerType } = require('../../mongodb');
const { formatSort } = require('../../mongodb');

Expand Down Expand Up @@ -1872,7 +1872,7 @@ describe('Cursor', function () {
expect(cursor).to.have.property('closed', true);

const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(MongoExpiredSessionError);
expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call
});

it('shouldAwaitData', {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/node-specific/abort_signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ describe('AbortSignal support', () => {
const start = performance.now();
const result = await cursor.toArray().catch(error => error);
const end = performance.now();
expect(end - start).to.be.lessThan(15);
expect(end - start).to.be.lessThan(50);

expect(result).to.be.instanceOf(DOMException);
});
Expand Down
43 changes: 43 additions & 0 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,4 +556,47 @@ describe('class AbstractCursor', function () {
);
});
});

describe('cursor tracking', () => {
let client: MongoClient;
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('activeCursors').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, i) => ({ i })));
});

afterEach(async function () {
await client.close();
});

it('adds itself to a set upon construction', () => {
collection.find({}, { batchSize: 1 });
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('adds itself to a set upon rewind', async () => {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(client.s.activeCursors).to.have.lengthOf(1);
await cursor.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
cursor.rewind();
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('does not add more than one close listener', async () => {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(cursor.listeners('close')).to.have.lengthOf(1);
await cursor.close();
expect(cursor.listeners('close')).to.have.lengthOf(0);
cursor.rewind();
cursor.rewind();
cursor.rewind();
expect(cursor.listeners('close')).to.have.lengthOf(1);
});
});
});
86 changes: 83 additions & 3 deletions test/integration/node-specific/mongo_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as net from 'net';
import * as sinon from 'sinon';

import {
type Collection,
type CommandFailedEvent,
type CommandStartedEvent,
type CommandSucceededEvent,
Expand Down Expand Up @@ -31,7 +32,6 @@ describe('class MongoClient', function () {
afterEach(async () => {
sinon.restore();
await client?.close();
// @ts-expect-error: Put this variable back to undefined to force tests to make their own client
client = undefined;
});

Expand Down Expand Up @@ -567,7 +567,44 @@ describe('class MongoClient', function () {
});
});

context('#close()', () => {
describe('active cursors', function () {
let client: MongoClient;
let collection: Collection<{ _id: number }>;
const kills = [];

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('activeCursors').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));

kills.length = 0;
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

afterEach(async function () {
await client.close();
});

it('are tracked upon creation and removed upon exhaustion', async () => {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
expect(client.s.activeCursors).to.have.lengthOf(30);
await Promise.all(cursors.map(c => c.toArray()));
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(0);
});

it('are removed from tracking if exhausted in first batch', async () => {
const cursors = Array.from({ length: 30 }, () => collection.find());
expect(client.s.activeCursors).to.have.lengthOf(30);
await Promise.all(cursors.map(c => c.next())); // only one document pulled from each.
expect(client.s.activeCursors).to.have.lengthOf(0);
});
});

describe('#close()', () => {
let client: MongoClient;
let db: Db;

Expand Down Expand Up @@ -702,7 +739,7 @@ describe('class MongoClient', function () {
expect(endEvents[0]).to.have.property('reply', undefined); // noReponse: true
});

context('when server selection would return no servers', () => {
describe('when server selection would return no servers', () => {
const serverDescription = new ServerDescription('a:1');

it('short circuits and does not end sessions', async () => {
Expand All @@ -722,6 +759,49 @@ describe('class MongoClient', function () {
expect(client.s.sessionPool.sessions).to.have.lengthOf(1);
});
});

describe('active cursors', function () {
let collection: Collection<{ _id: number }>;
const kills = [];

beforeEach(async () => {
collection = client.db('test').collection('activeCursors');
await collection.drop().catch(() => null);
await collection.insertMany(Array.from({ length: 50 }, (_, _id) => ({ _id })));

kills.length = 0;
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

it('are all closed', async () => {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
await Promise.all(cursors.map(c => c.next()));
expect(client.s.activeCursors).to.have.lengthOf(30);
await client.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(30);
aditi-khare-mongoDB marked this conversation as resolved.
Show resolved Hide resolved
});

it('creating cursors after close adds to activeCursors', async () => {
expect(client.s.activeCursors).to.have.lengthOf(0);
await client.close();
collection.find({});
expect(client.s.activeCursors).to.have.lengthOf(1);
});

it('rewinding cursors after close adds to activeCursors', async () => {
expect(client.s.activeCursors).to.have.lengthOf(0);
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(client.s.activeCursors).to.have.lengthOf(1);
await client.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
cursor.rewind();
expect(client.s.activeCursors).to.have.lengthOf(1);
});
});
});

context('when connecting', function () {
Expand Down
Loading