Skip to content

Commit 75d7f39

Browse files
authoredFeb 12, 2025··
fix: ignored subscriptions not being read (#3695)
1 parent 1277ae6 commit 75d7f39

File tree

3 files changed

+77
-1
lines changed

3 files changed

+77
-1
lines changed
 

‎.changeset/silent-points-punch.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@fuel-ts/account": patch
3+
---
4+
5+
fix: ignored subscriptions not being read

‎packages/account/src/providers/fuel-graphql-subscriber.ts

+24-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ export class FuelGraphqlSubscriber implements AsyncIterator<unknown> {
3232
});
3333

3434
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
35-
const [errorReader, resultReader] = response.body!.tee().map((stream) => stream.getReader());
35+
const [backgroundStream, resultStream] = response.body!.tee();
36+
37+
// eslint-disable-next-line no-void
38+
void this.readInBackground(backgroundStream.getReader());
39+
40+
const [errorReader, resultReader] = resultStream.tee().map((stream) => stream.getReader());
3641

3742
/**
3843
* If the node threw an error, read it and throw it to the user
@@ -44,6 +49,24 @@ export class FuelGraphqlSubscriber implements AsyncIterator<unknown> {
4449
return new FuelGraphqlSubscriber(resultReader);
4550
}
4651

52+
/**
53+
* Reads the stream in the background,
54+
* thereby preventing the stream from not being read
55+
* if the user ignores the subscription.
56+
* Even though the read data is ignored in this function,
57+
* it is still available in the other streams
58+
* via internal mechanisms related to teeing.
59+
*/
60+
private static async readInBackground(reader: ReadableStreamDefaultReader<Uint8Array>) {
61+
// eslint-disable-next-line no-constant-condition
62+
while (true) {
63+
const { done } = await reader.read();
64+
if (done) {
65+
return;
66+
}
67+
}
68+
}
69+
4770
private events: Array<{ data: unknown; errors?: { message: string }[] }> = [];
4871
private parsingLeftover = '';
4972

‎packages/account/src/providers/provider.test.ts

+48
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { InputType, OutputType, ReceiptType } from '@fuel-ts/transactions';
99
import { DateTime, arrayify, hexlify, sleep } from '@fuel-ts/utils';
1010
import { ASSET_A, ASSET_B } from '@fuel-ts/utils/test-utils';
1111
import { versions } from '@fuel-ts/versions';
12+
import type { MockInstance } from 'vitest';
1213

1314
import { Wallet } from '..';
1415
import {
@@ -1753,6 +1754,53 @@ describe('Provider', () => {
17531754
expect(numberOfEvents).toEqual(2);
17541755
});
17551756

1757+
it('subscriptions: streams are consumed even if the async iterator is not', async () => {
1758+
using launched = await setupTestProviderAndWallets();
1759+
const { provider } = launched;
1760+
1761+
const sseResponse = new TextEncoder().encode(`data:{"field":"not-relevant"}\n\n`);
1762+
1763+
let pullCallNum = 0;
1764+
1765+
const underlyingSource: UnderlyingDefaultSource = {
1766+
pull: (controller) => {
1767+
pullCallNum += 1;
1768+
controller.enqueue(sseResponse);
1769+
if (pullCallNum === 20) {
1770+
controller.close();
1771+
}
1772+
},
1773+
};
1774+
1775+
const pullSpy: MockInstance = vi.spyOn(underlyingSource, 'pull');
1776+
1777+
vi.spyOn(global, 'fetch').mockImplementationOnce(() =>
1778+
Promise.resolve(
1779+
new Response(
1780+
new ReadableStream(
1781+
underlyingSource,
1782+
/**
1783+
* Only pull when .read() is called.
1784+
* Don't do any behind-the-scenes buffering
1785+
* so that we can test that the sdk itself is pulling
1786+
* even if the user isn't reading.
1787+
*/
1788+
{ highWaterMark: 0 }
1789+
)
1790+
)
1791+
)
1792+
);
1793+
1794+
await provider.operations.submitAndAwaitStatus({
1795+
encodedTransaction: "it's mocked so doesn't matter",
1796+
});
1797+
1798+
// give time for the pulls to be called in the background
1799+
await sleep(500);
1800+
1801+
expect(pullSpy).toHaveBeenCalledTimes(20);
1802+
});
1803+
17561804
it('subscriptions: throws if the stream data string parsing fails for some reason', async () => {
17571805
using launched = await setupTestProviderAndWallets();
17581806
const { provider } = launched;

0 commit comments

Comments
 (0)
Please sign in to comment.