Skip to content

Commit

Permalink
SceneQueryRunner: allow extra queries and processors to be rerun sepa…
Browse files Browse the repository at this point in the history
…rately

Followup from [this comment](#587 (comment))
- this adds the option for ExtraQueryProviders to state that they
only wish to have their processor rerun (probably with different state),
rather than the QueryRunner rerunning both the query _and_ the processor.

This is really useful for some ML-based providers which need to run an
extra query then transform the results, and also include interactivity
such as a slider, but _don't_ need to rerun the query as part of the
interactivity - just the processing.

There are some downsides here, most notably the extra complexity:

- the `ExtraQueryProvider` interface is more flexible but more complex
- the `SceneQueryRunner` needs another subscription and
  `ReplaySubject` in order to be able to re-send the latest
  unprocessed data to the processors again
  - I think this will also increase memory usage?

Perhaps there's a way to do this using transformations instead?
  • Loading branch information
sd2k committed Jun 7, 2024
1 parent 32a9012 commit c62b35d
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 64 deletions.
4 changes: 2 additions & 2 deletions packages/scenes/src/components/SceneTimeRangeCompare.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { sceneGraph } from '../core/sceneGraph';
import { SceneObjectBase } from '../core/SceneObjectBase';
import { SceneComponentProps, SceneObjectState, SceneObjectUrlValues } from '../core/types';
import { DataQueryExtended } from '../querying/SceneQueryRunner';
import { ExtraQueryDescriptor, ExtraQueryDataProcessor, ExtraQueryProvider } from '../querying/ExtraQueryProvider';
import { ExtraQueryDescriptor, ExtraQueryDataProcessor, ExtraQueryProvider, ExtraQueryShouldRerun } from '../querying/ExtraQueryProvider';
import { SceneObjectUrlSyncConfig } from '../services/SceneObjectUrlSyncConfig';
import { getCompareSeriesRefId } from '../utils/getCompareSeriesRefId';
import { parseUrlParam } from '../utils/parseUrlParam';
Expand Down Expand Up @@ -118,7 +118,7 @@ export class SceneTimeRangeCompare
}

// The query runner should rerun the comparison query if the compareWith value has changed.
public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): boolean {
public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): ExtraQueryShouldRerun {
return prev.compareWith !== next.compareWith;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/scenes/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export { SceneTimeRange } from './core/SceneTimeRange';
export { SceneTimeZoneOverride } from './core/SceneTimeZoneOverride';

export { SceneQueryRunner, type QueryRunnerState } from './querying/SceneQueryRunner';
export { type ExtraQueryDescriptor, type ExtraQueryProvider, type ExtraQueryDataProcessor } from './querying/ExtraQueryProvider';
export { type ExtraQueryDescriptor, type ExtraQueryProvider, type ExtraQueryDataProcessor, type ExtraQueryShouldRerun } from './querying/ExtraQueryProvider';
export { SceneDataLayerSet, SceneDataLayerSetBase } from './querying/SceneDataLayerSet';
export { SceneDataLayerBase } from './querying/layers/SceneDataLayerBase';
export { SceneDataLayerControls } from './querying/layers/SceneDataLayerControls';
Expand Down
16 changes: 14 additions & 2 deletions packages/scenes/src/querying/ExtraQueryProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ export interface ExtraQueryDescriptor {
processor?: ExtraQueryDataProcessor;
}

// Whether extra queries, providers, or neither should be rerun as the result
// of a state change.
//
// Returning `true` or 'queries' will cause the query runner to completely rerun all queries
// _and_ processors.
// Returning 'processors' will avoid rerunning queries, and pass the most
// recent (unprocessed) query results to the processors again for reprocessing. This allows
// the processors to process differently depending on their most recent state, without incurring
// the cost of a query.
// Returning `false` will not rerun queries or processors.
export type ExtraQueryShouldRerun = boolean | 'queries' | 'processors';

// Indicates that this type wants to add extra requests, along with
// optional processing functions, to a query runner.
export interface ExtraQueryProvider<T extends SceneObjectState> extends SceneObjectBase<T> {
Expand All @@ -38,8 +50,8 @@ export interface ExtraQueryProvider<T extends SceneObjectState> extends SceneObj
//
// When the provider's state changes this function will be passed both the previous and the
// next state. The implementation can use this to determine whether the change should trigger
// a rerun of the query or not.
shouldRerun(prev: T, next: T): boolean;
// a rerun of the queries, processors or neither.
shouldRerun(prev: T, next: T): ExtraQueryShouldRerun;
}

export function isExtraQueryProvider(obj: any): obj is ExtraQueryProvider<any> {
Expand Down
62 changes: 57 additions & 5 deletions packages/scenes/src/querying/SceneQueryRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { activateFullSceneTree } from '../utils/test/activateFullSceneTree';
import { SceneDeactivationHandler, SceneObjectState } from '../core/types';
import { LocalValueVariable } from '../variables/variants/LocalValueVariable';
import { SceneObjectBase } from '../core/SceneObjectBase';
import { ExtraQueryDescriptor, ExtraQueryProvider } from './ExtraQueryProvider';
import { ExtraQueryDescriptor, ExtraQueryProvider, ExtraQueryShouldRerun } from './ExtraQueryProvider';

const getDataSourceMock = jest.fn().mockReturnValue({
uid: 'test-uid',
Expand Down Expand Up @@ -1191,6 +1191,48 @@ describe('SceneQueryRunner', () => {

expect(runRequestMock.mock.calls.length).toEqual(2);
});

test('should run extra processors, but not queries, when providers declare it', async () => {
const timeRange = new SceneTimeRange({
from: '2023-08-24T05:00:00.000Z',
to: '2023-08-24T07:00:00.000Z',
});

const queryRunner = new SceneQueryRunner({
queries: [{ refId: 'A' }],
});
const provider = new TestExtraQueryProvider({ foo: 1 }, 'processors');
const scene = new EmbeddedScene({
$timeRange: timeRange,
$data: queryRunner,
controls: [provider],
body: new SceneCanvasText({ text: 'hello' }),
});

// activate the scene, which will also activate the provider
// and the provider will run the extra request
scene.activate();
await new Promise((r) => setTimeout(r, 1));

expect(runRequestMock.mock.calls.length).toEqual(2);
let runRequestCall = runRequestMock.mock.calls[0];
let extraRunRequestCall = runRequestMock.mock.calls[1];
expect(runRequestCall[1].targets[0].refId).toEqual('A');
expect(extraRunRequestCall[1].targets[0].refId).toEqual('Extra');
expect(extraRunRequestCall[1].targets[0].foo).toEqual(1);
expect(queryRunner.state.data?.series[3].fields[0].values[0]).toEqual(1);

// change the state of the provider, which will trigger the activation
// handler to run the processor again. The provider will
// return 'processors' from shouldRun, so we should not see any more queries.
provider.setState({ foo: 2 });
await new Promise((r) => setTimeout(r, 1));

expect(runRequestMock.mock.calls.length).toEqual(2);

// we _should_ see that the processor has rerun and updated the data, however.
expect(queryRunner.state.data?.series[3].fields[0].values[0]).toEqual(2);
});
});

describe('time frame comparison', () => {
Expand Down Expand Up @@ -2308,9 +2350,9 @@ interface TestExtraQueryProviderState extends SceneObjectState {
}

class TestExtraQueryProvider extends SceneObjectBase<TestExtraQueryProviderState> implements ExtraQueryProvider<{}> {
private _shouldRerun: boolean;
private _shouldRerun: ExtraQueryShouldRerun;

public constructor(state: { foo: number }, shouldRerun: boolean) {
public constructor(state: { foo: number }, shouldRerun: ExtraQueryShouldRerun) {
super(state);
this._shouldRerun = shouldRerun;
}
Expand All @@ -2324,11 +2366,21 @@ class TestExtraQueryProvider extends SceneObjectBase<TestExtraQueryProviderState
{ refId: 'Extra', foo: this.state.foo },
],
},
processor: (primary, secondary) => of({ ...primary, ...secondary }),
processor: (primary, secondary) => {
return of({
...primary,
...secondary,
series: [...primary.series, ...secondary.series, {
fields: [{ name: "foo", values: [this.state.foo], config: {}, type: FieldType.number }],
length: 1,
}],
});
},
},
];
}
public shouldRerun(prev: {}, next: {}): boolean {

public shouldRerun(prev: {}, next: {}): ExtraQueryShouldRerun {
return this._shouldRerun;
}
}
94 changes: 72 additions & 22 deletions packages/scenes/src/querying/SceneQueryRunner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { cloneDeep, isEqual } from 'lodash';
import { forkJoin, ReplaySubject, Unsubscribable } from 'rxjs';
import { forkJoin, map, mergeMap, Observable, of, ReplaySubject, Unsubscribable } from 'rxjs';

import { DataQuery, DataSourceRef, LoadingState } from '@grafana/schema';

Expand Down Expand Up @@ -36,7 +36,6 @@ import { VariableValueRecorder } from '../variables/VariableValueRecorder';
import { emptyPanelData } from '../core/SceneDataNode';
import { getClosest } from '../core/sceneGraph/utils';
import { isExtraQueryProvider, ExtraQueryDataProcessor, ExtraQueryProvider } from './ExtraQueryProvider';
import { passthroughProcessor, extraQueryProcessingOperator } from './extraQueryProcessingOperator';
import { filterAnnotations } from './layers/annotations/filterAnnotations';
import { getEnrichedDataRequest } from './getEnrichedDataRequest';
import { findActiveAdHocFilterVariableByUid } from '../variables/adhoc/patchGetAdhocFilters';
Expand Down Expand Up @@ -98,6 +97,9 @@ interface PreparedRequests {
processors: Map<string, ExtraQueryDataProcessor>;
}

// Passthrough processor for secondary requests which don't define a processor.
const passthroughProcessor: ExtraQueryDataProcessor = (_, secondary) => of(secondary);

export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implements SceneDataProvider {
private _querySub?: Unsubscribable;
private _dataLayersSub?: Unsubscribable;
Expand All @@ -111,6 +113,14 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
private _layerAnnotations?: DataFrame[];
private _resultAnnotations?: DataFrame[];

// The results of the latest query before it was processed by any extra query providers.
private _unprocessedResults = new ReplaySubject<[PanelData, ...PanelData[]]>(1);
// The subscription to the unprocessed results.
private _unprocessedSub?: Unsubscribable;
// The processors provided by any extra query providers.
// The key is the request ID of the secondary request.
private _processors?: Map<string, ExtraQueryDataProcessor>;

private _adhocFiltersVar?: AdHocFiltersVariable;
private _groupByVar?: GroupByVariable;

Expand Down Expand Up @@ -139,8 +149,13 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
for (const provider of providers) {
this._subs.add(
provider.subscribeToState((n, p) => {
if (provider.shouldRerun(p, n)) {
const shouldRerun = provider.shouldRerun(p, n);
if (shouldRerun === true || shouldRerun === 'queries') {
// don't explicitly run processors here, that's done automatically
// as part of `this.runQueries`.
this.runQueries();
} else if (shouldRerun === 'processors') {
this.runProcessors();
}
})
);
Expand Down Expand Up @@ -321,6 +336,10 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
this._dataLayersSub = undefined;
}

if (this._unprocessedSub) {
this._unprocessedSub.unsubscribe();
}

this._timeSub?.unsubscribe();
this._timeSub = undefined;
this._timeSubRange = undefined;
Expand Down Expand Up @@ -368,13 +387,24 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
this._timeSubRange = timeRange;
this._timeSub = timeRange.subscribeToState(() => {
this.runWithTimeRange(timeRange);
this.runProcessors();
});
}

public runQueries() {
const timeRange = sceneGraph.getTimeRange(this);
this.subscribeToTimeRangeChanges(timeRange);
this.runWithTimeRange(timeRange);
this.runProcessors();
}

private runProcessors() {
if (this._unprocessedSub) {
this._unprocessedSub.unsubscribe();
}
this._unprocessedSub = this._unprocessedResults
.pipe((data) => this.processResults(data))
.subscribe((data) => this.onDataReceived(data));
}

private getMaxDataPoints() {
Expand Down Expand Up @@ -435,32 +465,33 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen

const runRequest = getRunRequest();
const { primary, secondaries, processors } = this.prepareRequests(timeRange, ds);
this._processors = processors;

writeSceneLog('SceneQueryRunner', 'Starting runRequest', this.state.key);

let stream = runRequest(ds, primary);

if (secondaries.length > 0) {
// Submit all secondary requests in parallel.
const secondaryStreams = secondaries.map((r) => runRequest(ds, r));
// Create the rxjs operator which will combine the primary and secondary responses
// by calling the correct processor functions provided by the
// extra request providers.
const op = extraQueryProcessingOperator(processors);
// Combine the primary and secondary streams into a single stream, and apply the operator.
stream = forkJoin([stream, ...secondaryStreams]).pipe(op);
}

stream = stream.pipe(
registerQueryWithController({
let primaryStream = runRequest(ds, primary)
.pipe(registerQueryWithController({
type: 'data',
request: primary,
origin: this,
cancel: () => this.cancelQuery(),
})
);

this._querySub = stream.subscribe(this.onDataReceived);
}));

if (secondaries.length === 0) {
this._querySub = primaryStream
.pipe(map((data) => [data] as [PanelData, ...PanelData[]]))
.subscribe((data) => this._unprocessedResults.next(data));
} else {
const secondaryStreams = secondaries.map((r) => runRequest(ds, r)
.pipe(registerQueryWithController({
type: 'data',
request: r,
origin: this,
cancel: () => this.cancelQuery(),
})));
const stream = forkJoin([primaryStream, ...secondaryStreams]);
this._querySub = stream.subscribe((data) => this._unprocessedResults.next(data));
}
} catch (err) {
console.error('PanelQueryRunner Error', err);

Expand All @@ -473,6 +504,25 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}
}

private processResults(data: Observable<[PanelData, ...PanelData[]]>): Observable<PanelData> {
return data.pipe(
mergeMap(([primary, ...secondaries]: [PanelData, ...PanelData[]]) => {
if (this._processors === undefined || secondaries.length === 0) {
return of([primary]);
}
const processedSecondaries = secondaries.flatMap((s) => {
return this._processors!.get(s.request!.requestId)?.(primary, s) ?? of(s);
});
return forkJoin([of(primary), ...processedSecondaries]);
}),
map(([primary, ...processedSecondaries]) => ({
...primary,
series: [...primary.series, ...processedSecondaries.flatMap((s) => s.series)],
annotations: [...(primary.annotations ?? []), ...processedSecondaries.flatMap((s) => s.annotations ?? [])],
}))
)
}

public clone(withState?: Partial<QueryRunnerState>) {
const clone = super.clone(withState);

Expand Down
32 changes: 0 additions & 32 deletions packages/scenes/src/querying/extraQueryProcessingOperator.ts

This file was deleted.

0 comments on commit c62b35d

Please sign in to comment.