Skip to content

Commit

Permalink
Merge pull request #40 from catalystdao/jsanmi/overhaul-logs
Browse files Browse the repository at this point in the history
[chore]: Overhaul logs
  • Loading branch information
jsanmigimeno authored Jun 28, 2024
2 parents bb7ba67 + 726c15a commit 905378e
Show file tree
Hide file tree
Showing 17 changed files with 399 additions and 157 deletions.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
"pino": "^8.15.1",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.1",
"viem": "^2.15.1"
"viem": "^2.15.1",
"winston": "^3.13.0",
"winston-transport": "^4.7.0"
},
"devDependencies": {
"@nestjs/cli": "^10.0.0",
Expand Down
14 changes: 10 additions & 4 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 62 additions & 25 deletions src/collector/layer-zero/layer-zero.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import { LayerZeroEnpointV2__factory } from 'src/contracts';
import { Resolver, loadResolver } from 'src/resolvers/resolver';
import { ParsePayload } from 'src/payload/decode.payload';
import { LayerZeroEnpointV2Interface, PacketSentEvent } from 'src/contracts/LayerZeroEnpointV2';
import { STATUS_LOG_INTERVAL } from 'src/logger/logger.service';

interface LayerZeroWorkerDataWithMapping extends LayerZeroWorkerData {
layerZeroChainIdMap: Record<string, string>;
Expand All @@ -64,6 +65,8 @@ class LayerZeroWorker {
private currentStatus: MonitorStatus | null = null;
private monitor: MonitorInterface;

private fromBlock: number = 0;

constructor() {
this.config = workerData as LayerZeroWorkerDataWithMapping;
this.chainId = this.config.chainId;
Expand Down Expand Up @@ -91,6 +94,8 @@ class LayerZeroWorker {
this.receiveULN302Interface.getEvent('PayloadVerified').topicHash,]
];
this.monitor = this.startListeningToMonitor(this.config.monitorPort);

this.initiateIntervalStatusLog();
}

// Initialization helpers
Expand Down Expand Up @@ -163,45 +168,30 @@ class LayerZeroWorker {
},
`LayerZero collector worker started.`,
);
let fromBlock = null;
while (fromBlock == null) {
if (this.currentStatus != null) {
if (this.config.startingBlock != null) {
if (this.config.startingBlock < 0) {
fromBlock = this.currentStatus.blockNumber + this.config.startingBlock;
if (fromBlock < 0) {
throw new Error(`Invalid 'startingBlock': negative offset is larger than the current block number.`)
}
} else {
fromBlock = this.config.startingBlock;
}
} else {
fromBlock = this.currentStatus.blockNumber;
}
}
await wait(this.config.processingInterval);
}

this.fromBlock = await this.getStartingBlock();
const stopBlock = this.config.stoppingBlock ?? Infinity;

while (true) {
try {
let toBlock = this.currentStatus?.blockNumber;
if (!toBlock || fromBlock > toBlock) {
if (!toBlock || this.fromBlock > toBlock) {
await wait(this.config.processingInterval);
continue;
}
if (toBlock > stopBlock) {
toBlock = stopBlock;
}
const blocksToProcess = toBlock - fromBlock;
const blocksToProcess = toBlock - this.fromBlock;
if (
this.config.maxBlocks != null &&
blocksToProcess > this.config.maxBlocks
) {
toBlock = fromBlock + this.config.maxBlocks;
toBlock = this.fromBlock + this.config.maxBlocks;
}
await this.queryAndProcessEvents(fromBlock, toBlock);
this.logger.info(
{ fromBlock, toBlock },
await this.queryAndProcessEvents(this.fromBlock, toBlock);
this.logger.debug(
{ fromBlock: this.fromBlock, toBlock },
`Scanning LayerZero events.`,
);
if (toBlock >= stopBlock) {
Expand All @@ -211,7 +201,7 @@ class LayerZeroWorker {
);
break;
}
fromBlock = toBlock + 1;
this.fromBlock = toBlock + 1;
} catch (error) {
this.logger.error(error, `Error on Layer Zero worker: processing blocks.`);
await wait(this.config.retryInterval);
Expand All @@ -222,6 +212,35 @@ class LayerZeroWorker {
await this.store.quit();
}

private async getStartingBlock(): Promise<number> {
let fromBlock: number | null = null;
while (fromBlock == null) {

// Do not initialize 'fromBlock' whilst 'currentStatus' is null, even if
// 'startingBlock' is specified.
if (this.currentStatus == null) {
await wait(this.config.processingInterval);
continue;
}

if (this.config.startingBlock == null) {
fromBlock = this.currentStatus.blockNumber;
break;
}

if (this.config.startingBlock < 0) {
fromBlock = this.currentStatus.blockNumber + this.config.startingBlock;
if (fromBlock < 0) {
throw new Error(`Invalid 'startingBlock': negative offset is larger than the current block number.`)
}
} else {
fromBlock = this.config.startingBlock;
}
}

return fromBlock;
}

/**
* Queries and processes events between two blocks.
*
Expand Down Expand Up @@ -542,6 +561,24 @@ class LayerZeroWorker {
const payload = `0x${guid}${message}`;
return ethers.keccak256(payload);
}



// Misc Helpers
// ********************************************************************************************

private initiateIntervalStatusLog(): void {
const logStatus = () => {
this.logger.info(
{
latestBlock: this.currentStatus?.blockNumber,
currentBlock: this.fromBlock,
},
'LayerZero collector status.'
);
};
setInterval(logStatus, STATUS_LOG_INTERVAL);
}
}

/**
Expand Down
88 changes: 60 additions & 28 deletions src/collector/mock/mock.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { MockWorkerData } from './mock';
import { IncentivizedMockEscrowInterface, MessageEvent } from 'src/contracts/IncentivizedMockEscrow';
import { MonitorInterface, MonitorStatus } from 'src/monitor/monitor.interface';
import { Resolver, loadResolver } from 'src/resolvers/resolver';
import { STATUS_LOG_INTERVAL } from 'src/logger/logger.service';


/**
Expand Down Expand Up @@ -53,6 +54,8 @@ class MockCollectorWorker {
private currentStatus: MonitorStatus | null = null;
private monitor: MonitorInterface;

private fromBlock: number = 0;


constructor() {
this.config = workerData as MockWorkerData;
Expand Down Expand Up @@ -87,6 +90,8 @@ class MockCollectorWorker {

// Start listening to the monitor service (get the latest block data).
this.monitor = this.startListeningToMonitor(this.config.monitorPort);

this.initiateIntervalStatusLog();
}


Expand Down Expand Up @@ -142,34 +147,14 @@ class MockCollectorWorker {
);

// Get the effective starting and stopping blocks.
let fromBlock = null;
while (fromBlock == null) {
// Do not initialize 'fromBlock' whilst 'currentStatus' is null, even if
// 'startingBlock' is specified.
if (this.currentStatus != null) {
if (this.config.startingBlock != null) {
if (this.config.startingBlock < 0) {
fromBlock = this.currentStatus.blockNumber + this.config.startingBlock;
if (fromBlock < 0) {
throw new Error(`Invalid 'startingBlock': negative offset is larger than the current block number.`)
}
} else {
fromBlock = this.config.startingBlock;
}
} else {
fromBlock = this.currentStatus.blockNumber;
}
}

await wait(this.config.processingInterval);
}
this.fromBlock = await this.getStartingBlock();

const stopBlock = this.config.stoppingBlock ?? Infinity;

while (true) {
try {
let toBlock = this.currentStatus?.blockNumber;
if (!toBlock || fromBlock > toBlock) {
if (!toBlock || this.fromBlock > toBlock) {
await wait(this.config.processingInterval);
continue;
}
Expand All @@ -180,20 +165,20 @@ class MockCollectorWorker {
}

// Do not process more than 'maxBlocks' within a single rpc call.
const blocksToProcess = toBlock - fromBlock;
const blocksToProcess = toBlock - this.fromBlock;
if (this.config.maxBlocks != null && blocksToProcess > this.config.maxBlocks) {
toBlock = fromBlock + this.config.maxBlocks;
toBlock = this.fromBlock + this.config.maxBlocks;
}

this.logger.info(
this.logger.debug(
{
fromBlock,
fromBlock: this.fromBlock,
toBlock,
},
`Scanning mock messages.`,
);

await this.queryAndProcessEvents(fromBlock, toBlock);
await this.queryAndProcessEvents(this.fromBlock, toBlock);

if (toBlock >= stopBlock) {
this.logger.info(
Expand All @@ -203,7 +188,7 @@ class MockCollectorWorker {
break;
}

fromBlock = toBlock + 1;
this.fromBlock = toBlock + 1;
}
catch (error) {
this.logger.error(error, `Error on mock.worker`);
Expand All @@ -218,6 +203,35 @@ class MockCollectorWorker {
await this.store.quit();
}

private async getStartingBlock(): Promise<number> {
let fromBlock: number | null = null;
while (fromBlock == null) {

// Do not initialize 'fromBlock' whilst 'currentStatus' is null, even if
// 'startingBlock' is specified.
if (this.currentStatus == null) {
await wait(this.config.processingInterval);
continue;
}

if (this.config.startingBlock == null) {
fromBlock = this.currentStatus.blockNumber;
break;
}

if (this.config.startingBlock < 0) {
fromBlock = this.currentStatus.blockNumber + this.config.startingBlock;
if (fromBlock < 0) {
throw new Error(`Invalid 'startingBlock': negative offset is larger than the current block number.`)
}
} else {
fromBlock = this.config.startingBlock;
}
}

return fromBlock;
}

private async queryAndProcessEvents(
fromBlock: number,
toBlock: number
Expand Down Expand Up @@ -357,6 +371,24 @@ class MockCollectorWorker {
await this.store.submitProof(destinationChainId, ambPayload);
}



// Misc Helpers
// ********************************************************************************************

private initiateIntervalStatusLog(): void {
const logStatus = () => {
this.logger.info(
{
latestBlock: this.currentStatus?.blockNumber,
currentBlock: this.fromBlock,
},
'Mock collector status.'
);
};
setInterval(logStatus, STATUS_LOG_INTERVAL);
}

}

void new MockCollectorWorker().run();
Loading

0 comments on commit 905378e

Please sign in to comment.