diff --git a/src/submitter/queues/submit-queue.ts b/src/submitter/queues/submit-queue.ts index 738d1bd..70dda61 100644 --- a/src/submitter/queues/submit-queue.ts +++ b/src/submitter/queues/submit-queue.ts @@ -20,6 +20,7 @@ export class SubmitQueue extends ProcessingQueue< maxTries: number, private readonly incentivesContracts: Map, relayerAddress: string, + private readonly chainId: string, private readonly wallet: WalletInterface, private readonly logger: pino.Logger, ) { @@ -65,6 +66,7 @@ export class SubmitQueue extends ProcessingQueue< }; const txPromise = this.wallet.submitTransaction( + this.chainId, txRequest, order, ).then((transactionResult): SubmitOrderResult => { diff --git a/src/submitter/submitter.service.ts b/src/submitter/submitter.service.ts index ab8df54..4b6500b 100644 --- a/src/submitter/submitter.service.ts +++ b/src/submitter/submitter.service.ts @@ -179,7 +179,7 @@ export class SubmitterService { ), walletPublicKey: globalConfig.walletPublicKey, - walletPort: await this.walletService.attachToWallet(chainId), + walletPort: await this.walletService.attachToWallet(), loggerOptions: this.loggerService.loggerOptions, }; } diff --git a/src/submitter/submitter.worker.ts b/src/submitter/submitter.worker.ts index 3fb2ffd..ce776cf 100644 --- a/src/submitter/submitter.worker.ts +++ b/src/submitter/submitter.worker.ts @@ -108,6 +108,7 @@ class SubmitterWorker { maxTries, incentivesContracts, walletPublicKey, + chainId, wallet, logger, ); diff --git a/src/wallet/transaction-helper.ts b/src/wallet/transaction-helper.ts index 016b8b9..760e4a4 100644 --- a/src/wallet/transaction-helper.ts +++ b/src/wallet/transaction-helper.ts @@ -224,78 +224,102 @@ export class TransactionHelper { } } - getFeeDataForTransaction(priority?: boolean): GasFeeOverrides { - const queriedFeeData = this.feeData; - if (queriedFeeData == undefined) { - return {}; + getCachedFeeData(): FeeData | undefined { + return this.feeData; + } + + getAdjustedFeeData( + priority?: boolean, + ): FeeData | undefined { + const feeData = {...this.feeData}; + if (feeData == undefined) { + return undefined; } - const queriedMaxPriorityFeePerGas = queriedFeeData.maxPriorityFeePerGas; - if (queriedMaxPriorityFeePerGas != null) { - // Set fee data for an EIP 1559 transactions - let maxFeePerGas = this.maxFeePerGas; + // Override 'maxFeePerGas' if it is specified on the config. + if (this.maxFeePerGas) { + feeData.maxFeePerGas = this.maxFeePerGas; + } + // Adjust the 'maxPriorityFeePerGas' if present. + if (feeData.maxPriorityFeePerGas != undefined) { // Adjust the 'maxPriorityFeePerGas' by the adjustment factor - let maxPriorityFeePerGas; if (this.maxPriorityFeeAdjustmentFactor != undefined) { - maxPriorityFeePerGas = queriedMaxPriorityFeePerGas + feeData.maxPriorityFeePerGas = feeData.maxPriorityFeePerGas * this.maxPriorityFeeAdjustmentFactor / DECIMAL_BASE_BIG_INT; } // Apply the max allowed 'maxPriorityFeePerGas' if ( - maxPriorityFeePerGas != undefined && this.maxAllowedPriorityFeePerGas != undefined && - this.maxAllowedPriorityFeePerGas < maxPriorityFeePerGas + this.maxAllowedPriorityFeePerGas < feeData.maxPriorityFeePerGas ) { - maxPriorityFeePerGas = this.maxAllowedPriorityFeePerGas; - } - - if (priority) { - if (maxFeePerGas != undefined) { - maxFeePerGas = maxFeePerGas * this.priorityAdjustmentFactor / DECIMAL_BASE_BIG_INT; - } - - if (maxPriorityFeePerGas != undefined) { - maxPriorityFeePerGas = maxPriorityFeePerGas * this.priorityAdjustmentFactor / DECIMAL_BASE_BIG_INT; - } + feeData.maxPriorityFeePerGas = this.maxAllowedPriorityFeePerGas; } + } - return { - maxFeePerGas, - maxPriorityFeePerGas, - }; - } else { - // Set traditional gasPrice - const queriedGasPrice = queriedFeeData.gasPrice; - if (queriedGasPrice == null) return {}; - + // Adjust the 'gasPrice' if present. + if (feeData.gasPrice) { // Adjust the 'gasPrice' by the adjustment factor - let gasPrice; if (this.gasPriceAdjustmentFactor != undefined) { - gasPrice = queriedGasPrice + feeData.gasPrice = feeData.gasPrice * this.gasPriceAdjustmentFactor / DECIMAL_BASE_BIG_INT; } // Apply the max allowed 'gasPrice' if ( - gasPrice != undefined && this.maxAllowedGasPrice != undefined && - this.maxAllowedGasPrice < gasPrice + this.maxAllowedGasPrice < feeData.gasPrice ) { - gasPrice = this.maxAllowedGasPrice; + feeData.gasPrice = this.maxAllowedGasPrice; } + } - if (priority && gasPrice != undefined) { - gasPrice = gasPrice + // Apply the 'priority' adjustment factor + if (priority) { + if (feeData.maxFeePerGas != undefined) { + feeData.maxFeePerGas = feeData.maxFeePerGas + *this.priorityAdjustmentFactor + / DECIMAL_BASE_BIG_INT; + } + + if (feeData.maxPriorityFeePerGas != undefined) { + feeData.maxPriorityFeePerGas = feeData.maxPriorityFeePerGas + *this.priorityAdjustmentFactor + / DECIMAL_BASE_BIG_INT; + } + + if (feeData.gasPrice != undefined) { + feeData.gasPrice = feeData.gasPrice * this.priorityAdjustmentFactor / DECIMAL_BASE_BIG_INT; } + } + + return new FeeData( + feeData.gasPrice, + feeData.maxFeePerGas, + feeData.maxPriorityFeePerGas + ); + } + getFeeDataForTransaction(priority?: boolean): GasFeeOverrides { + const adjustedFeeData = this.getAdjustedFeeData(priority); + if (adjustedFeeData == undefined) { + return {}; + } + + if (adjustedFeeData.maxPriorityFeePerGas != undefined) { + // Set fee data for EIP 1559 transactions + return { + maxFeePerGas: adjustedFeeData.maxFeePerGas ?? undefined, + maxPriorityFeePerGas: adjustedFeeData.maxPriorityFeePerGas, + }; + } else { return { - gasPrice, + gasPrice: adjustedFeeData.gasPrice ?? undefined, }; } } diff --git a/src/wallet/wallet.interface.ts b/src/wallet/wallet.interface.ts index 7df2c29..eaf837c 100644 --- a/src/wallet/wallet.interface.ts +++ b/src/wallet/wallet.interface.ts @@ -1,7 +1,6 @@ -import { TransactionReceipt, TransactionRequest, TransactionResponse } from 'ethers6'; +import { FeeData, TransactionReceipt, TransactionRequest, TransactionResponse } from 'ethers6'; import { MessagePort } from 'worker_threads'; -import { WalletTransactionOptions, WalletTransactionRequestMessage, WalletTransactionRequestResponse } from './wallet.types'; -import { WALLET_WORKER_CRASHED_MESSAGE_ID } from './wallet.service'; +import { WALLET_WORKER_CRASHED_MESSAGE_ID, WalletFeeDataMessage, WalletGetFeeDataMessage, WalletMessageType, WalletPortData, WalletTransactionOptions, WalletTransactionRequestMessage, WalletTransactionRequestResponseMessage } from './wallet.types'; export interface TransactionResult { txRequest: TransactionRequest; @@ -22,6 +21,7 @@ export class WalletInterface { } async submitTransaction( + chainId: string, transaction: TransactionRequest, metadata?: T, options?: WalletTransactionOptions @@ -30,22 +30,25 @@ export class WalletInterface { const messageId = this.getNextPortMessageId(); const resultPromise = new Promise>(resolve => { - const listener = (data: any) => { + const listener = (data: WalletPortData) => { if (data.messageId === messageId) { this.port.off("message", listener); - const walletResponse = data as WalletTransactionRequestResponse; + const walletResponse = data.message as WalletTransactionRequestResponseMessage; const result = { txRequest: walletResponse.txRequest, metadata: walletResponse.metadata, tx: walletResponse.tx, txReceipt: walletResponse.txReceipt, - submissionError: data.submissionError, - confirmationError: data.confirmationError + submissionError: walletResponse.submissionError, + confirmationError: walletResponse.confirmationError }; resolve(result); - } else if (data.messageId === WALLET_WORKER_CRASHED_MESSAGE_ID) { + } else if ( + data.messageId === WALLET_WORKER_CRASHED_MESSAGE_ID + && data.chainId == chainId + ) { this.port.off("message", listener); const result = { @@ -59,13 +62,67 @@ export class WalletInterface { }; this.port.on("message", listener); - const request: WalletTransactionRequestMessage = { - messageId, + const message: WalletTransactionRequestMessage = { + type: WalletMessageType.TransactionRequest, txRequest: transaction, metadata, options }; - this.port.postMessage(request); + + const portData: WalletPortData = { + chainId, + messageId, + message, + } + this.port.postMessage(portData); + }); + + return resultPromise; + } + + async getFeeData( + chainId: string, + priority?: boolean, + ): Promise { + + const messageId = this.getNextPortMessageId(); + + const resultPromise = new Promise(resolve => { + const listener = (data: WalletPortData) => { + if (data.messageId === messageId) { + this.port.off("message", listener); + + const walletResponse = data.message as WalletFeeDataMessage; + + const result = { + gasPrice: walletResponse.gasPrice, + maxFeePerGas: walletResponse.maxFeePerGas, + maxPriorityFeePerGas: walletResponse.maxPriorityFeePerGas, + } as FeeData; + resolve(result); + } else if ( + data.messageId === WALLET_WORKER_CRASHED_MESSAGE_ID + && data.chainId == chainId + ) { + this.port.off("message", listener); + + const result = {} as FeeData; + resolve(result); + } + }; + this.port.on("message", listener); + + const message: WalletGetFeeDataMessage = { + type: WalletMessageType.GetFeeData, + priority: priority ?? false, + }; + + const portData: WalletPortData = { + chainId, + messageId, + message, + } + this.port.postMessage(portData); }); return resultPromise; diff --git a/src/wallet/wallet.service.ts b/src/wallet/wallet.service.ts index 566a5d9..02417a2 100644 --- a/src/wallet/wallet.service.ts +++ b/src/wallet/wallet.service.ts @@ -4,12 +4,10 @@ import { LoggerOptions } from 'pino'; import { Worker, MessagePort, MessageChannel } from 'worker_threads'; import { ConfigService } from 'src/config/config.service'; import { LoggerService, STATUS_LOG_INTERVAL } from 'src/logger/logger.service'; -import { WalletServiceRoutingMessage, WalletTransactionRequestMessage } from './wallet.types'; +import { WALLET_WORKER_CRASHED_MESSAGE_ID, WalletCrashedMessage, WalletMessageType, WalletPortData, WalletServiceRoutingData } from './wallet.types'; import { Wallet } from 'ethers6'; import { tryErrorToString } from 'src/common/utils'; -export const WALLET_WORKER_CRASHED_MESSAGE_ID = -1; - const DEFAULT_WALLET_RETRY_INTERVAL = 30000; const DEFAULT_WALLET_PROCESSING_INTERVAL = 100; const DEFAULT_WALLET_MAX_TRIES = 3; @@ -58,11 +56,6 @@ export interface WalletWorkerData { } -interface PortDescription { - chainId: string; - port: MessagePort; -} - @Global() @Injectable() export class WalletService implements OnModuleInit { @@ -70,9 +63,9 @@ export class WalletService implements OnModuleInit { private workers: Record = {}; private portsCount = 0; - private readonly ports: Record = {}; + private readonly ports: Record = {}; - private readonly queuedRequests: Record = {}; + private readonly queuedMessages: Record = {}; readonly publicKey: string; @@ -243,17 +236,23 @@ export class WalletService implements OnModuleInit { this.recoverQueuedMessages(chainId); }); - worker.on('message', (message: WalletServiceRoutingMessage) => { - const portDescription = this.ports[message.portId]; - if (portDescription == undefined) { + worker.on('message', (routingData: WalletServiceRoutingData) => { + const port = this.ports[routingData.portId]; + if (port == undefined) { this.loggerService.error( - message, + { routingData }, `Unable to route transaction response on wallet: port id not found.` ); return; } - portDescription.port.postMessage(message.data); + const portData: WalletPortData = { + chainId, + messageId: routingData.messageId, + message: routingData.message, + }; + + port.postMessage(portData); }); } @@ -274,38 +273,35 @@ export class WalletService implements OnModuleInit { setInterval(logStatus, STATUS_LOG_INTERVAL); } - async attachToWallet(chainId: string): Promise { + async attachToWallet(): Promise { const portId = this.portsCount++; const { port1, port2 } = new MessageChannel(); - port1.on('message', (message: WalletTransactionRequestMessage) => { - this.handleTransactionRequestMessage( - chainId, + port1.on('message', (portData: WalletPortData) => { + this.handleWalletPortData( portId, - message, + portData, ); }); - this.ports[portId] = { - chainId, - port: port1, - }; + this.ports[portId] = port1; return port2; } - private handleTransactionRequestMessage( - chainId: string, + private handleWalletPortData( portId: number, - message: WalletTransactionRequestMessage + portData: WalletPortData, ): void { + const chainId = portData.chainId; const worker = this.workers[chainId]; - const routingMessage: WalletServiceRoutingMessage = { + const routingData: WalletServiceRoutingData = { portId, - data: message + messageId: portData.messageId, + message: portData.message, }; if (worker == undefined) { @@ -313,51 +309,60 @@ export class WalletService implements OnModuleInit { { chainId, portId, - message + message: portData.message }, `Wallet does not exist for the requested chain. Queueing message.` ); - if (!(chainId in this.queuedRequests)) { - this.queuedRequests[chainId] = []; + if (!(chainId in this.queuedMessages)) { + this.queuedMessages[chainId] = []; } - this.queuedRequests[chainId]!.push(routingMessage); + this.queuedMessages[chainId]!.push(routingData); } else { - worker.postMessage(routingMessage); + worker.postMessage(routingData); } } private abortPendingRequests( chainId: string, ): void { - for (const portDescription of Object.values(this.ports)) { - if (portDescription.chainId === chainId) { - portDescription.port.postMessage({ - messageId: WALLET_WORKER_CRASHED_MESSAGE_ID - }); - } + const message: WalletCrashedMessage = { + type: WalletMessageType.WalletCrashed, + }; + + const walletCrashBroadcast: WalletPortData = { + chainId, + messageId: WALLET_WORKER_CRASHED_MESSAGE_ID, + message, + } + + for (const port of Object.values(this.ports)) { + port.postMessage(walletCrashBroadcast); } } private recoverQueuedMessages( chainId: string, ): void { - const queuedRequests = this.queuedRequests[chainId] ?? []; - this.queuedRequests[chainId] = []; + const queuedMessages = this.queuedMessages[chainId] ?? []; + this.queuedMessages[chainId] = []; this.loggerService.info( { chainId, - count: queuedRequests.length, + count: queuedMessages.length, }, `Recovering queued wallet requests.` ); - for (const request of queuedRequests) { - this.handleTransactionRequestMessage( - chainId, - request.portId, - request.data, + for (const queuedMessage of queuedMessages) { + this.handleWalletPortData( + queuedMessage.portId, + { + chainId, + messageId: queuedMessage.messageId, + message: queuedMessage.message, + }, ); } } diff --git a/src/wallet/wallet.types.ts b/src/wallet/wallet.types.ts index a800af5..0fc461b 100644 --- a/src/wallet/wallet.types.ts +++ b/src/wallet/wallet.types.ts @@ -4,21 +4,46 @@ import { TransactionRequest, TransactionReceipt, TransactionResponse } from "eth // Port Channels Types // ************************************************************************************************ -export interface WalletServiceRoutingMessage { + +export const WALLET_WORKER_CRASHED_MESSAGE_ID = -1; + +export interface WalletPortData { + chainId: string; + messageId: number; + message: WalletMessage; +} + +export interface WalletServiceRoutingData { portId: number; - data: T; + messageId: number; + message: WalletMessage; } +export enum WalletMessageType { + TransactionRequest, + TransactionRequestResponse, + GetFeeData, + FeeData, + WalletCrashed, +} + +export type WalletMessage = WalletTransactionRequestMessage + | WalletTransactionRequestResponseMessage + | WalletGetFeeDataMessage + | WalletFeeDataMessage + | WalletCrashedMessage; + + //TODO add 'priority' export interface WalletTransactionRequestMessage { - messageId: number; + type: WalletMessageType.TransactionRequest, txRequest: TransactionRequest; metadata: T; options?: WalletTransactionOptions; } -export interface WalletTransactionRequestResponse { - messageId: number; +export interface WalletTransactionRequestResponseMessage { + type: WalletMessageType.TransactionRequestResponse, txRequest: TransactionRequest; metadata: T; tx?: TransactionResponse; @@ -27,6 +52,23 @@ export interface WalletTransactionRequestResponse { confirmationError?: any; } +export interface WalletGetFeeDataMessage { + type: WalletMessageType.GetFeeData, + priority: boolean, +} + +export interface WalletFeeDataMessage { + type: WalletMessageType.FeeData, + priority: boolean, + gasPrice?: bigint; + maxFeePerGas?: bigint; + maxPriorityFeePerGas?: bigint; +} + +export interface WalletCrashedMessage { + type: WalletMessageType.WalletCrashed, +} + // Processing Types diff --git a/src/wallet/wallet.worker.ts b/src/wallet/wallet.worker.ts index 70b0605..fb137cd 100644 --- a/src/wallet/wallet.worker.ts +++ b/src/wallet/wallet.worker.ts @@ -6,7 +6,7 @@ import { STATUS_LOG_INTERVAL } from "src/logger/logger.service"; import { TransactionHelper } from "./transaction-helper"; import { ConfirmQueue } from "./queues/confirm-queue"; import { WalletWorkerData } from "./wallet.service"; -import { ConfirmedTransaction, GasFeeConfig, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestResponse, BalanceConfig, WalletServiceRoutingMessage } from "./wallet.types"; +import { ConfirmedTransaction, GasFeeConfig, PendingTransaction, WalletTransactionOptions, WalletTransactionRequest, WalletTransactionRequestResponseMessage, BalanceConfig, WalletServiceRoutingData, WalletMessageType, WalletFeeDataMessage } from "./wallet.types"; import { SubmitQueue } from "./queues/submit-queue"; @@ -150,17 +150,38 @@ class WalletWorker { } private initializePort(): void { - parentPort!.on('message', (message: WalletServiceRoutingMessage) => { - this.addTransaction( - message.portId, - message.data.messageId, - message.data.txRequest, - message.data.metadata, - message.data.options - ); + parentPort!.on('message', (message: WalletServiceRoutingData) => { + this.processRequest(message); }); } + private processRequest(data: WalletServiceRoutingData): void { + const messageType = data.message.type; + switch(messageType) { + case WalletMessageType.TransactionRequest: + this.addTransaction( + data.portId, + data.messageId, + data.message.txRequest, + data.message.metadata, + data.message.options + ) + break; + case WalletMessageType.GetFeeData: + this.handleGetFeeDataRequest( + data.portId, + data.messageId, + data.message.priority, + ); + break; + default: + this.logger.error( + data, + 'Unable to process request: wallet message type unsupported.' + ); + } + } + private addTransaction( portId: number, messageId: number, @@ -182,6 +203,30 @@ class WalletWorker { this.newRequestsQueue.push(request); } + private handleGetFeeDataRequest( + portId: number, + messageId: number, + priority: boolean, + ): void { + const adjustedFeeData = this.transactionHelper.getAdjustedFeeData(priority); + + const feeDataMessage: WalletFeeDataMessage = { + type: WalletMessageType.FeeData, + priority, + maxFeePerGas: adjustedFeeData?.maxFeePerGas ?? undefined, + maxPriorityFeePerGas: adjustedFeeData?.maxPriorityFeePerGas ?? undefined, + gasPrice: adjustedFeeData?.gasPrice ?? undefined, + }; + + const routingResponse: WalletServiceRoutingData = { + portId, + messageId, + message: feeDataMessage, + }; + + parentPort!.postMessage(routingResponse); + } + private initiateIntervalStatusLog(): void { const logStatus = () => { const status = { @@ -458,8 +503,8 @@ class WalletWorker { confirmationError?: any, ): void { - const transactionResponse: WalletTransactionRequestResponse = { - messageId: request.messageId, + const transactionResponse: WalletTransactionRequestResponseMessage = { + type: WalletMessageType.TransactionRequestResponse, txRequest: request.txRequest, metadata: request.metadata, tx, @@ -468,9 +513,10 @@ class WalletWorker { confirmationError: tryErrorToString(confirmationError), } - const routingResponse: WalletServiceRoutingMessage = { + const routingResponse: WalletServiceRoutingData = { portId: request.portId, - data: transactionResponse, + messageId: request.messageId, + message: transactionResponse, } parentPort!.postMessage(routingResponse);