Skip to content
156 changes: 156 additions & 0 deletions webapp/packages/core-root/src/LongPollingSubject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* CloudBeaver - Cloud Database Manager
* Copyright (C) 2020-2025 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0.
* you may not use this file except in compliance with the License.
*/

import type { Subscription, Subscriber, NextObserver } from 'rxjs';
import { Subject } from 'rxjs/internal/Subject';

import type { ISessionEvent } from './SessionEventSource.js';

export interface ILongPollingOptions {
url: string;
startObserver?: NextObserver<void>;
stopObserver?: NextObserver<void>;
}

interface IPollResponse {
events: ISessionEvent[];
}

export class LongPollingSubject<T> extends Subject<T> {
readonly options: ILongPollingOptions;

private output: Subject<T>;
private refCount = 0;
private isPolling: boolean;
private abortController: AbortController | null;

constructor(options: ILongPollingOptions) {
super();

this.options = options;
this.isPolling = false;
this.abortController = null;

this.output = new Subject();
}

override next(value: T): void {
this.send(value).catch(exception => {
this.output.error(exception);
});
}

private async send(data: T): Promise<void> {
const response = await fetch(this.options.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
credentials: 'include',
body: JSON.stringify(data),
});

if (!response.ok) {
throw new Error(`Failed to send event, status: ${response.status}`);
}
}

protected _subscribe(subscriber: Subscriber<T>): Subscription {
this.refCount++;

if (this.output.closed) {
this.output = new Subject();
}

if (!this.isPolling) {
this.startPolling();
}

const subscription = this.output.subscribe(subscriber);

subscriber.add(() => {
this.refCount--;

if (this.refCount === 0) {
this.stopPolling();
}
});

return subscription;
}

private startPolling() {
if (this.isPolling) {
return;
}

this.isPolling = true;
this.poll();

if (this.options.startObserver) {
this.options.startObserver.next();
}
}

private stopPolling() {
if (!this.isPolling) {
return;
}

this.isPolling = false;

if (this.abortController) {
this.abortController.abort();
this.abortController = null;
}

if (this.options.stopObserver) {
this.options.stopObserver.next();
}
}

/** 25s long poll */
private async poll() {
if (!this.isPolling) {
return;
}

this.abortController = new AbortController();

try {
const response = await fetch(this.options.url, {
method: 'GET',
headers: {
Accept: 'application/json',
},
credentials: 'include',
signal: this.abortController.signal,
});

if (!response.ok) {
throw new Error(`Failed to poll events, status: ${response.status}`);
}

const data: IPollResponse | null = await response.json().catch(() => null);

if (Array.isArray(data?.events)) {
for (const event of data.events) {
this.output.next(event as T);
}
}

this.poll();
} catch (exception) {
if (exception instanceof Error && exception.name === 'AbortError') {
return;
}

this.output.error(exception);
}
}
}
27 changes: 8 additions & 19 deletions webapp/packages/core-root/src/SessionEventSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
throwError,
timer,
} from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

import { injectable } from '@cloudbeaver/core-di';
import { Executor, type IExecutor, type ISyncExecutor, SyncExecutor } from '@cloudbeaver/core-executor';
Expand All @@ -41,6 +40,7 @@ import {

import type { IBaseServerEvent, IServerEventCallback, IServerEventEmitter, Unsubscribe } from './ServerEventEmitter/IServerEventEmitter.js';
import { SessionExpireService } from './SessionExpireService.js';
import { TransportSubject } from './TransportSubject.js';

export { ServerEventId, SessionEventTopic, ClientEventId };

Expand All @@ -66,10 +66,8 @@ export class SessionEventSource implements IServerEventEmitter<ISessionEvent, IS
readonly onActivate: IExecutor;
readonly onInit: ISyncExecutor;

private readonly closeSubject: Subject<CloseEvent>;
private readonly openSubject: Subject<Event>;
private readonly errorSubject: Subject<Error>;
private readonly subject: WebSocketSubject<ISessionEvent>;
private readonly subject: TransportSubject<ISessionEvent>;
private readonly oldEventsSubject: Subject<ISessionEvent>;
private readonly emitSubject: Subject<ISessionEvent>;
private readonly disconnectSubject: Subject<boolean>;
Expand All @@ -83,15 +81,10 @@ export class SessionEventSource implements IServerEventEmitter<ISessionEvent, IS
this.onInit = new SyncExecutor();
this.oldEventsSubject = new Subject();
this.disconnectSubject = new Subject();
this.closeSubject = new Subject();
this.openSubject = new Subject();
this.errorSubject = new Subject();
this.disconnected = false;
this.subject = webSocket({
url: environmentService.wsEndpoint,
closeObserver: this.closeSubject,
openObserver: this.openSubject,
});

this.subject = new TransportSubject<ISessionEvent>(environmentService);

const ready$ = defer(() => from(this.onActivate.execute())).pipe(shareReplay(1));

Expand All @@ -103,18 +96,14 @@ export class SessionEventSource implements IServerEventEmitter<ISessionEvent, IS
)
.subscribe(this.subject);

this.openSubject.subscribe(() => {
this.subject.ready$.subscribe(() => {
this.onInit.execute();
});

this.closeSubject.subscribe(event => {
console.warn(`Websocket closed (${event.code}): ${event.reason}`);
});

this.eventsSubject = merge(this.oldEventsSubject, ready$.pipe(switchMap(() => this.subject))).pipe(this.handleErrors());

this.errorSubject.pipe(debounceTime(1000)).subscribe(error => {
console.error('Websocket:', error);
console.error('Transport:', error);
});

this.errorHandler = this.errorHandler.bind(this);
Expand Down Expand Up @@ -217,7 +206,7 @@ export class SessionEventSource implements IServerEventEmitter<ISessionEvent, IS

const delayIndex = Math.min(retryCount - 1, RETRY_INTERVALS.length - 1);
const delayTime = RETRY_INTERVALS[delayIndex]!;
console.warn(`WebSocket retry attempt ${retryCount}/${MAX_RETRY_ATTEMPTS} in ${delayTime}ms`);
console.warn(`Transport retry attempt ${retryCount}/${MAX_RETRY_ATTEMPTS} in ${delayTime}ms`);

return timer(delayTime);
},
Expand All @@ -229,7 +218,7 @@ export class SessionEventSource implements IServerEventEmitter<ISessionEvent, IS
}

private errorHandler(error: any): Observable<ISessionEvent> {
this.errorSubject.next(new ServiceError('WebSocket connection error', { cause: error }));
this.errorSubject.next(new ServiceError('Transport error', { cause: error }));
return throwError(() => error);
}
}
110 changes: 110 additions & 0 deletions webapp/packages/core-root/src/TransportSubject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* CloudBeaver - Cloud Database Manager
* Copyright (C) 2020-2025 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0.
* you may not use this file except in compliance with the License.
*/

import { catchError, EMPTY, Observable, shareReplay, Subject, Subscriber, Subscription, take, throwError } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';

import type { EnvironmentService } from '@cloudbeaver/core-sdk';
import { GlobalConstants } from '@cloudbeaver/core-utils';

import { longPolling } from './longPolling.js';

export class TransportSubject<T> extends Subject<T> {
readonly ready$: Observable<void>;

private ws: ReturnType<typeof webSocket<T>>;
private poll: ReturnType<typeof longPolling<T>>;

private ready: Subject<void>;
private active: Subject<T>;
private output: Subject<T>;

private sub: Subscription | null;

constructor(environmentService: EnvironmentService) {
super();

this.output = new Subject();
this.ready = new Subject();

this.ready$ = this.ready.pipe(take(1), shareReplay(1));

this.sub = null;

this.ws = webSocket<T>({
url: environmentService.wsEndpoint,
openObserver: { next: () => this.ready.next() },
closeObserver: {
next: event => {
console.warn(`Websocket closed (${event.code}): ${event.reason}`);
},
},
});

this.poll = longPolling<T>({
url: GlobalConstants.absoluteServiceHTTPUrl('events'),
startObserver: { next: () => this.ready.next() },
});

this.active = window.WebSocket ? this.ws : this.poll;
}

override next(value: T): void {
this.active.next(value);
}

protected _subscribe(subscriber: Subscriber<T>): Subscription {
if (this.output.closed) {
this.output = new Subject();
}

if ((!this.sub || this.sub.closed) && !this.closed) {
this.connect();
}

const subscription = this.output.subscribe(subscriber);
return subscription;
}

private connect() {
if (this.sub) {
this.sub.unsubscribe();
}

this.sub = this.active
.pipe(
catchError(err => {
if (this.active === this.ws) {
console.warn('WebSocket failed, switching to polling');

this.active = this.poll;
this.connect();
return EMPTY;
}

return throwError(() => err);
}),
)
.subscribe({
next: value => this.output.next(value),
error: err => this.output.error(err),
complete: () => this.output.complete(),
});
Comment on lines +79 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should use similar approach as socket.io to establish connection:

  1. open long polling
  2. try to open websocket same time
  3. if websocket opened, close long pooling and use websocket

}

override unsubscribe(): void {
if (this.sub) {
this.sub.unsubscribe();
this.sub = null;
}

this.ready.complete();
this.output.complete();
super.unsubscribe();
}
}
13 changes: 13 additions & 0 deletions webapp/packages/core-root/src/longPolling.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* CloudBeaver - Cloud Database Manager
* Copyright (C) 2020-2025 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0.
* you may not use this file except in compliance with the License.
*/

import { type ILongPollingOptions, LongPollingSubject } from './LongPollingSubject.js';

export function longPolling<T>(options: ILongPollingOptions): LongPollingSubject<T> {
return new LongPollingSubject<T>(options);
}
Loading