diff --git a/packages/scenes/src/components/SceneTimeRangeCompare.tsx b/packages/scenes/src/components/SceneTimeRangeCompare.tsx index b47b1186f..dade66a08 100644 --- a/packages/scenes/src/components/SceneTimeRangeCompare.tsx +++ b/packages/scenes/src/components/SceneTimeRangeCompare.tsx @@ -6,7 +6,7 @@ import { sceneGraph } from '../core/sceneGraph'; import { SceneObjectBase } from '../core/SceneObjectBase'; import { SceneComponentProps, SceneObjectState, SceneObjectUrlValues } from '../core/types'; import { DataQueryExtended } from '../querying/SceneQueryRunner'; -import { ExtraQueryDescriptor, ExtraQueryDataProcessor, ExtraQueryProvider } from '../querying/ExtraQueryProvider'; +import { ExtraQueryDescriptor, ExtraQueryDataProcessor, ExtraQueryProvider, ExtraQueryShouldRerun } from '../querying/ExtraQueryProvider'; import { SceneObjectUrlSyncConfig } from '../services/SceneObjectUrlSyncConfig'; import { getCompareSeriesRefId } from '../utils/getCompareSeriesRefId'; import { parseUrlParam } from '../utils/parseUrlParam'; @@ -118,7 +118,7 @@ export class SceneTimeRangeCompare } // The query runner should rerun the comparison query if the compareWith value has changed. - public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): boolean { + public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): ExtraQueryShouldRerun { return prev.compareWith !== next.compareWith; } diff --git a/packages/scenes/src/index.ts b/packages/scenes/src/index.ts index 2160e642d..d594946f2 100644 --- a/packages/scenes/src/index.ts +++ b/packages/scenes/src/index.ts @@ -28,7 +28,7 @@ export { SceneTimeRange } from './core/SceneTimeRange'; export { SceneTimeZoneOverride } from './core/SceneTimeZoneOverride'; export { SceneQueryRunner, type QueryRunnerState } from './querying/SceneQueryRunner'; -export { type ExtraQueryDescriptor, type ExtraQueryProvider, type ExtraQueryDataProcessor } from './querying/ExtraQueryProvider'; +export { type ExtraQueryDescriptor, type ExtraQueryProvider, type ExtraQueryDataProcessor, type ExtraQueryShouldRerun } from './querying/ExtraQueryProvider'; export { SceneDataLayerSet, SceneDataLayerSetBase } from './querying/SceneDataLayerSet'; export { SceneDataLayerBase } from './querying/layers/SceneDataLayerBase'; export { SceneDataLayerControls } from './querying/layers/SceneDataLayerControls'; diff --git a/packages/scenes/src/querying/ExtraQueryProvider.ts b/packages/scenes/src/querying/ExtraQueryProvider.ts index 7637c57a3..35dc49a6b 100644 --- a/packages/scenes/src/querying/ExtraQueryProvider.ts +++ b/packages/scenes/src/querying/ExtraQueryProvider.ts @@ -29,6 +29,18 @@ export interface ExtraQueryDescriptor { processor?: ExtraQueryDataProcessor; } +// Whether extra queries, providers, or neither should be rerun as the result +// of a state change. +// +// Returning `true` or 'queries' will cause the query runner to completely rerun all queries +// _and_ processors. +// Returning 'processors' will avoid rerunning queries, and pass the most +// recent (unprocessed) query results to the processors again for reprocessing. This allows +// the processors to process differently depending on their most recent state, without incurring +// the cost of a query. +// Returning `false` will not rerun queries or processors. +export type ExtraQueryShouldRerun = boolean | 'queries' | 'processors'; + // Indicates that this type wants to add extra requests, along with // optional processing functions, to a query runner. export interface ExtraQueryProvider extends SceneObjectBase { @@ -38,8 +50,8 @@ export interface ExtraQueryProvider extends SceneObj // // When the provider's state changes this function will be passed both the previous and the // next state. The implementation can use this to determine whether the change should trigger - // a rerun of the query or not. - shouldRerun(prev: T, next: T): boolean; + // a rerun of the queries, processors or neither. + shouldRerun(prev: T, next: T): ExtraQueryShouldRerun; } export function isExtraQueryProvider(obj: any): obj is ExtraQueryProvider { diff --git a/packages/scenes/src/querying/SceneQueryRunner.test.ts b/packages/scenes/src/querying/SceneQueryRunner.test.ts index 4166a6a28..2238c6056 100644 --- a/packages/scenes/src/querying/SceneQueryRunner.test.ts +++ b/packages/scenes/src/querying/SceneQueryRunner.test.ts @@ -35,7 +35,7 @@ import { activateFullSceneTree } from '../utils/test/activateFullSceneTree'; import { SceneDeactivationHandler, SceneObjectState } from '../core/types'; import { LocalValueVariable } from '../variables/variants/LocalValueVariable'; import { SceneObjectBase } from '../core/SceneObjectBase'; -import { ExtraQueryDescriptor, ExtraQueryProvider } from './ExtraQueryProvider'; +import { ExtraQueryDescriptor, ExtraQueryProvider, ExtraQueryShouldRerun } from './ExtraQueryProvider'; const getDataSourceMock = jest.fn().mockReturnValue({ uid: 'test-uid', @@ -1191,6 +1191,48 @@ describe('SceneQueryRunner', () => { expect(runRequestMock.mock.calls.length).toEqual(2); }); + + test('should run extra processors, but not queries, when providers declare it', async () => { + const timeRange = new SceneTimeRange({ + from: '2023-08-24T05:00:00.000Z', + to: '2023-08-24T07:00:00.000Z', + }); + + const queryRunner = new SceneQueryRunner({ + queries: [{ refId: 'A' }], + }); + const provider = new TestExtraQueryProvider({ foo: 1 }, 'processors'); + const scene = new EmbeddedScene({ + $timeRange: timeRange, + $data: queryRunner, + controls: [provider], + body: new SceneCanvasText({ text: 'hello' }), + }); + + // activate the scene, which will also activate the provider + // and the provider will run the extra request + scene.activate(); + await new Promise((r) => setTimeout(r, 1)); + + expect(runRequestMock.mock.calls.length).toEqual(2); + let runRequestCall = runRequestMock.mock.calls[0]; + let extraRunRequestCall = runRequestMock.mock.calls[1]; + expect(runRequestCall[1].targets[0].refId).toEqual('A'); + expect(extraRunRequestCall[1].targets[0].refId).toEqual('Extra'); + expect(extraRunRequestCall[1].targets[0].foo).toEqual(1); + expect(queryRunner.state.data?.series[3].fields[0].values[0]).toEqual(1); + + // change the state of the provider, which will trigger the activation + // handler to run the processor again. The provider will + // return 'processors' from shouldRun, so we should not see any more queries. + provider.setState({ foo: 2 }); + await new Promise((r) => setTimeout(r, 1)); + + expect(runRequestMock.mock.calls.length).toEqual(2); + + // we _should_ see that the processor has rerun and updated the data, however. + expect(queryRunner.state.data?.series[3].fields[0].values[0]).toEqual(2); + }); }); describe('time frame comparison', () => { @@ -2308,9 +2350,9 @@ interface TestExtraQueryProviderState extends SceneObjectState { } class TestExtraQueryProvider extends SceneObjectBase implements ExtraQueryProvider<{}> { - private _shouldRerun: boolean; + private _shouldRerun: ExtraQueryShouldRerun; - public constructor(state: { foo: number }, shouldRerun: boolean) { + public constructor(state: { foo: number }, shouldRerun: ExtraQueryShouldRerun) { super(state); this._shouldRerun = shouldRerun; } @@ -2324,11 +2366,21 @@ class TestExtraQueryProvider extends SceneObjectBase of({ ...primary, ...secondary }), + processor: (primary, secondary) => { + return of({ + ...primary, + ...secondary, + series: [...primary.series, ...secondary.series, { + fields: [{ name: "foo", values: [this.state.foo], config: {}, type: FieldType.number }], + length: 1, + }], + }); + }, }, ]; } - public shouldRerun(prev: {}, next: {}): boolean { + + public shouldRerun(prev: {}, next: {}): ExtraQueryShouldRerun { return this._shouldRerun; } } diff --git a/packages/scenes/src/querying/SceneQueryRunner.ts b/packages/scenes/src/querying/SceneQueryRunner.ts index 8b9f9b757..165f0185d 100644 --- a/packages/scenes/src/querying/SceneQueryRunner.ts +++ b/packages/scenes/src/querying/SceneQueryRunner.ts @@ -1,5 +1,5 @@ import { cloneDeep, isEqual } from 'lodash'; -import { forkJoin, ReplaySubject, Unsubscribable } from 'rxjs'; +import { forkJoin, map, mergeMap, Observable, of, ReplaySubject, Unsubscribable } from 'rxjs'; import { DataQuery, DataSourceRef, LoadingState } from '@grafana/schema'; @@ -36,7 +36,6 @@ import { VariableValueRecorder } from '../variables/VariableValueRecorder'; import { emptyPanelData } from '../core/SceneDataNode'; import { getClosest } from '../core/sceneGraph/utils'; import { isExtraQueryProvider, ExtraQueryDataProcessor, ExtraQueryProvider } from './ExtraQueryProvider'; -import { passthroughProcessor, extraQueryProcessingOperator } from './extraQueryProcessingOperator'; import { filterAnnotations } from './layers/annotations/filterAnnotations'; import { getEnrichedDataRequest } from './getEnrichedDataRequest'; import { findActiveAdHocFilterVariableByUid } from '../variables/adhoc/patchGetAdhocFilters'; @@ -98,6 +97,9 @@ interface PreparedRequests { processors: Map; } +// Passthrough processor for secondary requests which don't define a processor. +const passthroughProcessor: ExtraQueryDataProcessor = (_, secondary) => of(secondary); + export class SceneQueryRunner extends SceneObjectBase implements SceneDataProvider { private _querySub?: Unsubscribable; private _dataLayersSub?: Unsubscribable; @@ -111,6 +113,14 @@ export class SceneQueryRunner extends SceneObjectBase implemen private _layerAnnotations?: DataFrame[]; private _resultAnnotations?: DataFrame[]; + // The results of the latest query before it was processed by any extra query providers. + private _unprocessedResults = new ReplaySubject<[PanelData, ...PanelData[]]>(1); + // The subscription to the unprocessed results. + private _unprocessedSub?: Unsubscribable; + // The processors provided by any extra query providers. + // The key is the request ID of the secondary request. + private _processors?: Map; + private _adhocFiltersVar?: AdHocFiltersVariable; private _groupByVar?: GroupByVariable; @@ -139,8 +149,13 @@ export class SceneQueryRunner extends SceneObjectBase implemen for (const provider of providers) { this._subs.add( provider.subscribeToState((n, p) => { - if (provider.shouldRerun(p, n)) { + const shouldRerun = provider.shouldRerun(p, n); + if (shouldRerun === true || shouldRerun === 'queries') { + // don't explicitly run processors here, that's done automatically + // as part of `this.runQueries`. this.runQueries(); + } else if (shouldRerun === 'processors') { + this.runProcessors(); } }) ); @@ -321,6 +336,10 @@ export class SceneQueryRunner extends SceneObjectBase implemen this._dataLayersSub = undefined; } + if (this._unprocessedSub) { + this._unprocessedSub.unsubscribe(); + } + this._timeSub?.unsubscribe(); this._timeSub = undefined; this._timeSubRange = undefined; @@ -368,6 +387,7 @@ export class SceneQueryRunner extends SceneObjectBase implemen this._timeSubRange = timeRange; this._timeSub = timeRange.subscribeToState(() => { this.runWithTimeRange(timeRange); + this.runProcessors(); }); } @@ -375,6 +395,16 @@ export class SceneQueryRunner extends SceneObjectBase implemen const timeRange = sceneGraph.getTimeRange(this); this.subscribeToTimeRangeChanges(timeRange); this.runWithTimeRange(timeRange); + this.runProcessors(); + } + + private runProcessors() { + if (this._unprocessedSub) { + this._unprocessedSub.unsubscribe(); + } + this._unprocessedSub = this._unprocessedResults + .pipe((data) => this.processResults(data)) + .subscribe((data) => this.onDataReceived(data)); } private getMaxDataPoints() { @@ -435,32 +465,33 @@ export class SceneQueryRunner extends SceneObjectBase implemen const runRequest = getRunRequest(); const { primary, secondaries, processors } = this.prepareRequests(timeRange, ds); + this._processors = processors; writeSceneLog('SceneQueryRunner', 'Starting runRequest', this.state.key); - let stream = runRequest(ds, primary); - - if (secondaries.length > 0) { - // Submit all secondary requests in parallel. - const secondaryStreams = secondaries.map((r) => runRequest(ds, r)); - // Create the rxjs operator which will combine the primary and secondary responses - // by calling the correct processor functions provided by the - // extra request providers. - const op = extraQueryProcessingOperator(processors); - // Combine the primary and secondary streams into a single stream, and apply the operator. - stream = forkJoin([stream, ...secondaryStreams]).pipe(op); - } - - stream = stream.pipe( - registerQueryWithController({ + let primaryStream = runRequest(ds, primary) + .pipe(registerQueryWithController({ type: 'data', request: primary, origin: this, cancel: () => this.cancelQuery(), - }) - ); - - this._querySub = stream.subscribe(this.onDataReceived); + })); + + if (secondaries.length === 0) { + this._querySub = primaryStream + .pipe(map((data) => [data] as [PanelData, ...PanelData[]])) + .subscribe((data) => this._unprocessedResults.next(data)); + } else { + const secondaryStreams = secondaries.map((r) => runRequest(ds, r) + .pipe(registerQueryWithController({ + type: 'data', + request: r, + origin: this, + cancel: () => this.cancelQuery(), + }))); + const stream = forkJoin([primaryStream, ...secondaryStreams]); + this._querySub = stream.subscribe((data) => this._unprocessedResults.next(data)); + } } catch (err) { console.error('PanelQueryRunner Error', err); @@ -473,6 +504,25 @@ export class SceneQueryRunner extends SceneObjectBase implemen } } + private processResults(data: Observable<[PanelData, ...PanelData[]]>): Observable { + return data.pipe( + mergeMap(([primary, ...secondaries]: [PanelData, ...PanelData[]]) => { + if (this._processors === undefined || secondaries.length === 0) { + return of([primary]); + } + const processedSecondaries = secondaries.flatMap((s) => { + return this._processors!.get(s.request!.requestId)?.(primary, s) ?? of(s); + }); + return forkJoin([of(primary), ...processedSecondaries]); + }), + map(([primary, ...processedSecondaries]) => ({ + ...primary, + series: [...primary.series, ...processedSecondaries.flatMap((s) => s.series)], + annotations: [...(primary.annotations ?? []), ...processedSecondaries.flatMap((s) => s.annotations ?? [])], + })) + ) + } + public clone(withState?: Partial) { const clone = super.clone(withState); diff --git a/packages/scenes/src/querying/extraQueryProcessingOperator.ts b/packages/scenes/src/querying/extraQueryProcessingOperator.ts deleted file mode 100644 index a6ec9608b..000000000 --- a/packages/scenes/src/querying/extraQueryProcessingOperator.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { PanelData } from '@grafana/data'; -import { forkJoin, of, map, mergeMap, Observable } from 'rxjs'; -import { ExtraQueryDataProcessor } from './ExtraQueryProvider'; - -// Passthrough processor for use with ExtraQuerys. -export const passthroughProcessor: ExtraQueryDataProcessor = (_, secondary) => of(secondary); - -// Factory function which takes a map from request ID to processor functions and -// returns an rxjs operator which operates on an array of panel data responses. -// -// Each secondary response is transformed according to the processor function -// identified by it's request ID. The processor function is passed the primary -// response and the secondary response to be processed. -// -// The output is a single frame with the primary series and all processed -// secondary series combined. -export const extraQueryProcessingOperator = (processors: Map) => - (data: Observable<[PanelData, ...PanelData[]]>) => { - return data.pipe( - mergeMap(([primary, ...secondaries]) => { - const processedSecondaries = secondaries.flatMap((s) => { - return processors.get(s.request!.requestId)?.(primary, s) ?? of(s); - }); - return forkJoin([of(primary), ...processedSecondaries]); - }), - map(([primary, ...processedSecondaries]) => ({ - ...primary, - series: [...primary.series, ...processedSecondaries.flatMap((s) => s.series)], - annotations: [...(primary.annotations ?? []), ...processedSecondaries.flatMap((s) => s.annotations ?? [])], - })) - ); - }