Skip to content

Commit

Permalink
fix: reconnect behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
denis-pingin committed Oct 4, 2023
1 parent caa9bc0 commit 0ad70c9
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 39 deletions.
42 changes: 17 additions & 25 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export {

export class GelatoRelay {
#config: Config;
readonly #websocketHandler?: WebsocketHandler;
readonly #websocketHandler: WebsocketHandler;

constructor(config?: Partial<Config>) {
this.#config = this._getConfiguration(config);
Expand Down Expand Up @@ -115,8 +115,8 @@ export class GelatoRelay {
this.#config
);

if (this.#websocketHandler) {
this.#websocketHandler.subscribe(response.taskId);
if (this.#websocketHandler.hasHandlers()) {
await this.#websocketHandler.subscribe(response.taskId);
}

return response;
Expand Down Expand Up @@ -148,8 +148,8 @@ export class GelatoRelay {
this.#config
);

if (this.#websocketHandler) {
this.#websocketHandler.subscribe(response.taskId);
if (this.#websocketHandler.hasHandlers()) {
await this.#websocketHandler.subscribe(response.taskId);
}

return response;
Expand All @@ -172,8 +172,8 @@ export class GelatoRelay {
this.#config
);

if (this.#websocketHandler) {
this.#websocketHandler.subscribe(response.taskId);
if (this.#websocketHandler.hasHandlers()) {
await this.#websocketHandler.subscribe(response.taskId);
}

return response;
Expand Down Expand Up @@ -203,8 +203,8 @@ export class GelatoRelay {
this.#config
);

if (this.#websocketHandler) {
this.#websocketHandler.subscribe(response.taskId);
if (this.#websocketHandler.hasHandlers()) {
await this.#websocketHandler.subscribe(response.taskId);
}

return response;
Expand Down Expand Up @@ -268,8 +268,8 @@ export class GelatoRelay {
this.#config
);

if (this.#websocketHandler) {
this.#websocketHandler.subscribe(response.taskId);
if (this.#websocketHandler.hasHandlers()) {
await this.#websocketHandler.subscribe(response.taskId);
}

return response;
Expand Down Expand Up @@ -302,8 +302,8 @@ export class GelatoRelay {
this.#config
);

if (this.#websocketHandler) {
this.#websocketHandler.subscribe(response.taskId);
if (this.#websocketHandler.hasHandlers()) {
await this.#websocketHandler.subscribe(response.taskId);
}

return response;
Expand Down Expand Up @@ -381,9 +381,7 @@ export class GelatoRelay {
onTaskStatusUpdate = (
handler: (taskStatus: TransactionStatusResponse) => void
): void => {
if (this.#websocketHandler) {
this.#websocketHandler.onUpdate(handler);
}
this.#websocketHandler.onUpdate(handler);
};

/**
Expand All @@ -393,28 +391,22 @@ export class GelatoRelay {
offTaskStatusUpdate = (
handler: (taskStatus: TransactionStatusResponse) => void
): void => {
if (this.#websocketHandler) {
this.#websocketHandler.offUpdate(handler);
}
this.#websocketHandler.offUpdate(handler);
};

/**
* @param {callback} handler - Callback function to be called on error
*
*/
onError = (handler: (error: Error) => void): void => {
if (this.#websocketHandler) {
this.#websocketHandler.onError(handler);
}
this.#websocketHandler.onError(handler);
};

/**
* @param {callback} handler - Callback function to be unregistered as an error handler
*
*/
offError = (handler: (error: Error) => void): void => {
if (this.#websocketHandler) {
this.#websocketHandler.offError(handler);
}
this.#websocketHandler.offError(handler);
};
}
73 changes: 59 additions & 14 deletions src/utils/websocketHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export class WebsocketHandler {
#errorHandlers: ((error: Error) => void)[] = [];
#websocket?: WebSocket;
readonly #reconnectIntervalMillis = 1000;
readonly #connectTimeoutMillis = 10000;

constructor(url: string) {
this.#url = `${url}/tasks/ws/status`;
Expand Down Expand Up @@ -70,40 +71,44 @@ export class WebsocketHandler {
this._disconnectIfUnused();
}

public subscribe(taskId: string) {
public async subscribe(taskId: string) {
if (this.#subscriptions.has(taskId)) {
return;
}

this.#subscriptions.add(taskId);

this._sendWebsocketMessage({
await this._sendWebsocketMessage({
action: "subscribe",
taskId,
});
}

public unsubscribe(taskId: string) {
public async unsubscribe(taskId: string) {
if (!this.#subscriptions.has(taskId)) {
return;
}

this.#subscriptions.delete(taskId);

this._sendWebsocketMessage({
await this._sendWebsocketMessage({
action: "unsubscribe",
taskId,
});
}

public hasHandlers(): boolean {
return this.#updateHandlers.length > 0 || this.#errorHandlers.length > 0;
}

private _connect() {
if (this.#websocket) {
return;
}

this.#websocket = new WebSocket(this.#url);

this.#websocket.onopen = () => {
this.#websocket.onopen = async () => {
this.#subscriptions.forEach((taskId) => {
this._sendWebsocketMessage({
action: "subscribe",
Expand All @@ -114,11 +119,11 @@ export class WebsocketHandler {

this.#websocket.onclose = () => {
setTimeout(() => {
this._connect();
this._reconnect();
}, this.#reconnectIntervalMillis);
};

this.#websocket.onmessage = (data: WebSocket.MessageEvent) => {
this.#websocket.onmessage = async (data: WebSocket.MessageEvent) => {
const message = JSON.parse(
data.data.toString()
) as WebsocketMessage<unknown>;
Expand All @@ -138,13 +143,13 @@ export class WebsocketHandler {
const taskStatus: TransactionStatusResponse =
updateWebsocketMessage.payload;

if (isFinalTaskState(taskStatus.taskState)) {
this.unsubscribe(taskStatus.taskId);
}

this.#updateHandlers.forEach((handler) => {
handler(taskStatus);
});

if (isFinalTaskState(taskStatus.taskState)) {
await this.unsubscribe(taskStatus.taskId);
}
break;
}
default: {
Expand All @@ -154,20 +159,60 @@ export class WebsocketHandler {
};
}

private _sendWebsocketMessage(message: unknown): void {
if (this.#websocket && this.#websocket.readyState === WebSocket.OPEN) {
private async _sendWebsocketMessage(message: unknown): Promise<void> {
const isConnected = await this._ensureIsConnected();
if (isConnected) {
this.#websocket.send(JSON.stringify(message));
}
}

private _disconnectIfUnused() {
private _disconnectIfUnused(): void {
if (
this.#updateHandlers.length === 0 &&
this.#errorHandlers.length === 0 &&
this.#websocket
) {
this._disconnect();
}
}

private _disconnect(): void {
if (this.#websocket) {
this.#websocket.close();
this.#websocket = undefined;
}
}

private _reconnect(): void {
this._disconnect();
this._connect();
}

private async _ensureIsConnected(): Promise<boolean> {
if (!this.#websocket) {
this._connect();
} else if (this.#websocket.readyState !== WebSocket.OPEN) {
this._reconnect();
}
return await this._awaitConnection();
}

private async _awaitConnection(): Promise<boolean> {
const start = Date.now();
while (!this.#websocket || this.#websocket.readyState !== WebSocket.OPEN) {
const elapsed = Date.now() - start;
if (elapsed > this.#connectTimeoutMillis) {
const error = new Error(
`Timeout connecting to ${this.#url} after ${elapsed}ms`
);
this.#errorHandlers.forEach((handler) => {
handler(error);
});
return false;
}

await new Promise((resolve) => setTimeout(resolve, 10));
}
return true;
}
}

0 comments on commit 0ad70c9

Please sign in to comment.