diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index baabdcb3b23..5b402ca1803 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -4,7 +4,7 @@ import { on, once } from 'events'; import { gte, lt } from 'semver'; import * as sinon from 'sinon'; import { PassThrough } from 'stream'; -import { setTimeout } from 'timers'; +import { clearTimeout, setTimeout } from 'timers'; import { type ChangeStream, @@ -773,115 +773,136 @@ describe('Change Streams', function () { }); }); - describe('should properly handle a changeStream event being processed mid-close', function () { - let client, coll, changeStream; - - function write() { - return Promise.resolve() - .then(() => coll.insertOne({ a: 1 })) - .then(() => coll.insertOne({ b: 2 })); - } - - function lastWrite() { - return coll.insertOne({ c: 3 }); + describe('when close is called while changes are pending', function () { + let client; + let db; + let collection: Collection<{ insertCount: number }>; + let changeStream: ChangeStream<{ insertCount: number }>; + let insertInterval = undefined; + let insertCount = 0; + + /** insertOne every 300ms without running the next insert before the previous one completes */ + function setInsertInterval() { + // start an insert + // if first one, create a timeout and refresh + // if NOT first one, just refresh + collection?.insertOne({ insertCount: insertCount++ }).then(() => { + insertInterval ??= setTimeout(setInsertInterval, 300); + insertInterval.refresh(); + }); } - beforeEach(function () { + beforeEach(async function () { client = this.configuration.newClient(); - return client.connect().then(_client => { - client = _client; - coll = client.db(this.configuration.db).collection('tester'); - changeStream = coll.watch(); - }); + await client.connect(); + db = client.db('test'); + collection = db.collection('test_close'); + await collection.drop().catch(() => null); + changeStream = collection.watch(); + + insertCount = 0; + setInsertInterval(); }); afterEach(async function () { - await changeStream?.close(); - await client?.close(); - coll = undefined; - changeStream = undefined; + clearTimeout(insertInterval); + await collection.drop().catch(() => null); + await client.close(); + + db = undefined; client = undefined; + collection = undefined; + changeStream = undefined; + insertInterval = undefined; + insertCount = 0; }); - it('when invoked with promises', { - metadata: { requires: { topology: 'replicaset' } }, - test: function () { - const read = () => { - return Promise.resolve() - .then(() => changeStream.next()) - .then(() => changeStream.next()) - .then(() => { - this.defer(lastWrite()); - const nextP = changeStream.next(); - return changeStream.close().then(() => nextP); - }); - }; + it( + 'rejects promises already returned by next', + { requires: { topology: 'replicaset' } }, + async function () { + const changes = Array.from({ length: 20 }, () => changeStream.next()); + await changeStream.close(); + const results = await Promise.allSettled(changes); + + const statuses = results.map(({ status, reason, value }) => { + const res = + status === 'rejected' + ? reason.message + : value.operationType === 'insert' + ? `insert count = ${value.fullDocument.insertCount}` + : null; + return `${status}:${res}`; + }); - return Promise.all([read(), write()]).then( - () => Promise.reject(new Error('Expected operation to fail with error')), - err => expect(err.message).to.equal('ChangeStream is closed') + expect(statuses).to.deep.equal( + Array.from({ length: 20 }, () => 'rejected:ChangeStream is closed') ); } - }); - - it('when invoked with callbacks', { - metadata: { requires: { topology: 'replicaset' } }, - test: function (done) { - const ops = []; - changeStream.next(() => { - changeStream.next(() => { - ops.push(lastWrite()); - - // explicitly close the change stream after the write has begun - ops.push(changeStream.close()); + ); - changeStream.next(err => { - try { - expect(err) - .property('message') - .to.match(/ChangeStream is closed/); - Promise.all(ops).then(() => done(), done); - } catch (e) { - done(e); - } - }); - }); + it.skip( + 'rejects promises already returned by next after awaiting the first one', + { requires: { topology: 'replicaset' } }, + async function () { + const changes = Array.from({ length: 20 }, () => changeStream.next()); + await changes[0]; + const allChanges = Promise.allSettled(changes); + + await changeStream.close(); + + const results = await allChanges; + + const statuses = results.map(({ status, reason, value }) => { + const res = + status === 'rejected' + ? reason.message + : value.operationType === 'insert' + ? `insert count = ${value.fullDocument.insertCount}` + : null; + return `${status}:${res}`; }); - ops.push( - write().catch(() => { - // ignore - }) - ); + console.log(statuses); + + expect(statuses).to.deep.equal([ + 'fulfilled:insert count = 1', + ...Array.from({ length: 19 }, () => 'rejected:ChangeStream is closed') + ]); } - }); + ).skipReason = 'TODO(NODE-5221): Parallel change streams and close are nondeterministic'; - it.skip('when invoked using eventEmitter API', { - metadata: { - requires: { topology: 'replicaset' } - }, - async test() { - const changes = on(changeStream, 'change'); - await once(changeStream.cursor, 'init'); + it.skip( + 'rejects promises already returned by next after awaiting half of them', + { requires: { topology: 'replicaset' } }, + async function () { + const changes = Array.from({ length: 20 }, () => changeStream.next()); + const allChanges = Promise.allSettled(changes); - await write(); - await lastWrite().catch(() => null); + await Promise.allSettled(changes.slice(10)); - let counter = 0; + await changeStream.close(); - for await (const _ of changes) { - counter += 1; - if (counter === 2) { - await changeStream.close(); - break; - } - } + const results = await allChanges; + + const statuses = results.map(({ status, reason, value }) => { + const res = + status === 'rejected' + ? reason.message + : value.operationType === 'insert' + ? `insert count = ${value.fullDocument.insertCount}` + : null; + return `${status}:${res}`; + }); + + console.log(statuses); - const result = await Promise.race([changes.next(), sleep(800).then(() => 42)]); - expect(result, 'should not have recieved a third event').to.equal(42); + expect(statuses).to.deep.equal([ + ...Array.from({ length: 1 }, () => 'fulfilled:insert count = 0'), + ...Array.from({ length: 19 }, () => 'fulfilled:insert count = 1') + ]); } - }).skipReason = - 'This test only worked because of timing, changeStream.close does not remove the change listener'; + ).skipReason = 'TODO(NODE-5221): Parallel change streams and close are nondeterministic'; }); describe('iterator api', function () {