Skip to content

Commit

Permalink
Merge pull request #16 from catalystdao/jsanmi/wallet-worker-restart
Browse files Browse the repository at this point in the history
Wallet worker restart
  • Loading branch information
reednaa authored May 27, 2024
2 parents a9c4006 + 775a194 commit 47d4156
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 100 deletions.
30 changes: 21 additions & 9 deletions src/wallet/wallet.interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { TransactionReceipt, TransactionRequest, TransactionResponse } from 'ethers6';
import { MessagePort } from 'worker_threads';
import { WalletTransactionOptions, WalletTransactionRequestMessage, WalletTransactionRequestResponse } from './wallet.types';
import { tryErrorToString } from 'src/common/utils';
import { WALLET_WORKER_CRASHED_MESSAGE_ID } from './wallet.service';

export interface TransactionResult<T = any> {
txRequest: TransactionRequest;
Expand Down Expand Up @@ -30,19 +30,31 @@ export class WalletInterface {
const messageId = this.getNextPortMessageId();

const resultPromise = new Promise<TransactionResult<T>>(resolve => {
const listener = (data: WalletTransactionRequestResponse<T>) => {
if (data.messageId == messageId) {
const listener = (data: any) => {
if (data.messageId === messageId) {
this.port.off("message", listener);

const walletResponse = data as WalletTransactionRequestResponse<T>;

const result = {
txRequest: data.txRequest,
metadata: data.metadata,
tx: data.tx,
txReceipt: data.txReceipt,
submissionError: tryErrorToString(data.submissionError),
confirmationError: tryErrorToString(data.confirmationError)
txRequest: walletResponse.txRequest,
metadata: walletResponse.metadata,
tx: walletResponse.tx,
txReceipt: walletResponse.txReceipt,
submissionError: data.submissionError,
confirmationError: data.confirmationError
};
resolve(result);
} else if (data.messageId === WALLET_WORKER_CRASHED_MESSAGE_ID) {
this.port.off("message", listener);

const result = {
txRequest: transaction,
metadata,
submissionError: new Error('Wallet crashed.'), //TODO use a custom error type?
confirmationError: new Error('Wallet crashed.'), //TODO use a custom error type?
};
resolve(result);
}
};
this.port.on("message", listener);
Expand Down
191 changes: 146 additions & 45 deletions src/wallet/wallet.service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Global, Injectable, OnModuleInit } from '@nestjs/common';
import { join } from 'path';
import { LoggerOptions } from 'pino';
import { Worker, MessagePort } from 'worker_threads';
import { Worker, MessagePort, MessageChannel } from 'worker_threads';
import { ConfigService } from 'src/config/config.service';
import { LoggerService, STATUS_LOG_INTERVAL } from 'src/logger/logger.service';
import { WalletGetPortMessage, WalletGetPortResponse } from './wallet.types';
import { WalletServiceRoutingMessage, WalletTransactionRequestMessage } from './wallet.types';
import { Wallet } from 'ethers6';
import { tryErrorToString } from 'src/common/utils';

export const WALLET_WORKER_CRASHED_MESSAGE_ID = -1;

const DEFAULT_WALLET_RETRY_INTERVAL = 30000;
const DEFAULT_WALLET_PROCESSING_INTERVAL = 100;
const DEFAULT_WALLET_MAX_TRIES = 3;
Expand Down Expand Up @@ -56,18 +58,29 @@ export interface WalletWorkerData {

}

interface PortDescription {
chainId: string;
port: MessagePort;
}

@Global()
@Injectable()
export class WalletService implements OnModuleInit {
private readonly defaultWorkerConfig: DefaultWalletWorkerData;

private workers: Record<string, Worker | null> = {};
private requestPortMessageId = 0;
private portsCount = 0;
private readonly ports: Record<number, PortDescription> = {};

private readonly queuedRequests: Record<string, WalletServiceRoutingMessage[]> = {};

readonly publicKey: string;

constructor(
private readonly configService: ConfigService,
private readonly loggerService: LoggerService,
) {
this.defaultWorkerConfig = this.loadDefaultWorkerConfig();
this.publicKey = (new Wallet(this.configService.globalConfig.privateKey)).address;
}

Expand All @@ -80,31 +93,9 @@ export class WalletService implements OnModuleInit {
}

private async initializeWorkers(): Promise<void> {
const defaultWorkerConfig = this.loadDefaultWorkerConfig();

for (const [chainId,] of this.configService.chainsConfig) {

const workerData = this.loadWorkerConfig(chainId, defaultWorkerConfig);

const worker = new Worker(join(__dirname, 'wallet.worker.js'), {
workerData
});
this.workers[chainId] = worker;

worker.on('error', (error) =>
this.loggerService.fatal(
{ error: tryErrorToString(error), chainId },
`Error on wallet worker.`,
),
);

worker.on('exit', (exitCode) => {
this.workers[chainId] = null;
this.loggerService.fatal(
{ exitCode, chainId },
`Wallet worker exited.`,
);
});
this.spawnWorker(chainId);
}

// Add a small delay to wait for the workers to be initialized
Expand Down Expand Up @@ -152,9 +143,10 @@ export class WalletService implements OnModuleInit {

private loadWorkerConfig(
chainId: string,
defaultConfig: DefaultWalletWorkerData
): WalletWorkerData {

const defaultConfig = this.defaultWorkerConfig;

const chainConfig = this.configService.chainsConfig.get(chainId);
if (chainConfig == undefined) {
throw new Error(`Unable to load config for chain ${chainId}`);
Expand Down Expand Up @@ -215,6 +207,56 @@ export class WalletService implements OnModuleInit {
};
}

private spawnWorker(
chainId: string
): void {
const workerData = this.loadWorkerConfig(chainId);
this.loggerService.info(
{
chainId,
workerData,
},
`Spawning wallet worker.`
);

const worker = new Worker(join(__dirname, 'wallet.worker.js'), {
workerData
});
this.workers[chainId] = worker;

worker.on('error', (error) =>
this.loggerService.error(
{ error: tryErrorToString(error), chainId },
`Error on wallet worker.`,
),
);

worker.on('exit', (exitCode) => {
this.workers[chainId] = null;
this.loggerService.error(
{ exitCode, chainId },
`Wallet worker exited.`,
);

this.abortPendingRequests(chainId);
this.spawnWorker(chainId);
this.recoverQueuedMessages(chainId);
});

worker.on('message', (message: WalletServiceRoutingMessage) => {
const portDescription = this.ports[message.portId];
if (portDescription == undefined) {
this.loggerService.error(
message,
`Unable to route transaction response on wallet: port id not found.`
);
return;
}

portDescription.port.postMessage(message.data);
});
}

private initiateIntervalStatusLog(): void {
const logStatus = () => {
const activeWorkers = [];
Expand All @@ -232,32 +274,91 @@ export class WalletService implements OnModuleInit {
setInterval(logStatus, STATUS_LOG_INTERVAL);
}

async attachToWallet(chainId: string): Promise<MessagePort> {

const portId = this.portsCount++;

const { port1, port2 } = new MessageChannel();

private getNextRequestPortMessageId(): number {
return this.requestPortMessageId++;
port1.on('message', (message: WalletTransactionRequestMessage) => {
this.handleTransactionRequestMessage(
chainId,
portId,
message,
);
});

this.ports[portId] = {
chainId,
port: port1,
};

return port2;
}

async attachToWallet(chainId: string): Promise<MessagePort> {
private handleTransactionRequestMessage(
chainId: string,
portId: number,
message: WalletTransactionRequestMessage
): void {
const worker = this.workers[chainId];

const routingMessage: WalletServiceRoutingMessage = {
portId,
data: message
};

if (worker == undefined) {
throw new Error(`Wallet does not exist for chain ${chainId}`);
}
this.loggerService.warn(
{
chainId,
portId,
message
},
`Wallet does not exist for the requested chain. Queueing message.`
);

const messageId = this.getNextRequestPortMessageId();
const portPromise = new Promise<MessagePort>((resolve) => {
const listener = (data: WalletGetPortResponse) => {
if (data.messageId == messageId) {
worker.off("message", listener);
resolve(data.port);
}
};
worker.on("message", listener);
if (!(chainId in this.queuedRequests)) {
this.queuedRequests[chainId] = [];
}
this.queuedRequests[chainId]!.push(routingMessage);
} else {
worker.postMessage(routingMessage);
}
}

const portMessage: WalletGetPortMessage = { messageId };
worker.postMessage(portMessage);
});
private abortPendingRequests(
chainId: string,
): void {
for (const portDescription of Object.values(this.ports)) {
if (portDescription.chainId === chainId) {
portDescription.port.postMessage({
messageId: WALLET_WORKER_CRASHED_MESSAGE_ID
});
}
}
}

return portPromise;
private recoverQueuedMessages(
chainId: string,
): void {
const queuedRequests = this.queuedRequests[chainId] ?? [];
this.queuedRequests[chainId] = [];

this.loggerService.info(
{
chainId,
count: queuedRequests.length,
},
`Recovering queued wallet requests.`
);

for (const request of queuedRequests) {
this.handleTransactionRequestMessage(
chainId,
request.portId,
request.data,
);
}
}
}
11 changes: 3 additions & 8 deletions src/wallet/wallet.types.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { TransactionRequest, TransactionReceipt, TransactionResponse } from "ethers6";
import { MessagePort } from "worker_threads";



// Port Channels Types
// ************************************************************************************************
export interface WalletGetPortMessage {
messageId: number;
}

export interface WalletGetPortResponse {
messageId: number;
port: MessagePort;
export interface WalletServiceRoutingMessage<T = any> {
portId: number;
data: T;
}

//TODO add 'priority'
Expand Down
Loading

0 comments on commit 47d4156

Please sign in to comment.