Skip to content

Commit 6dfed83

Browse files
Merge pull request #258 from appwrite/web-improve-realtime
fix(web): improve realtime subscriptions
2 parents b89a511 + f96887d commit 6dfed83

File tree

2 files changed

+150
-86
lines changed

2 files changed

+150
-86
lines changed

specs/swagger-appwrite.0.10.0.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

templates/web/src/sdk.ts.twig

Lines changed: 149 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import "isomorphic-form-data";
2-
import { fetch } from "cross-fetch";
1+
import 'isomorphic-form-data';
2+
import { fetch } from 'cross-fetch';
33

44
type Payload = {
55
[key: string]: any;
@@ -10,12 +10,12 @@ type Headers = {
1010
}
1111

1212
type RealtimeResponse = {
13-
type: "error"|"event"|"connected"|"response";
13+
type: 'error'|'event'|'connected'|'response';
1414
data: RealtimeResponseAuthenticated|RealtimeResponseConnected|RealtimeResponseError|RealtimeResponseEvent<unknown>;
1515
}
1616

1717
type RealtimeRequest = {
18-
type: "authentication";
18+
type: 'authentication';
1919
data: RealtimeRequestAuthenticate;
2020
}
2121

@@ -49,13 +49,21 @@ type RealtimeRequestAuthenticate = {
4949
type Realtime = {
5050
socket?: WebSocket;
5151
timeout?: number;
52+
url?: string;
5253
lastMessage?: RealtimeResponse;
53-
channels: {
54-
[key: string]: ((event: MessageEvent) => void)[]
55-
},
54+
channels: Set<string>;
55+
subscriptions: Map<number, {
56+
channels: string[];
57+
callback: (payload: RealtimeResponseEvent<any>) => void
58+
}>;
59+
subscriptionsCounter: number;
60+
reconnect: boolean;
61+
reconnectAttempts: number;
62+
getTimeout: () => number;
63+
connect: () => void;
5664
createSocket: () => void;
57-
authenticate: (event: MessageEvent) => void;
58-
onMessage: <T extends unknown>(channel: string, callback: (response: RealtimeResponseEvent<T>) => void) => (event: MessageEvent) => void;
65+
cleanUp: (channels: string[]) => void;
66+
onMessage: (event: MessageEvent) => void;
5967
}
6068

6169
class {{spec.title | caseUcfirst}}Exception extends Error {
@@ -96,7 +104,7 @@ class {{ spec.title | caseUcfirst }} {
96104
*/
97105
setEndpoint(endpoint: string): this {
98106
this.config.endpoint = endpoint;
99-
this.config.endpointRealtime = this.config.endpointRealtime || this.config.endpoint.replace("https://", "wss://").replace("http://", "ws://");
107+
this.config.endpointRealtime = this.config.endpointRealtime || this.config.endpoint.replace('https://', 'wss://').replace('http://', 'ws://');
100108

101109
return this;
102110
}
@@ -137,71 +145,136 @@ class {{ spec.title | caseUcfirst }} {
137145
private realtime: Realtime = {
138146
socket: undefined,
139147
timeout: undefined,
140-
channels: {},
148+
url: '',
149+
channels: new Set(),
150+
subscriptions: new Map(),
151+
subscriptionsCounter: 0,
152+
reconnect: true,
153+
reconnectAttempts: 0,
141154
lastMessage: undefined,
155+
connect: () => {
156+
clearTimeout(this.realtime.timeout);
157+
this.realtime.timeout = window?.setTimeout(() => {
158+
this.realtime.createSocket();
159+
}, 50);
160+
},
161+
getTimeout: () => {
162+
switch (true) {
163+
case this.realtime.reconnectAttempts < 5:
164+
return 1000;
165+
case this.realtime.reconnectAttempts < 15:
166+
return 5000;
167+
case this.realtime.reconnectAttempts < 100:
168+
return 10_000;
169+
default:
170+
return 60_000;
171+
}
172+
},
142173
createSocket: () => {
174+
if (this.realtime.channels.size < 1) return;
175+
143176
const channels = new URLSearchParams();
144177
channels.set('project', this.config.project);
145-
for (const property in this.realtime.channels) {
146-
channels.append('channels[]', property);
147-
}
148-
if (this.realtime.socket?.readyState === WebSocket.OPEN) {
149-
this.realtime.socket.close();
150-
}
151-
152-
this.realtime.socket = new WebSocket(this.config.endpointRealtime + '/realtime?' + channels.toString());
153-
this.realtime.socket?.addEventListener('message', this.realtime.authenticate);
178+
this.realtime.channels.forEach(channel => {
179+
channels.append('channels[]', channel);
180+
});
181+
182+
const url = this.config.endpointRealtime + '/realtime?' + channels.toString();
183+
184+
if (
185+
url !== this.realtime.url || // Check if URL is present
186+
!this.realtime.socket || // Check if WebSocket has not been created
187+
this.realtime.socket?.readyState > WebSocket.OPEN // Check if WebSocket is CLOSING (3) or CLOSED (4)
188+
) {
189+
if (
190+
this.realtime.socket &&
191+
this.realtime.socket?.readyState < WebSocket.CLOSING // Close WebSocket if it is CONNECTING (0) or OPEN (1)
192+
) {
193+
this.realtime.reconnect = false;
194+
this.realtime.socket.close();
195+
}
154196

155-
for (const channel in this.realtime.channels) {
156-
this.realtime.channels[channel].forEach(callback => {
157-
this.realtime.socket?.addEventListener('message', callback);
197+
this.realtime.url = url;
198+
this.realtime.socket = new WebSocket(url);
199+
this.realtime.socket.addEventListener('message', this.realtime.onMessage);
200+
this.realtime.socket.addEventListener('open', _event => {
201+
this.realtime.reconnectAttempts = 0;
158202
});
159-
}
203+
this.realtime.socket.addEventListener('close', event => {
204+
if (
205+
!this.realtime.reconnect ||
206+
(
207+
this.realtime?.lastMessage?.type === 'error' && // Check if last message was of type error
208+
(<RealtimeResponseError>this.realtime?.lastMessage.data).code === 1008 // Check for policy violation 1008
209+
)
210+
) {
211+
this.realtime.reconnect = true;
212+
return;
213+
}
160214

161-
this.realtime.socket.addEventListener('close', event => {
162-
if (this.realtime?.lastMessage?.type === 'error' && (<RealtimeResponseError>this.realtime?.lastMessage.data).code === 1008) {
163-
return;
164-
}
165-
console.error('Realtime got disconnected. Reconnect will be attempted in 1 second.', event.reason);
166-
setTimeout(() => {
167-
this.realtime.createSocket();
168-
}, 1000);
169-
})
215+
const timeout = this.realtime.getTimeout();
216+
console.error(`Realtime got disconnected. Reconnect will be attempted in ${timeout / 1000} seconds.`, event.reason);
217+
218+
setTimeout(() => {
219+
this.realtime.reconnectAttempts++;
220+
this.realtime.createSocket();
221+
}, timeout);
222+
})
223+
}
170224
},
171-
authenticate: (event) => {
172-
const message: RealtimeResponse = JSON.parse(event.data);
173-
if (message.type === 'connected' && this.realtime.socket?.readyState === WebSocket.OPEN) {
174-
const cookie = JSON.parse(window.localStorage.getItem('cookieFallback') ?? "{}");
175-
const session = cookie?.[`a_session_${this.config.project}`];
176-
const data = <RealtimeResponseConnected>message.data;
177-
178-
if (session && !data.user) {
179-
this.realtime.socket?.send(JSON.stringify(<RealtimeRequest>{
180-
type: "authentication",
181-
data: {
182-
session
225+
onMessage: (event) => {
226+
try {
227+
const message: RealtimeResponse = JSON.parse(event.data);
228+
this.realtime.lastMessage = message;
229+
switch (message.type) {
230+
case 'connected':
231+
const cookie = JSON.parse(window.localStorage.getItem('cookieFallback') ?? '{}');
232+
const session = cookie?.[`a_session_${this.config.project}`];
233+
const messageData = <RealtimeResponseConnected>message.data;
234+
235+
if (session && !messageData.user) {
236+
this.realtime.socket?.send(JSON.stringify(<RealtimeRequest>{
237+
type: 'authentication',
238+
data: {
239+
session
240+
}
241+
}));
242+
}
243+
break;
244+
case 'event':
245+
let data = <RealtimeResponseEvent<unknown>>message.data;
246+
if (data?.channels) {
247+
const isSubscribed = data.channels.some(channel => this.realtime.channels.has(channel));
248+
if (!isSubscribed) return;
249+
this.realtime.subscriptions.forEach(subscription => {
250+
if (data.channels.some(channel => subscription.channels.includes(channel))) {
251+
setTimeout(() => subscription.callback(data));
252+
}
253+
})
183254
}
184-
}));
255+
break;
256+
case 'error':
257+
throw message.data;
258+
default:
259+
break;
185260
}
261+
} catch (e) {
262+
console.error(e);
186263
}
187264
},
188-
onMessage: <T extends unknown>(channel: string, callback: (response: RealtimeResponseEvent<T>) => void) =>
189-
(event) => {
190-
try {
191-
const message: RealtimeResponse = JSON.parse(event.data);
192-
this.realtime.lastMessage = message;
193-
if (message.type === 'event') {
194-
let data = <RealtimeResponseEvent<T>>message.data;
195-
if (data.channels && data.channels.includes(channel)) {
196-
callback(data);
197-
}
198-
} else if (message.type === 'error') {
199-
throw message.data;
265+
cleanUp: channels => {
266+
this.realtime.channels.forEach(channel => {
267+
if (channels.includes(channel)) {
268+
let found = Array.from(this.realtime.subscriptions).some(([_key, subscription] )=> {
269+
return subscription.channels.includes(channel);
270+
})
271+
272+
if (!found) {
273+
this.realtime.channels.delete(channel);
200274
}
201-
} catch (e) {
202-
console.error(e);
203275
}
204-
}
276+
})
277+
}
205278
}
206279

207280
/**
@@ -231,29 +304,20 @@ class {{ spec.title | caseUcfirst }} {
231304
*/
232305
subscribe<T extends unknown>(channels: string | string[], callback: (payload: RealtimeResponseEvent<T>) => void): () => void {
233306
let channelArray = typeof channels === 'string' ? [channels] : channels;
234-
let savedChannels: {
235-
name: string;
236-
index: number;
237-
}[] = [];
238-
channelArray.forEach((channel, index) => {
239-
if (!(channel in this.realtime.channels)) {
240-
this.realtime.channels[channel] = [];
241-
}
242-
savedChannels[index] = {
243-
name: channel,
244-
index: (this.realtime.channels[channel].push(this.realtime.onMessage<T>(channel, callback)) - 1)
245-
};
246-
clearTimeout(this.realtime.timeout);
247-
this.realtime.timeout = window?.setTimeout(() => {
248-
this.realtime.createSocket();
249-
}, 1);
307+
channelArray.forEach(channel => this.realtime.channels.add(channel));
308+
309+
const counter = this.realtime.subscriptionsCounter++;
310+
this.realtime.subscriptions.set(counter, {
311+
channels: channelArray,
312+
callback
250313
});
251314

315+
this.realtime.connect();
316+
252317
return () => {
253-
savedChannels.forEach(channel => {
254-
this.realtime.socket?.removeEventListener('message', this.realtime.channels[channel.name][channel.index]);
255-
this.realtime.channels[channel.name].splice(channel.index, 1);
256-
})
318+
this.realtime.subscriptions.delete(counter);
319+
this.realtime.cleanUp(channelArray);
320+
this.realtime.connect();
257321
}
258322
}
259323

@@ -270,7 +334,7 @@ class {{ spec.title | caseUcfirst }} {
270334
};
271335

272336
if (typeof window !== 'undefined' && window.localStorage) {
273-
headers['X-Fallback-Cookies'] = window.localStorage.getItem('cookieFallback') ?? "";
337+
headers['X-Fallback-Cookies'] = window.localStorage.getItem('cookieFallback') ?? '';
274338
}
275339

276340
if (method === 'GET') {
@@ -304,14 +368,14 @@ class {{ spec.title | caseUcfirst }} {
304368
let data = null;
305369
const response = await fetch(url.toString(), options);
306370

307-
if (response.headers.get("content-type")?.includes("application/json")) {
371+
if (response.headers.get('content-type')?.includes('application/json')) {
308372
data = await response.json();
309373
} else {
310374
data = {
311375
message: await response.text()
312376
};
313377
}
314-
378+
315379
if (400 <= response.status) {
316380
throw new {{spec.title | caseUcfirst}}Exception(data?.message, response.status, data);
317381
}
@@ -368,7 +432,7 @@ class {{ spec.title | caseUcfirst }} {
368432
* @returns {% if method.type == 'webAuth' %}{void|string}{% elseif method.type == 'location' %}{URL}{% else %}{Promise}{% endif %}
369433

370434
*/
371-
{{ method.name | caseCamel }}: {% if method.type != "location" and method.type != 'webAuth'%}async <T extends unknown>{% endif %}({% for parameter in method.parameters.all %}{{ parameter.name | caseCamel | escapeKeyword }}{% if not parameter.required %}?{% endif %}: {{ parameter.type | typeName }}{% if not loop.last %}, {% endif %}{% endfor %}): {% if method.type == 'webAuth' %}void | URL{% elseif method.type == 'location' %}URL{% else %}Promise<T>{% endif %} => {
435+
{{ method.name | caseCamel }}: {% if method.type != 'location' and method.type != 'webAuth'%}async <T extends unknown>{% endif %}({% for parameter in method.parameters.all %}{{ parameter.name | caseCamel | escapeKeyword }}{% if not parameter.required %}?{% endif %}: {{ parameter.type | typeName }}{% if not loop.last %}, {% endif %}{% endfor %}): {% if method.type == 'webAuth' %}void | URL{% elseif method.type == 'location' %}URL{% else %}Promise<T>{% endif %} => {
372436
{% for parameter in method.parameters.all %}
373437
{% if parameter.required %}
374438
if (typeof {{ parameter.name | caseCamel | escapeKeyword }} === 'undefined') {

0 commit comments

Comments
 (0)