Skip to content

Commit f8a855f

Browse files
feat(NODE-6296)!: remove cursor default batch size of 1000 (#4729)
1 parent 0451dae commit f8a855f

File tree

6 files changed

+126 additions, -84 deletions

src/cursor/abstract_cursor.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,11 @@ export abstract class AbstractCursor<
666666
array.push(await this.transformDocument(doc));
667667
}
668668
} else {
669-
array.push(...docs);
669+
// Note: previous versions of this logic used `array.push(...)`, which adds each item
670+
// to the call stack. For large arrays, this can exceed the maximum call stack size.
671+
for (const doc of docs) {
672+
array.push(doc);
673+
}
670674
}
671675
}
672676
return array;
@@ -858,7 +862,7 @@ export abstract class AbstractCursor<
858862
): Promise<InitialCursorResponse>;
859863

860864
/** @internal */
861-
async getMore(batchSize: number): Promise<CursorResponse> {
865+
async getMore(): Promise<CursorResponse> {
862866
if (this.cursorId == null) {
863867
throw new MongoRuntimeError(
864868
'Unexpected null cursor id. A cursor creating command should have set this'
@@ -875,11 +879,10 @@ export abstract class AbstractCursor<
875879
'Unexpected null session. A cursor creating command should have set this'
876880
);
877881
}
878-
879882
const getMoreOptions = {
880883
...this.cursorOptions,
881884
session: this.cursorSession,
882-
batchSize
885+
batchSize: this.cursorOptions.batchSize
883886
};
884887

885888
const getMoreOperation = new GetMoreOperation(
@@ -952,14 +955,11 @@ export abstract class AbstractCursor<
952955
await this.cursorInit();
953956
// If the cursor died or returned documents, return
954957
if ((this.documents?.length ?? 0) !== 0 || this.isDead) return;
955-
// Otherwise, run a getMore
956958
}
957959

958-
// otherwise need to call getMore
959-
const batchSize = this.cursorOptions.batchSize || 1000;
960-
960+
// Otherwise, run a getMore
961961
try {
962-
const response = await this.getMore(batchSize);
962+
const response = await this.getMore();
963963
this.cursorId = response.id;
964964
this.documents = response;
965965
} catch (error) {

src/cursor/change_stream_cursor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ export class ChangeStreamCursor<
158158
return { server, session, response };
159159
}
160160

161-
override async getMore(batchSize: number): Promise<CursorResponse> {
162-
const response = await super.getMore(batchSize);
161+
override async getMore(): Promise<CursorResponse> {
162+
const response = await super.getMore();
163163

164164
this.maxWireVersion = maxWireVersion(this.server);
165165
this._processBatch(response);

src/cursor/find_cursor.ts

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { FindOperation, type FindOptions } from '../operations/find';
1616
import type { Hint } from '../operations/operation';
1717
import type { ClientSession } from '../sessions';
1818
import { formatSort, type Sort, type SortDirection } from '../sort';
19-
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
19+
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, noop, squashError } from '../utils';
2020
import { type InitialCursorResponse } from './abstract_cursor';
2121
import { ExplainableCursor } from './explainable_cursor';
2222

@@ -98,37 +98,50 @@ export class FindCursor<TSchema = any> extends ExplainableCursor<TSchema> {
9898
}
9999

100100
/** @internal */
101-
override async getMore(batchSize: number): Promise<CursorResponse> {
101+
override async getMore(): Promise<CursorResponse> {
102102
const numReturned = this.numReturned;
103-
if (numReturned) {
104-
// TODO(DRIVERS-1448): Remove logic to enforce `limit` in the driver
105-
const limit = this.findOptions.limit;
106-
batchSize =
107-
limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize;
108-
109-
if (batchSize <= 0) {
110-
try {
111-
await this.close();
112-
} catch (error) {
113-
squashError(error);
114-
// this is an optimization for the special case of a limit for a find command to avoid an
115-
// extra getMore when the limit has been reached and the limit is a multiple of the batchSize.
116-
// This is a consequence of the new query engine in 5.0 having no knowledge of the limit as it
117-
// produces results for the find command. Once a batch is filled up, it is returned and only
118-
// on the subsequent getMore will the query framework consider the limit, determine the cursor
119-
// is exhausted and return a cursorId of zero.
120-
// instead, if we determine there are no more documents to request from the server, we preemptively
121-
// close the cursor
122-
}
123-
return CursorResponse.emptyGetMore;
103+
const limit = this.findOptions.limit ?? Infinity;
104+
const remaining = limit - numReturned;
105+
106+
if (numReturned === limit && !this.id?.isZero()) {
107+
// this is an optimization for the special case of a limit for a find command to avoid an
108+
// extra getMore when the limit has been reached and the limit is a multiple of the batchSize.
109+
// This is a consequence of the new query engine in 5.0 having no knowledge of the limit as it
110+
// produces results for the find command. Once a batch is filled up, it is returned and only
111+
// on the subsequent getMore will the query framework consider the limit, determine the cursor
112+
// is exhausted and return a cursorId of zero.
113+
// instead, if we determine there are no more documents to request from the server, we preemptively
114+
// close the cursor
115+
try {
116+
await this.close();
117+
} catch (error) {
118+
squashError(error);
124119
}
120+
return CursorResponse.emptyGetMore;
121+
}
122+
123+
// TODO(DRIVERS-1448): Remove logic to enforce `limit` in the driver
124+
let cleanup: () => void = noop;
125+
const { batchSize } = this.cursorOptions;
126+
if (batchSize != null && batchSize > remaining) {
127+
this.cursorOptions.batchSize = remaining;
128+
129+
// After executing the final getMore, re-assign the batchSize back to its original value so that
130+
// if the cursor is rewound and executed, the batchSize is still correct.
131+
cleanup = () => {
132+
this.cursorOptions.batchSize = batchSize;
133+
};
125134
}
126135

127-
const response = await super.getMore(batchSize);
128-
// TODO: wrap this in some logic to prevent it from happening if we don't need this support
129-
this.numReturned = this.numReturned + response.batchSize;
136+
try {
137+
const response = await super.getMore();
130138

131-
return response;
139+
this.numReturned = this.numReturned + response.batchSize;
140+
141+
return response;
142+
} finally {
143+
cleanup?.();
144+
}
132145
}
133146

134147
/**

src/cursor/run_command_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ export class RunCommandCursor extends AbstractCursor {
159159
}
160160

161161
/** @internal */
162-
override async getMore(_batchSize: number): Promise<CursorResponse> {
162+
override async getMore(): Promise<CursorResponse> {
163163
if (!this.session) {
164164
throw new MongoRuntimeError(
165165
'Unexpected null session. A cursor creating command should have set this'

test/integration/command-logging-and-monitoring/command_monitoring.test.ts

Lines changed: 39 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -515,53 +515,49 @@ describe('Command Monitoring', function () {
515515
}
516516
});
517517

518-
it('should correctly decorate the apm result for aggregation with cursorId', {
519-
metadata: { requires: { topology: ['single', 'replicaset'], mongodb: '>=3.0.0' } },
520-
521-
test: function () {
522-
const started = [];
523-
const succeeded = [];
524-
525-
// Generate docs
526-
const docs = [];
527-
for (let i = 0; i < 2500; i++) docs.push({ a: i });
528-
529-
const client = this.configuration.newClient(
530-
{ writeConcern: { w: 1 } },
531-
{ maxPoolSize: 1, monitorCommands: true }
532-
);
518+
it('should correctly decorate the apm result for aggregation with cursorId', async function () {
519+
const started = [];
520+
const succeeded = [];
533521

534-
const desiredEvents = ['aggregate', 'getMore'];
535-
client.on('commandStarted', filterForCommands(desiredEvents, started));
536-
client.on('commandSucceeded', filterForCommands(desiredEvents, succeeded));
522+
// Generate docs
523+
const docs = [];
524+
for (let i = 0; i < 3500; i++) docs.push({ a: i });
537525

538-
const db = client.db(this.configuration.db);
539-
return db
540-
.collection('apm_test_u_4')
541-
.drop()
542-
.catch(ignoreNsNotFound)
543-
.then(() => db.collection('apm_test_u_4').insertMany(docs))
544-
.then(r => {
545-
expect(r).to.exist;
546-
return db
547-
.collection('apm_test_u_4')
548-
.aggregate([{ $match: {} }])
549-
.toArray();
550-
})
551-
.then(r => {
552-
expect(r).to.exist;
553-
expect(started).to.have.length(4);
554-
expect(succeeded).to.have.length(4);
555-
const cursors = succeeded.map(x => x.reply.cursor);
526+
const client = this.configuration.newClient(
527+
{ writeConcern: { w: 1 } },
528+
{ maxPoolSize: 1, monitorCommands: true }
529+
);
556530

557-
// Check we have a cursor
558-
expect(cursors[0].id).to.exist;
559-
expect(cursors[0].id.toString()).to.equal(cursors[1].id.toString());
560-
expect(cursors[3].id.toString()).to.equal('0');
531+
const desiredEvents = ['aggregate', 'getMore'];
532+
client.on('commandStarted', filterForCommands(desiredEvents, started));
533+
client.on('commandSucceeded', filterForCommands(desiredEvents, succeeded));
561534

562-
return client.close();
563-
});
564-
}
535+
const db = client.db(this.configuration.db);
536+
return db
537+
.collection('apm_test_u_4')
538+
.drop()
539+
.catch(ignoreNsNotFound)
540+
.then(() => db.collection('apm_test_u_4').insertMany(docs))
541+
.then(r => {
542+
expect(r).to.exist;
543+
return db
544+
.collection('apm_test_u_4')
545+
.aggregate([{ $match: {} }], { batchSize: 1000 })
546+
.toArray();
547+
})
548+
.then(r => {
549+
expect(r).to.exist;
550+
expect(started).to.have.length(4);
551+
expect(succeeded).to.have.length(4);
552+
const cursors = succeeded.map(x => x.reply.cursor);
553+
554+
// Check we have a cursor
555+
expect(cursors[0].id).to.exist;
556+
expect(cursors[0].id.toString()).to.equal(cursors[1].id.toString());
557+
expect(cursors[3].id.toString()).to.equal('0');
558+
559+
return client.close();
560+
});
565561
});
566562

567563
it('should correctly decorate the apm result for listCollections with cursorId', {

test/integration/node-specific/abstract_cursor.test.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,8 +390,8 @@ describe('class AbstractCursor', function () {
390390

391391
afterEach(async function () {
392392
sinon.restore();
393-
await cursor.close();
394-
await client.close();
393+
await cursor?.close();
394+
await client?.close();
395395
});
396396

397397
it('iterates per batch not per document', async () => {
@@ -401,6 +401,39 @@ describe('class AbstractCursor', function () {
401401
const numDocuments = numBatches * batchSize;
402402
expect(nextSpy.callCount).to.be.lessThan(numDocuments);
403403
});
404+
405+
it(
406+
'does not exceed stack size for large arrays',
407+
// $documents was added in 6.0
408+
{ requires: { mongodb: '>=6.0' } },
409+
async function () {
410+
await client
411+
.db()
412+
.aggregate([
413+
{
414+
$documents: [
415+
{
416+
doc: 'foo'
417+
}
418+
]
419+
},
420+
{
421+
$set: {
422+
field: {
423+
$reduce: {
424+
input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
425+
initialValue: [0],
426+
in: { $concatArrays: ['$$value', '$$value'] }
427+
}
428+
}
429+
}
430+
},
431+
{ $unwind: '$field' },
432+
{ $limit: 1000000 }
433+
])
434+
.toArray();
435+
}
436+
);
404437
});
405438

406439
describe('externally provided timeout contexts', function () {

0 commit comments

Comments
 (0)