Skip to content

Commit

Permalink
chore(headless): internalize fetch-event-source (#4986)
Browse files Browse the repository at this point in the history
splits from #4921

Replace
[`@microsoft/fetch-event-source`](https://github.com/Azure/fetch-event-source).

Essentially a good old copy of the sources, with our patch applied on
top and some tweaks in the UT so that they work with Vitest.

Includes a copy of the
[LICENSE](https://github.com/Azure/fetch-event-source/blob/a0529492576e094374602f24d5e64b3a271b4576/LICENSE).
  • Loading branch information
louis-bompart authored Feb 20, 2025
1 parent 5f81597 commit 12eeb66
Show file tree
Hide file tree
Showing 12 changed files with 794 additions and 154 deletions.
7 changes: 0 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion packages/headless/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@
"@coveo/bueno": "1.0.7",
"@coveo/relay": "1.0.0",
"@coveo/relay-event-types": "13.1.3",
"@microsoft/fetch-event-source": "2.0.1",
"@reduxjs/toolkit": "2.5.1",
"abortcontroller-polyfill": "1.7.8",
"coveo.analytics": "2.30.43",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {fetchEventSource} from '@microsoft/fetch-event-source';
import {Logger} from 'pino';
import {AsyncThunkOptions} from '../../app/async-thunk-options.js';
import {ClientThunkExtraArguments} from '../../app/thunk-extra-arguments.js';
import {GeneratedAnswerErrorPayload} from '../../features/generated-answer/generated-answer-actions.js';
import {SearchAppState} from '../../state/search-app-state.js';
import {fetchEventSource} from '../../utils/fetch-event-source/fetch.js';
import {URLPath} from '../../utils/url-utils.js';
import {resetTimeout} from '../../utils/utils.js';
import {GeneratedAnswerStreamEventData} from './generated-answer-event-payload.js';
Expand Down
6 changes: 2 additions & 4 deletions packages/headless/src/api/knowledge/stream-answer-api.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
import {
EventSourceMessage,
fetchEventSource,
} from '@microsoft/fetch-event-source';
import {createSelector, ThunkDispatch, UnknownAction} from '@reduxjs/toolkit';
import {
setAnswerContentFormat,
Expand All @@ -27,6 +23,8 @@ import {
InsightConfigurationSection,
} from '../../state/state-sections.js';
import {getFacets} from '../../utils/facet-utils.js';
import {fetchEventSource} from '../../utils/fetch-event-source/fetch.js';
import {EventSourceMessage} from '../../utils/fetch-event-source/parse.js';
import {GeneratedAnswerCitation} from '../generated-answer/generated-answer-event-payload.js';
import {getOrganizationEndpoint} from '../platform-client.js';
import {SearchRequest} from '../search/search/search-request.js';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import {EventSourceMessage} from '@microsoft/fetch-event-source';
import {EventSourceMessage} from '../../../utils/fetch-event-source/parse';
import {
constructAnswerQueryParams,
GeneratedAnswerStream,
Expand Down
21 changes: 21 additions & 0 deletions packages/headless/src/utils/fetch-event-source/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) Microsoft Corporation.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
174 changes: 174 additions & 0 deletions packages/headless/src/utils/fetch-event-source/fetch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import {EventSourceMessage, getBytes, getLines, getMessages} from './parse.js';

export const EventStreamContentType = 'text/event-stream';

const DefaultRetryInterval = 1000;
const LastEventId = 'last-event-id';

function isBrowser() {
return typeof window !== 'undefined';
}

export interface FetchEventSourceInit extends RequestInit {
/**
* The request headers. FetchEventSource only supports the Record<string,string> format.
*/
headers?: Record<string, string>;

/**
* Called when a response is received. Use this to validate that the response
* actually matches what you expect (and throw if it doesn't.) If not provided,
* will default to a basic validation to ensure the content-type is text/event-stream.
*/
onopen?: (response: Response) => Promise<void>;

/**
* Called when a message is received. NOTE: Unlike the default browser
* EventSource.onmessage, this callback is called for _all_ events,
* even ones with a custom `event` field.
*/
onmessage?: (ev: EventSourceMessage) => void;

/**
* Called when a response finishes. If you don't expect the server to kill
* the connection, you can throw an exception here and retry using onerror.
*/
onclose?: () => void;

/**
* Called when there is any error making the request / processing messages /
* handling callbacks etc. Use this to control the retry strategy: if the
* error is fatal, rethrow the error inside the callback to stop the entire
* operation. Otherwise, you can return an interval (in milliseconds) after
* which the request will automatically retry (with the last-event-id).
* If this callback is not specified, or it returns undefined, fetchEventSource
* will treat every error as retryable and will try again after 1 second.
*/
onerror?: (err: unknown) => number | null | undefined | void;

/**
* If true, will keep the request open even if the document is hidden.
* By default, fetchEventSource will close the request and reopen it
* automatically when the document becomes visible again.
*/
openWhenHidden?: boolean;

/** The Fetch function to use. Defaults to window.fetch */
fetch?: typeof fetch;
}

export function fetchEventSource(
input: RequestInfo,
{
signal: inputSignal,
headers: inputHeaders,
onopen: inputOnOpen,
onmessage,
onclose,
onerror,
openWhenHidden,
fetch: inputFetch,
...rest
}: FetchEventSourceInit
) {
return new Promise<void>((resolve, reject) => {
// make a copy of the input headers since we may modify it below:
const headers = {...inputHeaders};
if (!headers.accept) {
headers.accept = EventStreamContentType;
}

let curRequestController: AbortController | null;
function onVisibilityChange() {
curRequestController?.abort(); // close existing request on every visibility change
if (!document.hidden) {
create(); // page is now visible again, recreate request.
}
}

if (!openWhenHidden && isBrowser()) {
document.addEventListener('visibilitychange', onVisibilityChange);
}

let retryInterval = DefaultRetryInterval;
let retryTimer: string | number | NodeJS.Timeout;
function dispose() {
if (isBrowser()) {
document.removeEventListener('visibilitychange', onVisibilityChange);
}
clearTimeout(retryTimer);
curRequestController?.abort();
}

// if the incoming signal aborts, dispose resources and resolve:
inputSignal?.addEventListener('abort', () => {
dispose();
resolve(); // don't waste time windconstructing/logging errors
});

const outputFetch = inputFetch ?? fetch;
const onopen = inputOnOpen ?? defaultOnOpen;
async function create(): Promise<void> {
curRequestController = AbortController ? new AbortController() : null;
try {
const response = await outputFetch(input, {
...rest,
headers,
signal: curRequestController?.signal,
});

await onopen(response);

await getBytes(
response.body!,
getLines(
getMessages(
(id) => {
if (id) {
// store the id and send it back on the next retry:
headers[LastEventId] = id;
} else {
// don't send the last-event-id header anymore:
delete headers[LastEventId];
}
},
(retry) => {
retryInterval = retry;
},
onmessage
)
)
);

onclose?.();
dispose();
resolve();
} catch (err) {
if (!curRequestController?.signal?.aborted) {
// if we haven't aborted the request ourselves:
try {
// check if we need to retry:
const interval: number = onerror?.(err) ?? retryInterval;
clearTimeout(retryTimer);
retryTimer = setTimeout(create, interval);
} catch (innerErr) {
// we should not retry anymore:
dispose();
reject(innerErr);
}
}
}
}

create();
});
}

function defaultOnOpen(response: Response) {
const contentType = response.headers.get('content-type');
if (!contentType?.startsWith(EventStreamContentType)) {
throw new Error(
`Expected content-type to be ${EventStreamContentType}, Actual: ${contentType}`
);
}
}
6 changes: 6 additions & 0 deletions packages/headless/src/utils/fetch-event-source/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export {
fetchEventSource,
type FetchEventSourceInit,
EventStreamContentType,
} from './fetch.js';
export {type EventSourceMessage} from './parse.js';
Loading

0 comments on commit 12eeb66

Please sign in to comment.