Skip to content

Commit a01f596

Browse files
authored
fix(Queue): calling stop should not drop pushed values (#4510)
1 parent 8aa1567 commit a01f596

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

src/execution/Queue.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ export class Queue<T> {
5959
}
6060

6161
private _nextBatch(): Promise<Generator<T> | undefined> {
62-
if (this._stopped) {
63-
return Promise.resolve(undefined);
64-
}
6562
if (this._items.length) {
6663
return Promise.resolve(this.batch());
6764
}
65+
if (this._stopped) {
66+
return Promise.resolve(undefined);
67+
}
6868
const { promise, resolve } = promiseWithResolvers<
6969
Generator<T> | undefined
7070
>();
@@ -73,8 +73,10 @@ export class Queue<T> {
7373
}
7474

7575
private _push(item: T): void {
76-
this._items.push(item);
77-
this._resolve(this.batch());
76+
if (!this._stopped) {
77+
this._items.push(item);
78+
this._resolve(this.batch());
79+
}
7880
}
7981

8082
private _resolve(maybeIterable: Generator<T> | undefined): void {

src/execution/__tests__/Queue-test.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ import { describe, it } from 'mocha';
33

44
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';
55

6-
import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';
7-
86
import { Queue } from '../Queue.js';
97

108
describe('Queue', () => {
@@ -91,6 +89,17 @@ describe('Queue', () => {
9189
});
9290

9391
it('should allow the executor to indicate completion', async () => {
92+
const queue = new Queue<number>((push, stop) => {
93+
push(1);
94+
stop();
95+
});
96+
97+
const sub = queue.subscribe((batch) => Array.from(batch));
98+
expect(await sub.next()).to.deep.equal({ done: false, value: [1] });
99+
expect(await sub.next()).to.deep.equal({ done: true, value: undefined });
100+
});
101+
102+
it('should allow the executor to indicate completion prior to any push calls', async () => {
94103
const queue = new Queue<number>((push, stop) => {
95104
stop();
96105
push(1); // should be ignored
@@ -101,10 +110,8 @@ describe('Queue', () => {
101110
});
102111

103112
it('should allow a consumer to abort a pending call to next', async () => {
104-
const queue = new Queue<number>(async () => {
105-
const { promise } = promiseWithResolvers();
106-
// wait forever
107-
await promise;
113+
const queue = new Queue<number>(() => {
114+
// no pushes
108115
});
109116

110117
const sub = queue.subscribe((batch) => batch);

0 commit comments

Comments
 (0)