Skip to content

Commit

Permalink
fix some bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yoozo committed Feb 14, 2025
1 parent 4190451 commit 56ea202
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 27 deletions.
1 change: 1 addition & 0 deletions packages/node-core/src/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export interface IBlockchainService<
// Unfinalized blocks
getHeaderForHash(hash: string): Promise<Header>;
getHeaderForHeight(height: number): Promise<Header>;
getRequiredHeaderForHeight(height: number): Promise<Required<Header>>;

// Dynamic Ds sevice
/**
Expand Down
7 changes: 3 additions & 4 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
} from '@subql/x-sequelize';
import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model';
import {MultiChainRewindEvent} from '../events';
import {RewindLockKey} from '../indexer';
import {EnumType} from '../utils';
import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil';
// eslint-disable-next-line @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -307,8 +306,8 @@ export function createRewindTrigger(schema: string): string {
AFTER INSERT OR UPDATE OR DELETE
ON "${schema}"."_global"
FOR EACH ROW
WHEN ( new.key = '${RewindLockKey}')
EXECUTE FUNCTION "${schema}".rewind_notification();`;
EXECUTE FUNCTION "${schema}".rewind_notification();
`;
}

export function createRewindTriggerFunction(schema: string): string {
Expand All @@ -323,7 +322,7 @@ export function createRewindTriggerFunction(schema: string): string {
END IF;
-- During a rollback, there is a chain that needs to be rolled back to an earlier height.
IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::int < (OLD.value ->> 'timestamp')::int THEN
IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::BIGINT < (OLD.value ->> 'timestamp')::BIGINT THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindTimestampDecreased}');
END IF;
Expand Down
3 changes: 1 addition & 2 deletions packages/node-core/src/indexer/entities/GlobalData.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ export type RewindTimestampKey = `${typeof RewindTimestampKeyPrefix}_${string}`;
export type RewindLockInfo = {
/** Timestamp to rewind to. */
timestamp: number;
/** Number of additional chains to rewind. */
rewindNum: number;
chainNum: number;
};
export interface GlobalDataKeys {
rewindLock: RewindLockInfo;
Expand Down
60 changes: 44 additions & 16 deletions packages/node-core/src/indexer/multiChainRewind.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import dayjs from 'dayjs';
import {Pool, PoolClient} from 'pg';
import {IBlockchainService} from '../blockchain.service';
import {NodeConfig} from '../configure';
import {getPgPoolConfig} from '../db';
import {createRewindTrigger, createRewindTriggerFunction, getPgPoolConfig, getTriggers} from '../db';
import {MultiChainRewindEvent, MultiChainRewindPayload} from '../events';
import {getLogger} from '../logger';
import {
Expand Down Expand Up @@ -62,8 +62,9 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
private _status: RewindStatus = RewindStatus.Normal;
private _chainId?: string;
private _dbSchema?: string;
waitRewindHeader?: Header;
private _rewindTriggerName?: string;
private pgListener?: PoolClient;
waitRewindHeader?: Required<Header>;
constructor(
private nodeConfig: NodeConfig,
private eventEmitter: EventEmitter2,
Expand All @@ -89,6 +90,14 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
private set dbSchema(dbSchema: string) {
this._dbSchema = dbSchema;
}
private set rewindTriggerName(rewindTriggerName: string) {
this._rewindTriggerName = rewindTriggerName;
}

get rewindTriggerName(): string {
assert(this._rewindTriggerName, 'rewindTriggerName is not set');
return this._rewindTriggerName;
}

private set status(status: RewindStatus) {
this._status = status;
Expand All @@ -105,8 +114,16 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
async init(chainId: string, dbSchema: string, reindex: (targetHeader: Header) => Promise<void>) {
this.chainId = chainId;
this.dbSchema = dbSchema;
this.rewindTriggerName = hashName(this.dbSchema, 'rewind_trigger', '_global');

if (this.storeService.historical === 'timestamp') {
await this.sequelize.query(`${createRewindTriggerFunction(this.dbSchema)}`);

const rewindTriggers = await getTriggers(this.sequelize, this.rewindTriggerName);
if (rewindTriggers.length === 0) {
await this.sequelize.query(`${createRewindTrigger(this.dbSchema)}`);
}

// Register a listener and create a schema notification sending function.
await this.registerPgListener();

Expand Down Expand Up @@ -155,7 +172,8 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
});
});

await this.pgListener.query(`LISTEN "${hashName(this.dbSchema, 'rewind_trigger', '_global')}"`);
await this.pgListener.query(`LISTEN "${this.rewindTriggerName}"`);
logger.info(`Register rewind listener success, chainId: ${this.chainId}`);

// Check whether the current state is in rollback.
const {rewindLock, rewindTimestamp} = await this.getGlobalRewindStatus();
Expand Down Expand Up @@ -222,7 +240,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
SET "key" = EXCLUDED."key",
"value" = EXCLUDED."value",
"updatedAt" = EXCLUDED."updatedAt"
WHERE "key" = '${RewindLockKey}' AND ("value"->>'timestamp')::int > ${rewindTimestamp}`,
WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT > ${rewindTimestamp}`,
{
type: QueryTypes.INSERT,
transaction: tx,
Expand Down Expand Up @@ -262,10 +280,10 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
SET value = jsonb_set(
value,
'{chainNum}',
to_jsonb(COALESCE((value ->> 'chainNum')::int, 0) - 1),
to_jsonb(COALESCE(("${globalTable}"."value" ->> 'chainNum')::BIGINT, 0) - 1),
false
)
WHERE "key" = '${RewindLockKey}' AND ("value"->>'timestamp')::int = ${rewindTimestamp}
WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT = ${rewindTimestamp}
RETURNING value`,
{
type: QueryTypes.SELECT,
Expand All @@ -280,7 +298,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
);
return 0;
}
const rewindNum = results[0].value.rewindNum;
const chainNum = results[0].value.chainNum;

const rewindTimestampKey = generateRewindTimestampKey(this.chainId);
const [affectedCount] = await this.storeService.globalDataRepo.update(
Expand All @@ -298,32 +316,32 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
`not found rewind timestamp key in global data, chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}`
);

if (rewindNum === 0) {
if (chainNum === 0) {
await this.storeService.globalDataRepo.destroy({where: {key: RewindLockKey}, transaction: tx});
}

// The current chain has completed the rewind, and we still need to wait for other chains to finish.
// When fully synchronized, set the status back to normal by pgListener.
this.status = RewindStatus.WaitOtherChain;
logger.info(`Rewind success chainId: ${JSON.stringify({rewindNum, chainId: this.chainId, rewindTimestamp})}`);
return rewindNum;
logger.info(`Rewind success chainId: ${JSON.stringify({chainNum, chainId: this.chainId, rewindTimestamp})}`);
return chainNum;
}

/**
* Get the block header closest to the given timestamp
* @param timestamp To find the block closest to a given timestamp
* @returns undefined if the timestamp is less than the first block timestamp
* @returns
*/
async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise<Header> {
async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise<Required<Header>> {
assert(timestamp, 'getHeaderByBinarySearch `timestamp` is required');

let left = 0;
let {height: right} = await this.storeService.getLastProcessedBlock();

let searchNum = 0;
while (left < right) {
searchNum++;
const mid = Math.floor((left + right) / 2);
const header = await this.blockchainService.getHeaderForHeight(mid);
assert(header.timestamp, 'getHeader return `timestamp` is undfined');
const header = await this.blockchainService.getRequiredHeaderForHeight(mid);

if (header.timestamp === timestamp) {
return header;
Expand All @@ -334,6 +352,16 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl
}
}

return left ? this.blockchainService.getHeaderForHeight(left) : ({blockHeight: 0} as Header);
const targetHeader = left
? await this.blockchainService.getRequiredHeaderForHeight(left)
: {
blockHash: '',
blockHeight: 0,
parentHash: '',
timestamp,
};
logger.info(`Binary search times: ${searchNum}, target Header: ${JSON.stringify(targetHeader)}`);

return targetHeader;
}
}
10 changes: 5 additions & 5 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,11 @@ export class ProjectService<
void this.poiSyncService.syncPoi(undefined);
}

const reindexMultiChain = await this.initMultiChainRewindService();

const reindexedUpgrade = await this.initUpgradeService(this.startHeight);
// Unfinalized is dependent on POI in some cases, it needs to be init after POI is init
const reindexedUnfinalized = await this.initUnfinalizedInternal();

if (reindexMultiChain !== undefined) {
this._startHeight = reindexMultiChain.blockHeight;
}
const reindexMultiChain = await this.initMultiChainRewindService();

if (reindexedUnfinalized !== undefined) {
this._startHeight = reindexedUnfinalized.blockHeight;
Expand All @@ -156,6 +152,10 @@ export class ProjectService<
this._startHeight = reindexedUpgrade;
}

if (reindexMultiChain !== undefined) {
this._startHeight = reindexMultiChain.blockHeight;
}

// Flush any pending operations to set up DB
await cacheProviderFlushData(this.storeService.modelProvider, true);
} else {
Expand Down
6 changes: 6 additions & 0 deletions packages/node/src/blockchain.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,10 @@ describe('BlockchainService', () => {
const interval = await blockchainService.getChainInterval();
expect(interval).toEqual(5000);
});

it('can get the chain create time', async () => {
const requiredHeader =
await blockchainService.getRequiredHeaderForHeight(24723095);
expect(requiredHeader.timestamp.getTime()).toEqual(1739501268001);
});
});
37 changes: 37 additions & 0 deletions packages/node/src/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ export class BlockchainService
return substrateHeaderToHeader(finalizedHeader);
}

// async test(blockHeight: number): Promise<any> {
// // 连接到 Polkadot 节点
// const wsProvider = new WsProvider('wss://rpc.polkadot.io');
// const api = await ApiPromise.create({ provider: wsProvider });

// // 获取区块
// const blockHash = await api.rpc.chain.getBlockHash(blockHeight);

// // 获取区块的时间戳
// const block = await api.rpc.chain.getBlock(blockHash);
// const timestamp = await api.at(blockHash);

// console.log(`Block #${blockHeight} timestamp: ${timestamp.toString()}`);
// api.disconnect();
// return;
// }

async getBestHeight(): Promise<number> {
const bestHeader = await this.apiService.unsafeApi.rpc.chain.getHeader();
return bestHeader.number.toNumber();
Expand Down Expand Up @@ -160,6 +177,26 @@ export class BlockchainService
return this.getHeaderForHash(hash.toHex());
}

@mainThreadOnly()
async getRequiredHeaderForHeight(height: number): Promise<Required<Header>> {
const blockHeader = await this.getHeaderForHeight(height);

let timestamp: Date | undefined = blockHeader.timestamp;

if (!timestamp) {
const blockTimestamp = await (
await this.apiService.unsafeApi.at(blockHeader.blockHash)
).query.timestamp.now();

timestamp = new Date(blockTimestamp.toNumber());
}

return {
...blockHeader,
timestamp,
};
}

// eslint-disable-next-line @typescript-eslint/require-await
async updateDynamicDs(
params: DatasourceParams,
Expand Down

0 comments on commit 56ea202

Please sign in to comment.