Skip to content

Commit 161612d

Browse files
committed
Add cancelStream function.
1 parent 91e79de commit 161612d

File tree

3 files changed

+33
-10
lines changed

3 files changed

+33
-10
lines changed

src/apiV1/client.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ export const streamQueryV1 = async (
9393

9494
const url = config.endpoint ?? `${DEFAULT_DOMAIN}/v1/stream-query`;
9595

96-
const stream = await generateStream(requestHeaders, requestBody, url);
96+
const { stream } = await generateStream(requestHeaders, requestBody, url);
9797

9898
let previousAnswerText = "";
9999

src/apiV2/client.ts

+22-7
Original file line numberDiff line numberDiff line change
@@ -143,17 +143,32 @@ export const streamQueryV2 = async (
143143
const url = `${endpoint ?? DEFAULT_DOMAIN}${path}`;
144144

145145
try {
146-
const stream = await generateStream(headers, JSON.stringify(body), url);
147-
const buffer = new EventBuffer(onStreamEvent);
146+
const { cancelStream, stream } = await generateStream(
147+
headers,
148+
JSON.stringify(body),
149+
url
150+
);
148151

149-
for await (const chunk of stream) {
152+
new Promise(async (resolve, reject) => {
150153
try {
151-
buffer.consumeChunk(chunk);
152-
buffer.drainEvents();
154+
const buffer = new EventBuffer(onStreamEvent);
155+
156+
for await (const chunk of stream) {
157+
try {
158+
buffer.consumeChunk(chunk);
159+
buffer.drainEvents();
160+
} catch (error) {
161+
console.log("error", error);
162+
}
163+
}
164+
165+
resolve(undefined);
153166
} catch (error) {
154-
console.log("error", error);
167+
reject(error);
155168
}
156-
}
169+
});
170+
171+
return { cancelStream };
157172
} catch (error) {
158173
console.log("error", error);
159174
}

src/common/generateStream.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,23 @@ export const generateStream = async (
22
headers: Record<string, string>,
33
body: string,
44
url: string
5-
): Promise<AsyncIterable<string>> => {
5+
) => {
6+
let controller = new AbortController();
7+
68
const response = await fetch(url, {
79
method: "POST",
810
headers,
911
body,
12+
signal: controller.signal,
1013
});
14+
1115
if (response.status !== 200) throw new Error(response.status.toString());
1216
if (!response.body) throw new Error("Response body does not exist");
13-
return getIterableStream(response.body);
17+
18+
return {
19+
stream: getIterableStream(response.body),
20+
cancelStream: () => controller.abort(),
21+
};
1422
};
1523

1624
async function* getIterableStream(

0 commit comments

Comments
 (0)