Skip to content

Commit

Permalink
Endblock feature (#2064)
Browse files Browse the repository at this point in the history
* add endBlock to datasources

* improve log messages

* code cleanup and update dictionary queries

* add unit tests

* add ds endBlock to queriesMap

* fix queriesMap

* accomodate endBlock values in ds height map

* remove unused import

* fix endBlock calculation is ds blockHeightMap

* fix tests

* overwrite active datasources from subsequent projects

* fix overwriting of active dataSources between project upgrades

* add empty datasource heights to bypassBlocks

* check for available datasources from dsMap
  • Loading branch information
guplersaxanoid authored Oct 29, 2023
1 parent 65b1636 commit a25ec50
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 36 deletions.
12 changes: 3 additions & 9 deletions packages/common-substrate/src/project/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import {RegisteredTypes, RegistryTypes, OverrideModuleType, OverrideBundleType} from '@polkadot/types/types';
import {BlockFilterImpl, ProcessorImpl} from '@subql/common';
import {BaseDataSource, BlockFilterImpl, ProcessorImpl} from '@subql/common';
import {
SubstrateBlockFilter,
SubstrateBlockHandler,
Expand All @@ -24,7 +24,6 @@ import {
IsArray,
IsBoolean,
IsEnum,
IsInt,
IsOptional,
IsString,
IsObject,
Expand Down Expand Up @@ -149,15 +148,12 @@ export class CustomMapping implements BaseMapping<SubstrateCustomHandler> {
file: string;
}

export class RuntimeDataSourceBase implements SubstrateRuntimeDatasource {
export class RuntimeDataSourceBase extends BaseDataSource implements SubstrateRuntimeDatasource {
@IsEnum(SubstrateDatasourceKind, {groups: [SubstrateDatasourceKind.Runtime]})
kind: SubstrateDatasourceKind.Runtime;
@Type(() => RuntimeMapping)
@ValidateNested()
mapping: RuntimeMapping;
@IsOptional()
@IsInt()
startBlock?: number;
}

export class FileReferenceImpl implements FileReference {
Expand All @@ -166,16 +162,14 @@ export class FileReferenceImpl implements FileReference {
}

export class CustomDataSourceBase<K extends string, M extends CustomMapping, O = any>
extends BaseDataSource
implements SubstrateCustomDatasource<K, M, O>
{
@IsString()
kind: K;
@Type(() => CustomMapping)
@ValidateNested()
mapping: M;
@IsOptional()
@IsInt()
startBlock?: number;
@Type(() => FileReferenceImpl)
@ValidateNested({each: true})
assets: Map<string, FileReference>;
Expand Down
16 changes: 15 additions & 1 deletion packages/common/src/project/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import fs from 'fs';
import os from 'os';
import path from 'path';
import {FileReference, MultichainProjectManifest, ProjectRootAndManifest} from '@subql/types-core';
import {BaseDataSource, FileReference, MultichainProjectManifest, ProjectRootAndManifest} from '@subql/types-core';
import {
registerDecorator,
validateSync,
Expand Down Expand Up @@ -295,3 +295,17 @@ export class FileReferenceImp<T> implements ValidatorConstraintInterface {
}

export const tsProjectYamlPath = (tsManifestEntry: string) => tsManifestEntry.replace('.ts', '.yaml');

@ValidatorConstraint({async: false})
export class IsEndBlockGreater implements ValidatorConstraintInterface {
validate(endBlock: number, args: ValidationArguments) {
const object = args.object as BaseDataSource;
return object.startBlock !== undefined && object.endBlock !== undefined
? object.endBlock >= object.startBlock
: true;
}

defaultMessage(args: ValidationArguments) {
return 'End block must be greater than or equal to start block';
}
}
24 changes: 22 additions & 2 deletions packages/common/src/project/versioned/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,19 @@

import {FileReference, ParentProject, Processor} from '@subql/types-core';
import {plainToInstance, Type} from 'class-transformer';
import {Allow, Equals, IsObject, IsOptional, IsString, ValidateNested, validateSync} from 'class-validator';
import {
Allow,
Equals,
IsInt,
IsObject,
IsOptional,
IsString,
Validate,
ValidateNested,
validateSync,
} from 'class-validator';
import yaml from 'js-yaml';
import {toJsonObject} from '../utils';
import {IsEndBlockGreater, toJsonObject} from '../utils';
import {ParentProjectModel} from './v1_0_0/models';

export abstract class ProjectManifestBaseImpl<D extends BaseDeploymentV1_0_0> {
Expand Down Expand Up @@ -78,3 +88,13 @@ export class BaseDeploymentV1_0_0 {
});
}
}

export class BaseDataSource {
@IsOptional()
@IsInt()
startBlock?: number;
@Validate(IsEndBlockGreater)
@IsOptional()
@IsInt()
endBlock?: number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS> implements IBloc
// Flush all data from cache and wait
await this.storeCacheService.flushCache(false, true);
}

if (!this.projectService.hasDataSourcesAfterHeight(height)) {
logger.info(`All data sources have been processed up to block number ${height}. Exiting gracefully...`);
await this.storeCacheService.flushCache(false, true);
process.exit(0);
}
}

/**
Expand Down
8 changes: 3 additions & 5 deletions packages/node-core/src/indexer/dictionary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ export class DictionaryService {
dataSources: BlockHeightMap<DS[]>,
buildDictionaryQueryEntries: (dataSources: DS[]) => DictionaryQueryEntry[]
): void {
this.queriesMap = dataSources.map((dataSources) => buildDictionaryQueryEntries(dataSources));
this.queriesMap = dataSources.map(buildDictionaryQueryEntries);
}

async scopedDictionaryEntries(
Expand All @@ -360,10 +360,8 @@ export class DictionaryService {
const queryDetails = this.queriesMap?.getDetails(startBlockHeight);
const queryEntry: DictionaryQueryEntry[] = queryDetails?.value ?? [];

// Update end block if query changes
if (queryDetails?.endHeight && queryDetails?.endHeight < queryEndBlock) {
queryEndBlock = queryDetails?.endHeight;
}
queryEndBlock =
queryDetails?.endHeight && queryDetails?.endHeight < queryEndBlock ? queryDetails.endHeight : queryEndBlock;

const dict = await this.getDictionary(startBlockHeight, queryEndBlock, scaledBatchSize, queryEntry);

Expand Down
61 changes: 58 additions & 3 deletions packages/node-core/src/indexer/fetch.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import {EventEmitter2} from '@nestjs/event-emitter';
import {SchedulerRegistry} from '@nestjs/schedule';
import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry, IProjectNetworkConfig} from '@subql/types-core';
import {range} from 'lodash';
import {
BlockDispatcher,
delay,
Expand Down Expand Up @@ -80,7 +81,6 @@ const mockDs: BaseDataSource = {

const projectService = {
getStartBlockFromDataSources: jest.fn(() => mockDs.startBlock),
getDataSourcesMap: jest.fn(() => new BlockHeightMap(new Map([[1, [mockDs, mockDs]]]))), // TODO
getAllDataSources: jest.fn(() => [mockDs]),
} as any as IProjectService<any>;

Expand Down Expand Up @@ -147,6 +147,10 @@ describe('Fetch Service', () => {
eventEmitter,
schedulerRegistry
);

(fetchService as any).projectService.getDataSourcesMap = jest.fn(
() => new BlockHeightMap(new Map([[1, [mockDs, mockDs]]]))
);
});

const enableDictionary = () => {
Expand Down Expand Up @@ -176,6 +180,58 @@ describe('Fetch Service', () => {
expect(preHookLoopSpy).toHaveBeenCalled();
});

it('adds bypassBlocks for empty datasources', async () => {
(fetchService as any).projectService.getDataSourcesMap = jest.fn().mockReturnValueOnce(
new BlockHeightMap(
new Map([
[
1,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
],
],
[
10,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
{startBlock: 10, endBlock: 20},
],
],
[
21,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
],
],
[
50,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
{startBlock: 50, endBlock: 200},
],
],
[
101,
[
{startBlock: 1, endBlock: 300},
{startBlock: 50, endBlock: 200},
],
],
[201, [{startBlock: 1, endBlock: 300}]],
[301, []],
[500, [{startBlock: 500}]],
])
)
);

await fetchService.init(1);
expect((fetchService as any).bypassBlocks).toEqual(range(301, 500));
});

it('checks chain heads at an interval', async () => {
const finalizedSpy = jest.spyOn(fetchService, 'getFinalizedHeight');
const bestSpy = jest.spyOn(fetchService, 'getBestHeight');
Expand Down Expand Up @@ -308,8 +364,7 @@ describe('Fetch Service', () => {
});

it('skips bypassBlocks', async () => {
// This doesn't get set in init because networkConfig doesn't define it, so we can set it
(fetchService as any).bypassBlocks = [3];
(fetchService as any).networkConfig.bypassBlocks = [3];

const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks');

Expand Down
23 changes: 22 additions & 1 deletion packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,31 @@ export abstract class BaseFetchService<
);
}

private updateBypassBlocksFromDatasources(): void {
const datasources = this.projectService.getDataSourcesMap().getAll();

const heights = Array.from(datasources.keys());

for (let i = 0; i < heights.length - 1; i++) {
const currentHeight = heights[i];
const nextHeight = heights[i + 1];

const currentDS = datasources.get(currentHeight);
// If the value for the current height is an empty array, then it's a gap
if (currentDS && currentDS.length === 0) {
this.bypassBlocks.push(...range(currentHeight, nextHeight));
}
}
}

async init(startHeight: number): Promise<void> {
this.bypassBlocks = [];

if (this.networkConfig?.bypassBlocks !== undefined) {
this.bypassBlocks = transformBypassBlocks(this.networkConfig.bypassBlocks).filter((blk) => blk >= startHeight);
}

this.updateBypassBlocksFromDatasources();
const interval = await this.getChainInterval();

await Promise.all([this.getFinalizedBlockHead(), this.getBestBlockHead()]);
Expand Down Expand Up @@ -243,7 +264,7 @@ export abstract class BaseFetchService<
startBlockHeight >= this.dictionaryService.startHeight &&
startBlockHeight < this.latestFinalizedHeight
) {
/* queryEndBlock needs to be limited by the latest height.
/* queryEndBlock needs to be limited by the latest height or the maximum value of endBlock in datasources.
* Dictionaries could be in the future depending on if they index unfinalized blocks or the node is using an RPC endpoint that is behind.
*/
const queryEndBlock = Math.min(startBlockHeight + DICTIONARY_MAX_QUERY_SIZE, this.latestFinalizedHeight);
Expand Down
16 changes: 13 additions & 3 deletions packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ export abstract class BaseIndexerManager<
private filterDataSources(nextProcessingHeight: number, dataSources: DS[]): DS[] {
let filteredDs: DS[];

filteredDs = dataSources.filter((ds) => ds.startBlock !== undefined && ds.startBlock <= nextProcessingHeight);
filteredDs = dataSources.filter(
(ds) =>
ds.startBlock !== undefined &&
ds.startBlock <= nextProcessingHeight &&
(ds.endBlock ?? Number.MAX_SAFE_INTEGER) >= nextProcessingHeight
);

// perform filter for custom ds
filteredDs = filteredDs.filter((ds) => {
Expand All @@ -146,8 +151,13 @@ export abstract class BaseIndexerManager<
private assertDataSources(ds: DS[], blockHeight: number) {
if (!ds.length) {
logger.error(
`Your start block of all the datasources is greater than the current indexed block height in your database. Either change your startBlock (project.yaml) to <= ${blockHeight}
or delete your database and start again from the currently specified startBlock`
`Issue detected with data sources: \n
Either all data sources have a 'startBlock' greater than the current indexed block height (${blockHeight}),
or they have an 'endBlock' less than the current block. \n
Solution options: \n
1. Adjust 'startBlock' in project.yaml to be less than or equal to ${blockHeight},
and 'endBlock' to be greater than or equal to ${blockHeight}. \n
2. Delete your database and start again with the currently specified 'startBlock' and 'endBlock'.`
);
process.exit(1);
}
Expand Down
Loading

0 comments on commit a25ec50

Please sign in to comment.