Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi chain rewind service #2673

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/node-core/src/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export interface IBlockchainService<
// Unfinalized blocks
getHeaderForHash(hash: string): Promise<Header>;
getHeaderForHeight(height: number): Promise<Header>;
getRequiredHeaderForHeight(height: number): Promise<Required<Header>>;

// Dynamic Ds sevice
/**
Expand Down
14 changes: 14 additions & 0 deletions packages/node-core/src/db/db.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import {DynamicModule, Global} from '@nestjs/common';
import {Sequelize, Options as SequelizeOption} from '@subql/x-sequelize';
import {PoolConfig} from 'pg';
import {NodeConfig} from '../configure/NodeConfig';
import {getLogger} from '../logger';
import {exitWithError} from '../process';
Expand Down Expand Up @@ -90,6 +91,19 @@ export async function establishNewSequelize(nodeConfig: NodeConfig): Promise<Seq
return sequelizeFactory(buildSequelizeOptions(nodeConfig, DEFAULT_DB_OPTION))();
}

export function getPgPoolConfig(nodeConfig: NodeConfig): PoolConfig {
const sequelizeOptions = buildSequelizeOptions(nodeConfig, DEFAULT_DB_OPTION);
return {
user: sequelizeOptions.username,
password: sequelizeOptions.password,
host: sequelizeOptions.host,
port: sequelizeOptions.port,
database: sequelizeOptions.database,
max: 1,
ssl: sequelizeOptions.ssl,
};
}

@Global()
export class DbModule {
static forRootWithConfig(nodeConfig: NodeConfig, option: DbOption = DEFAULT_DB_OPTION): DynamicModule {
Expand Down
4 changes: 4 additions & 0 deletions packages/node-core/src/db/sync-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,8 @@ describe('sync helper test', () => {
);
}, 10_000);
});

describe('rewind lock', () => {
// TODO
});
});
39 changes: 39 additions & 0 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
Utils,
} from '@subql/x-sequelize';
import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model';
import {MultiChainRewindEvent} from '../events';
import {EnumType} from '../utils';
import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil';
// eslint-disable-next-line @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -297,6 +298,44 @@ export function createSchemaTriggerFunction(schema: string): string {
$$ LANGUAGE plpgsql;`;
}

export function createRewindTrigger(schema: string): string {
const triggerName = hashName(schema, 'rewind_trigger', '_global');

return `
CREATE TRIGGER "${triggerName}"
AFTER INSERT OR UPDATE OR DELETE
ON "${schema}"."_global"
FOR EACH ROW
EXECUTE FUNCTION "${schema}".rewind_notification();
`;
}

export function createRewindTriggerFunction(schema: string): string {
const triggerName = hashName(schema, 'rewind_trigger', '_global');

return `
CREATE OR REPLACE FUNCTION "${schema}".rewind_notification()
RETURNS trigger AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.Rewind}');
END IF;

-- During a rollback, there is a chain that needs to be rolled back to an earlier height.
IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::BIGINT < (OLD.value ->> 'timestamp')::BIGINT THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindTimestampDecreased}');
END IF;

IF TG_OP = 'DELETE' THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindComplete}');
END IF;

RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`;
}

export function getExistedIndexesQuery(schema: string): string {
return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`;
}
Expand Down
10 changes: 10 additions & 0 deletions packages/node-core/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ export enum PoiEvent {
PoiTarget = 'poi_target',
}

export enum MultiChainRewindEvent {
Rewind = 'rewind',
RewindComplete = 'rewind_complete',
RewindTimestampDecreased = 'timestamp_decreased',
}

export interface RewindPayload {
success: boolean;
height: number;
Expand Down Expand Up @@ -61,3 +67,7 @@ export interface NetworkMetadataPayload {
specName: string;
genesisHash: string;
}

export interface MultiChainRewindPayload {
height: number;
}
52 changes: 52 additions & 0 deletions packages/node-core/src/indexer/entities/GlobalData.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {blake2AsHex} from '@subql/utils';
import {BuildOptions, DataTypes, Model, Sequelize} from '@subql/x-sequelize';

export const RewindTimestampKeyPrefix = 'rewindTimestamp';
export const RewindLockKey = 'rewindLock';
export type RewindTimestampKey = `${typeof RewindTimestampKeyPrefix}_${string}`;

export type RewindLockInfo = {
/** Timestamp to rewind to. */
timestamp: number;
chainNum: number;
};
export interface GlobalDataKeys {
rewindLock: RewindLockInfo;
[key: RewindTimestampKey]: number;
}

export interface GlobalData<k extends keyof GlobalDataKeys = keyof GlobalDataKeys> {
key: k;
value: GlobalDataKeys[k];
}

interface GlobalDataEntity extends Model<GlobalData>, GlobalData {}

export type GlobalDataRepo = typeof Model & {
new (values?: unknown, options?: BuildOptions): GlobalDataEntity;
};

export function GlobalDataFactory(sequelize: Sequelize, schema: string): GlobalDataRepo {
const tableName = '_global';

return <GlobalDataRepo>sequelize.define(
tableName,
{
key: {
type: DataTypes.STRING,
primaryKey: true,
},
value: {
type: DataTypes.JSONB,
},
},
{freezeTableName: true, schema: schema}
);
}

export function generateRewindTimestampKey(chainId: string): RewindTimestampKey {
return `${RewindTimestampKeyPrefix}_${blake2AsHex(chainId)})`.substring(0, 63) as RewindTimestampKey;
}
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

export * from './Poi.entity';
export * from './Metadata.entity';
export * from './GlobalData.entity';
25 changes: 21 additions & 4 deletions packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@

import assert from 'assert';
import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
import {SchedulerRegistry} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {range} from 'lodash';
import {IBlockchainService} from '../blockchain.service';
import {NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {EventPayload, IndexerEvent, MultiChainRewindEvent, MultiChainRewindPayload} from '../events';
import {getLogger} from '../logger';
import {delay, filterBypassBlocks, getModulos} from '../utils';
import {IBlockDispatcher} from './blockDispatcher';
import {mergeNumAndBlocksToNums} from './dictionary';
import {DictionaryService} from './dictionary/dictionary.service';
import {mergeNumAndBlocks} from './dictionary/utils';
import {IMultiChainHandler, MultiChainRewindService, RewindStatus} from './multiChainRewind.service';
import {IStoreModelProvider} from './storeModelProvider';
import {BypassBlocks, IBlock, IProjectService} from './types';
import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service';
Expand All @@ -24,7 +25,7 @@ const logger = getLogger('FetchService');

@Injectable()
export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<FB>, FB>
implements OnApplicationShutdown
implements OnApplicationShutdown, IMultiChainHandler
{
private _latestBestHeight?: number;
private _latestFinalizedHeight?: number;
Expand All @@ -39,7 +40,8 @@ export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<
private schedulerRegistry: SchedulerRegistry,
@Inject('IUnfinalizedBlocksService') private unfinalizedBlocksService: IUnfinalizedBlocksServiceUtil,
@Inject('IStoreModelProvider') private storeModelProvider: IStoreModelProvider,
@Inject('IBlockchainService') private blockchainSevice: IBlockchainService<DS>
@Inject('IBlockchainService') private blockchainSevice: IBlockchainService<DS>,
private multiChainRewindService: MultiChainRewindService
) {}

private get latestBestHeight(): number {
Expand Down Expand Up @@ -196,6 +198,14 @@ export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<
// Update the target height, this happens here to stay in sync with the rest of indexing
void this.storeModelProvider.metadata.set('targetHeight', latestHeight);

// If we're rewinding, we should wait until it's done
const multiChainStatus = this.multiChainRewindService.getStatus();
if (RewindStatus.Normal !== multiChainStatus) {
logger.info(`Wait for all chains to complete rewind, current chainId: ${this.multiChainRewindService.chainId}`);
await delay(3);
continue;
}

// This could be latestBestHeight, dictionary should never include finalized blocks
// TODO add buffer so dictionary not used when project synced
if (startBlockHeight < this.latestBestHeight - scaledBatchSize) {
Expand Down Expand Up @@ -383,4 +393,11 @@ export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<
this.updateDictionary();
this.blockDispatcher.flushQueue(blockHeight);
}

@OnEvent(MultiChainRewindEvent.Rewind)
@OnEvent(MultiChainRewindEvent.RewindTimestampDecreased)
handleMultiChainRewindEvent(payload: MultiChainRewindPayload) {
logger.info(`Received rewind event, height: ${payload.height}`);
this.resetForNewDs(payload.height);
}
}
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ export * from './indexer.manager';
export * from './ds-processor.service';
export * from './unfinalizedBlocks.service';
export * from './monitor.service';
export * from './multiChainRewind.service';
export * from './core.module';
Loading
Loading