-
Notifications
You must be signed in to change notification settings - Fork 508
dbeaver/pro#7603 add long polling transport #4002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
efc884e
962909c
8664b44
b0c7cdf
f073e3c
663f6ba
79e4c28
d1fee95
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| /* | ||
| * CloudBeaver - Cloud Database Manager | ||
| * Copyright (C) 2020-2025 DBeaver Corp and others | ||
| * | ||
| * Licensed under the Apache License, Version 2.0. | ||
| * you may not use this file except in compliance with the License. | ||
| */ | ||
|
|
||
| import type { Subscription, Subscriber, NextObserver } from 'rxjs'; | ||
| import { Subject } from 'rxjs/internal/Subject'; | ||
|
|
||
| import type { ISessionEvent } from './SessionEventSource.js'; | ||
|
|
||
| export interface ILongPollingOptions { | ||
| url: string; | ||
| startObserver?: NextObserver<void>; | ||
| stopObserver?: NextObserver<void>; | ||
| } | ||
|
|
||
| interface IPollResponse { | ||
| events: ISessionEvent[]; | ||
| } | ||
|
|
||
| export class LongPollingSubject<T> extends Subject<T> { | ||
| readonly options: ILongPollingOptions; | ||
|
|
||
| private output: Subject<T>; | ||
| private isPolling: boolean; | ||
| private abortController: AbortController | null; | ||
|
|
||
| constructor(options: ILongPollingOptions) { | ||
| super(); | ||
|
|
||
| this.options = options; | ||
| this.isPolling = false; | ||
| this.abortController = null; | ||
|
|
||
| this.output = new Subject(); | ||
| } | ||
|
|
||
| override next(value: T): void { | ||
| this.send(value).catch(exception => { | ||
| this.output.error(exception); | ||
| }); | ||
| } | ||
|
|
||
| async send(data: T): Promise<void> { | ||
| const response = await fetch(this.options.url, { | ||
| method: 'POST', | ||
| headers: { | ||
| 'Content-Type': 'application/json', | ||
| }, | ||
| credentials: 'include', | ||
| body: JSON.stringify(data), | ||
| }); | ||
|
|
||
| if (!response.ok) { | ||
| throw new Error(`Failed to send event, status: ${response.status}`); | ||
| } | ||
| } | ||
|
|
||
| protected _subscribe(subscriber: Subscriber<T>): Subscription { | ||
| if (!this.isPolling) { | ||
| this.startPolling(); | ||
| } | ||
|
|
||
| const subscription = this.output.subscribe(subscriber); | ||
|
|
||
| subscriber.add(() => { | ||
| if (this.output.observers.length === 0) { | ||
| this.stopPolling(); | ||
| } | ||
| }); | ||
|
|
||
| return subscription; | ||
| } | ||
|
Comment on lines
63
to
85
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure about this implementation. In RxJS, observables are lazy; they can be subscribed to multiple times, but if there are no listeners, all of them will be "cold" |
||
|
|
||
| private startPolling() { | ||
| if (this.isPolling) { | ||
| return; | ||
| } | ||
|
|
||
| this.isPolling = true; | ||
| this.poll(); | ||
|
|
||
| if (this.options.startObserver) { | ||
| this.options.startObserver.next(); | ||
| } | ||
| } | ||
|
|
||
| private stopPolling() { | ||
| if (!this.isPolling) { | ||
| return; | ||
| } | ||
|
|
||
| this.isPolling = false; | ||
|
|
||
| if (this.abortController) { | ||
| this.abortController.abort(); | ||
| this.abortController = null; | ||
| } | ||
|
|
||
| if (this.options.stopObserver) { | ||
| this.options.stopObserver.next(); | ||
| } | ||
| } | ||
|
|
||
| /** 25s long poll */ | ||
| private async poll() { | ||
| if (!this.isPolling) { | ||
| return; | ||
| } | ||
|
|
||
| this.abortController = new AbortController(); | ||
|
|
||
| try { | ||
| const response = await fetch(this.options.url, { | ||
| method: 'GET', | ||
| headers: { | ||
| Accept: 'application/json', | ||
| }, | ||
| credentials: 'include', | ||
| signal: this.abortController.signal, | ||
| }); | ||
|
|
||
| if (!response.ok) { | ||
| throw new Error(`Failed to poll events, status: ${response.status}`); | ||
| } | ||
|
|
||
| const data: IPollResponse | null = await response.json().catch(() => null); | ||
|
|
||
| if (Array.isArray(data?.events)) { | ||
| for (const event of data.events) { | ||
| this.output.next(event as T); | ||
| } | ||
| } | ||
|
|
||
| this.poll(); | ||
| } catch (exception) { | ||
| if (exception instanceof Error && exception.name === 'AbortError') { | ||
| return; | ||
| } | ||
|
|
||
| this.output.error(exception); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /* | ||
| * CloudBeaver - Cloud Database Manager | ||
| * Copyright (C) 2020-2025 DBeaver Corp and others | ||
| * | ||
| * Licensed under the Apache License, Version 2.0. | ||
| * you may not use this file except in compliance with the License. | ||
| */ | ||
|
|
||
| import { catchError, EMPTY, Observable, shareReplay, Subject, Subscriber, Subscription, take, throwError } from 'rxjs'; | ||
| import { webSocket } from 'rxjs/webSocket'; | ||
|
|
||
| import type { EnvironmentService } from '@cloudbeaver/core-sdk'; | ||
| import { GlobalConstants } from '@cloudbeaver/core-utils'; | ||
|
|
||
| import { longPolling } from './longPolling.js'; | ||
|
|
||
| export class TransportSubject<T> extends Subject<T> { | ||
| readonly ready$: Observable<void>; | ||
|
|
||
| private ws: ReturnType<typeof webSocket<T>>; | ||
| private poll: ReturnType<typeof longPolling<T>>; | ||
|
|
||
| private ready: Subject<void>; | ||
| private active: Subject<T>; | ||
| private output: Subject<T>; | ||
|
|
||
| private sub: Subscription | null; | ||
|
|
||
| constructor(environmentService: EnvironmentService) { | ||
| super(); | ||
|
|
||
| this.output = new Subject(); | ||
| this.ready = new Subject(); | ||
|
|
||
| this.ready$ = this.ready.pipe(take(1), shareReplay(1)); | ||
|
|
||
| this.sub = null; | ||
|
|
||
| this.ws = webSocket<T>({ | ||
| url: environmentService.wsEndpoint, | ||
| openObserver: { next: () => this.ready.next() }, | ||
| closeObserver: { | ||
| next: event => { | ||
| console.warn(`Websocket closed (${event.code}): ${event.reason}`); | ||
| }, | ||
| }, | ||
| }); | ||
|
|
||
| this.poll = longPolling<T>({ | ||
| url: GlobalConstants.absoluteServiceHTTPUrl('events'), | ||
| startObserver: { next: () => this.ready.next() }, | ||
| }); | ||
|
|
||
| this.active = window.WebSocket ? this.ws : this.poll; | ||
| } | ||
|
|
||
| override next(value: T): void { | ||
| this.active.next(value); | ||
| } | ||
|
|
||
| protected _subscribe(subscriber: Subscriber<T>): Subscription { | ||
| if (!this.sub && !this.closed) { | ||
| this.connect(); | ||
| } | ||
|
|
||
| const subscription = this.output.subscribe(subscriber); | ||
| return subscription; | ||
| } | ||
|
|
||
| private connect() { | ||
| if (this.sub) { | ||
| this.sub.unsubscribe(); | ||
| } | ||
|
|
||
| this.sub = this.active | ||
| .pipe( | ||
| catchError(err => { | ||
| if (this.active === this.ws) { | ||
| console.warn('WebSocket failed, switching to polling'); | ||
|
|
||
| this.active = this.poll; | ||
| this.connect(); | ||
| return EMPTY; | ||
| } | ||
|
|
||
| return throwError(() => err); | ||
| }), | ||
| ) | ||
| .subscribe({ | ||
| next: value => this.output.next(value), | ||
| error: err => this.output.error(err), | ||
| complete: () => this.output.complete(), | ||
| }); | ||
|
Comment on lines
+79
to
+97
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think we should use similar approach as socket.io to establish connection:
|
||
| } | ||
|
|
||
| override unsubscribe(): void { | ||
| if (this.sub) { | ||
| this.sub.unsubscribe(); | ||
| this.sub = null; | ||
| } | ||
|
|
||
| this.ready.complete(); | ||
| this.output.complete(); | ||
| super.unsubscribe(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| /* | ||
| * CloudBeaver - Cloud Database Manager | ||
| * Copyright (C) 2020-2025 DBeaver Corp and others | ||
| * | ||
| * Licensed under the Apache License, Version 2.0. | ||
| * you may not use this file except in compliance with the License. | ||
| */ | ||
|
|
||
| import { type ILongPollingOptions, LongPollingSubject } from './LongPollingSubject.js'; | ||
|
|
||
| export function longPolling<T>(options: ILongPollingOptions): LongPollingSubject<T> { | ||
| return new LongPollingSubject<T>(options); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it should be private