diff --git a/packages/node-core/src/blockchain.service.ts b/packages/node-core/src/blockchain.service.ts index 419e431801..23f6b9614a 100644 --- a/packages/node-core/src/blockchain.service.ts +++ b/packages/node-core/src/blockchain.service.ts @@ -58,6 +58,7 @@ export interface IBlockchainService< // Unfinalized blocks getHeaderForHash(hash: string): Promise
; getHeaderForHeight(height: number): Promise
; + getRequiredHeaderForHeight(height: number): Promise>; // Dynamic Ds sevice /** diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts index b40ded6a2a..6e93bb1de0 100644 --- a/packages/node-core/src/db/sync-helper.ts +++ b/packages/node-core/src/db/sync-helper.ts @@ -17,7 +17,6 @@ import { } from '@subql/x-sequelize'; import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model'; import {MultiChainRewindEvent} from '../events'; -import {RewindLockKey} from '../indexer'; import {EnumType} from '../utils'; import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -307,8 +306,8 @@ export function createRewindTrigger(schema: string): string { AFTER INSERT OR UPDATE OR DELETE ON "${schema}"."_global" FOR EACH ROW - WHEN ( new.key = '${RewindLockKey}') - EXECUTE FUNCTION "${schema}".rewind_notification();`; + EXECUTE FUNCTION "${schema}".rewind_notification(); + `; } export function createRewindTriggerFunction(schema: string): string { @@ -323,7 +322,7 @@ export function createRewindTriggerFunction(schema: string): string { END IF; -- During a rollback, there is a chain that needs to be rolled back to an earlier height. - IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::int < (OLD.value ->> 'timestamp')::int THEN + IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::BIGINT < (OLD.value ->> 'timestamp')::BIGINT THEN PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindTimestampDecreased}'); END IF; diff --git a/packages/node-core/src/indexer/entities/GlobalData.entity.ts b/packages/node-core/src/indexer/entities/GlobalData.entity.ts index c7c7456de4..1288751ae5 100644 --- a/packages/node-core/src/indexer/entities/GlobalData.entity.ts +++ b/packages/node-core/src/indexer/entities/GlobalData.entity.ts @@ -11,8 +11,7 @@ export type RewindTimestampKey = `${typeof RewindTimestampKeyPrefix}_${string}`; export type RewindLockInfo = { /** Timestamp to rewind to. */ timestamp: number; - /** Number of additional chains to rewind. */ - rewindNum: number; + chainNum: number; }; export interface GlobalDataKeys { rewindLock: RewindLockInfo; diff --git a/packages/node-core/src/indexer/multiChainRewind.service.ts b/packages/node-core/src/indexer/multiChainRewind.service.ts index 085b5aeda7..e918b49028 100644 --- a/packages/node-core/src/indexer/multiChainRewind.service.ts +++ b/packages/node-core/src/indexer/multiChainRewind.service.ts @@ -10,7 +10,7 @@ import dayjs from 'dayjs'; import {Pool, PoolClient} from 'pg'; import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; -import {getPgPoolConfig} from '../db'; +import {createRewindTrigger, createRewindTriggerFunction, getPgPoolConfig, getTriggers} from '../db'; import {MultiChainRewindEvent, MultiChainRewindPayload} from '../events'; import {getLogger} from '../logger'; import { @@ -62,8 +62,9 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl private _status: RewindStatus = RewindStatus.Normal; private _chainId?: string; private _dbSchema?: string; - waitRewindHeader?: Header; + private _rewindTriggerName?: string; private pgListener?: PoolClient; + waitRewindHeader?: Required
; constructor( private nodeConfig: NodeConfig, private eventEmitter: EventEmitter2, @@ -89,6 +90,14 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl private set dbSchema(dbSchema: string) { this._dbSchema = dbSchema; } + private set rewindTriggerName(rewindTriggerName: string) { + this._rewindTriggerName = rewindTriggerName; + } + + get rewindTriggerName(): string { + assert(this._rewindTriggerName, 'rewindTriggerName is not set'); + return this._rewindTriggerName; + } private set status(status: RewindStatus) { this._status = status; @@ -105,8 +114,16 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl async init(chainId: string, dbSchema: string, reindex: (targetHeader: Header) => Promise) { this.chainId = chainId; this.dbSchema = dbSchema; + this.rewindTriggerName = hashName(this.dbSchema, 'rewind_trigger', '_global'); if (this.storeService.historical === 'timestamp') { + await this.sequelize.query(`${createRewindTriggerFunction(this.dbSchema)}`); + + const rewindTriggers = await getTriggers(this.sequelize, this.rewindTriggerName); + if (rewindTriggers.length === 0) { + await this.sequelize.query(`${createRewindTrigger(this.dbSchema)}`); + } + // Register a listener and create a schema notification sending function. await this.registerPgListener(); @@ -155,7 +172,8 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl }); }); - await this.pgListener.query(`LISTEN "${hashName(this.dbSchema, 'rewind_trigger', '_global')}"`); + await this.pgListener.query(`LISTEN "${this.rewindTriggerName}"`); + logger.info(`Register rewind listener success, chainId: ${this.chainId}`); // Check whether the current state is in rollback. const {rewindLock, rewindTimestamp} = await this.getGlobalRewindStatus(); @@ -222,7 +240,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl SET "key" = EXCLUDED."key", "value" = EXCLUDED."value", "updatedAt" = EXCLUDED."updatedAt" - WHERE "key" = '${RewindLockKey}' AND ("value"->>'timestamp')::int > ${rewindTimestamp}`, + WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT > ${rewindTimestamp}`, { type: QueryTypes.INSERT, transaction: tx, @@ -262,10 +280,10 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl SET value = jsonb_set( value, '{chainNum}', - to_jsonb(COALESCE((value ->> 'chainNum')::int, 0) - 1), + to_jsonb(COALESCE(("${globalTable}"."value" ->> 'chainNum')::BIGINT, 0) - 1), false ) - WHERE "key" = '${RewindLockKey}' AND ("value"->>'timestamp')::int = ${rewindTimestamp} + WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT = ${rewindTimestamp} RETURNING value`, { type: QueryTypes.SELECT, @@ -280,7 +298,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl ); return 0; } - const rewindNum = results[0].value.rewindNum; + const chainNum = results[0].value.chainNum; const rewindTimestampKey = generateRewindTimestampKey(this.chainId); const [affectedCount] = await this.storeService.globalDataRepo.update( @@ -298,32 +316,32 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl `not found rewind timestamp key in global data, chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}` ); - if (rewindNum === 0) { + if (chainNum === 0) { await this.storeService.globalDataRepo.destroy({where: {key: RewindLockKey}, transaction: tx}); } // The current chain has completed the rewind, and we still need to wait for other chains to finish. // When fully synchronized, set the status back to normal by pgListener. this.status = RewindStatus.WaitOtherChain; - logger.info(`Rewind success chainId: ${JSON.stringify({rewindNum, chainId: this.chainId, rewindTimestamp})}`); - return rewindNum; + logger.info(`Rewind success chainId: ${JSON.stringify({chainNum, chainId: this.chainId, rewindTimestamp})}`); + return chainNum; } /** * Get the block header closest to the given timestamp * @param timestamp To find the block closest to a given timestamp - * @returns undefined if the timestamp is less than the first block timestamp + * @returns */ - async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise
{ + async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise> { assert(timestamp, 'getHeaderByBinarySearch `timestamp` is required'); let left = 0; let {height: right} = await this.storeService.getLastProcessedBlock(); - + let searchNum = 0; while (left < right) { + searchNum++; const mid = Math.floor((left + right) / 2); - const header = await this.blockchainService.getHeaderForHeight(mid); - assert(header.timestamp, 'getHeader return `timestamp` is undfined'); + const header = await this.blockchainService.getRequiredHeaderForHeight(mid); if (header.timestamp === timestamp) { return header; @@ -334,6 +352,16 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl } } - return left ? this.blockchainService.getHeaderForHeight(left) : ({blockHeight: 0} as Header); + const targetHeader = left + ? await this.blockchainService.getRequiredHeaderForHeight(left) + : { + blockHash: '', + blockHeight: 0, + parentHash: '', + timestamp, + }; + logger.info(`Binary search times: ${searchNum}, target Header: ${JSON.stringify(targetHeader)}`); + + return targetHeader; } } diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index 8867f5c7f0..9be824f0e1 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -138,15 +138,11 @@ export class ProjectService< void this.poiSyncService.syncPoi(undefined); } - const reindexMultiChain = await this.initMultiChainRewindService(); - const reindexedUpgrade = await this.initUpgradeService(this.startHeight); // Unfinalized is dependent on POI in some cases, it needs to be init after POI is init const reindexedUnfinalized = await this.initUnfinalizedInternal(); - if (reindexMultiChain !== undefined) { - this._startHeight = reindexMultiChain.blockHeight; - } + const reindexMultiChain = await this.initMultiChainRewindService(); if (reindexedUnfinalized !== undefined) { this._startHeight = reindexedUnfinalized.blockHeight; @@ -156,6 +152,10 @@ export class ProjectService< this._startHeight = reindexedUpgrade; } + if (reindexMultiChain !== undefined) { + this._startHeight = reindexMultiChain.blockHeight; + } + // Flush any pending operations to set up DB await cacheProviderFlushData(this.storeService.modelProvider, true); } else { diff --git a/packages/node/src/blockchain.service.spec.ts b/packages/node/src/blockchain.service.spec.ts index 9e8652d900..57589b9ba6 100644 --- a/packages/node/src/blockchain.service.spec.ts +++ b/packages/node/src/blockchain.service.spec.ts @@ -55,4 +55,10 @@ describe('BlockchainService', () => { const interval = await blockchainService.getChainInterval(); expect(interval).toEqual(5000); }); + + it('can get the chain create time', async () => { + const requiredHeader = + await blockchainService.getRequiredHeaderForHeight(24723095); + expect(requiredHeader.timestamp.getTime()).toEqual(1739501268001); + }); }); diff --git a/packages/node/src/blockchain.service.ts b/packages/node/src/blockchain.service.ts index 374bdac380..405133c491 100644 --- a/packages/node/src/blockchain.service.ts +++ b/packages/node/src/blockchain.service.ts @@ -132,6 +132,23 @@ export class BlockchainService return substrateHeaderToHeader(finalizedHeader); } + // async test(blockHeight: number): Promise { + // // 连接到 Polkadot 节点 + // const wsProvider = new WsProvider('wss://rpc.polkadot.io'); + // const api = await ApiPromise.create({ provider: wsProvider }); + + // // 获取区块 + // const blockHash = await api.rpc.chain.getBlockHash(blockHeight); + + // // 获取区块的时间戳 + // const block = await api.rpc.chain.getBlock(blockHash); + // const timestamp = await api.at(blockHash); + + // console.log(`Block #${blockHeight} timestamp: ${timestamp.toString()}`); + // api.disconnect(); + // return; + // } + async getBestHeight(): Promise { const bestHeader = await this.apiService.unsafeApi.rpc.chain.getHeader(); return bestHeader.number.toNumber(); @@ -160,6 +177,26 @@ export class BlockchainService return this.getHeaderForHash(hash.toHex()); } + @mainThreadOnly() + async getRequiredHeaderForHeight(height: number): Promise> { + const blockHeader = await this.getHeaderForHeight(height); + + let timestamp: Date | undefined = blockHeader.timestamp; + + if (!timestamp) { + const blockTimestamp = await ( + await this.apiService.unsafeApi.at(blockHeader.blockHash) + ).query.timestamp.now(); + + timestamp = new Date(blockTimestamp.toNumber()); + } + + return { + ...blockHeader, + timestamp, + }; + } + // eslint-disable-next-line @typescript-eslint/require-await async updateDynamicDs( params: DatasourceParams,