Skip to content

Commit

Permalink
feat(deployment): tops up deployments for the same owner in a single tx
Browse files Browse the repository at this point in the history
refs #714
  • Loading branch information
ygrishajev committed Feb 8, 2025
1 parent dbf1944 commit c944de6
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ export class BatchSigningClientService {
await this.semaphore.acquire();
try {
return await backOff(() => this.executeTxBatch(inputs), {
maxDelay: 5000,
numOfAttempts: 3,
jitter: "full",
maxDelay: 10000,
startingDelay: 1000,
timeMultiple: 2,
numOfAttempts: 5,
jitter: "none",
retry: async (error: Error, attempt) => {
const isSequenceMismatch = error?.message?.includes("account sequence mismatch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ export interface ExecDepositDeploymentMsgOptions extends DepositDeploymentMsgOpt
grantee: string;
}

export interface DepositDeploymentMsg {
typeUrl: "/akash.deployment.v1beta3.MsgDepositDeployment";
value: {
id: {
owner: string;
dseq: Long;
};
amount: { denom: string; amount: string };
depositor: string;
};
}

@singleton()
export class RpcMessageService {
getFeesAllowanceGrantMsg({ limit, expiration, granter, grantee }: Omit<SpendingAuthorizationMsgOptions, "denom">) {
Expand Down Expand Up @@ -123,7 +135,7 @@ export class RpcMessageService {
};
}

getDepositDeploymentMsg({ owner, dseq, amount, denom, depositor }: DepositDeploymentMsgOptions) {
getDepositDeploymentMsg({ owner, dseq, amount, denom, depositor }: DepositDeploymentMsgOptions): DepositDeploymentMsg {
return {
typeUrl: "/akash.deployment.v1beta3.MsgDepositDeployment",
value: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { DrainingDeploymentOutput, LeaseRepository } from "@src/deployment/repos
import { averageBlockCountInAnHour } from "@src/utils/constants";
import { DeploymentConfigService } from "../deployment-config/deployment-config.service";

type DrainingDeployment = AutoTopUpDeployment & {
export type DrainingDeployment = AutoTopUpDeployment & {
predictedClosedHeight: number;
blockRate: number;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import "@test/mocks/logger-service.mock";

import { LoggerService } from "@akashnetwork/logging";
import { faker } from "@faker-js/faker";

import { RpcMessageService, Wallet } from "@src/billing/services";
Expand All @@ -13,8 +14,16 @@ import { MockConfigService } from "@test/mocks/config-service.mock";
import { AkashAddressSeeder } from "@test/seeders/akash-address.seeder";
import { AutoTopUpDeploymentSeeder } from "@test/seeders/auto-top-up-deployment.seeder";
import { DrainingDeploymentSeeder } from "@test/seeders/draining-deployment.seeder";
import { stub } from "@test/services/stub";

jest.mock("@akashnetwork/logging");
jest.mock("@akashnetwork/logging", () => ({
LoggerService: {
forContext: jest.fn().mockReturnValue({
info: jest.fn(),
error: jest.fn()
})
}
}));

describe(TopUpManagedDeploymentsService.name, () => {
let managedSignerService: jest.Mocked<ManagedSignerService>;
Expand All @@ -25,38 +34,26 @@ describe(TopUpManagedDeploymentsService.name, () => {
let cachedBalanceService: jest.Mocked<CachedBalanceService>;
let blockHttpService: jest.Mocked<BlockHttpService>;
let service: TopUpManagedDeploymentsService;
let logger: jest.Mocked<LoggerService>;

const MANAGED_MASTER_WALLET_ADDRESS = AkashAddressSeeder.create();
const DEPLOYMENT_GRANT_DENOM = "ibc/170C677610AC31DF0904FFE09CD3B5C657492170E7E52372E48756B71E56F2F1";
const CURRENT_BLOCK_HEIGHT = 7481457;

beforeEach(() => {
managedSignerService = {
executeManagedTx: jest.fn()
} as Partial<jest.Mocked<ManagedSignerService>> as jest.Mocked<ManagedSignerService>;

managedSignerService = stub<ManagedSignerService>({ executeManagedTx: jest.fn() });
billingConfig = new MockConfigService<{ DEPLOYMENT_GRANT_DENOM: string }>({
DEPLOYMENT_GRANT_DENOM
});

drainingDeploymentService = {
drainingDeploymentService = stub<DrainingDeploymentService>({
paginate: jest.fn(),
calculateTopUpAmount: jest.fn()
} as Partial<jest.Mocked<DrainingDeploymentService>> as jest.Mocked<DrainingDeploymentService>;

managedMasterWallet = {
getFirstAddress: jest.fn().mockResolvedValue(MANAGED_MASTER_WALLET_ADDRESS)
} as Partial<jest.Mocked<Wallet>> as jest.Mocked<Wallet>;

});
managedMasterWallet = stub<Wallet>({ getFirstAddress: jest.fn().mockResolvedValue(MANAGED_MASTER_WALLET_ADDRESS) });
rpcMessageService = new RpcMessageService();

cachedBalanceService = {
get: jest.fn()
} as Partial<jest.Mocked<CachedBalanceService>> as jest.Mocked<CachedBalanceService>;

blockHttpService = {
getCurrentHeight: jest.fn().mockResolvedValue(CURRENT_BLOCK_HEIGHT)
} as Partial<jest.Mocked<BlockHttpService>> as jest.Mocked<BlockHttpService>;
cachedBalanceService = stub<CachedBalanceService>({ get: jest.fn() });
blockHttpService = stub<BlockHttpService>({ getCurrentHeight: jest.fn().mockResolvedValue(CURRENT_BLOCK_HEIGHT) });
logger = (LoggerService.forContext as jest.Mock)() as jest.Mocked<LoggerService>;

service = new TopUpManagedDeploymentsService(
managedSignerService,
Expand Down Expand Up @@ -84,7 +81,8 @@ describe(TopUpManagedDeploymentsService.name, () => {
...DrainingDeploymentSeeder.create({
dseq: Number(deployment.dseq),
owner: deployment.address,
predictedClosedHeight: index === 0 ? predictedClosedHeight1 : predictedClosedHeight2
predictedClosedHeight: index === 0 ? predictedClosedHeight1 : predictedClosedHeight2,
denom: DEPLOYMENT_GRANT_DENOM
})
}))
);
Expand All @@ -98,7 +96,7 @@ describe(TopUpManagedDeploymentsService.name, () => {
await service.topUpDeployments({ concurrency: 10, dryRun: false });

expect(managedSignerService.executeManagedTx).toHaveBeenCalledTimes(deployments.length);
deployments.forEach(deployment => {
deployments.forEach((deployment, index) => {
expect(managedSignerService.executeManagedTx).toHaveBeenCalledWith(deployment.walletId, [
{
typeUrl: "/akash.deployment.v1beta3.MsgDepositDeployment",
Expand All @@ -115,7 +113,57 @@ describe(TopUpManagedDeploymentsService.name, () => {
}
}
]);
expect(logger.info).toHaveBeenCalledWith(
expect.objectContaining({
event: "TOP_UP_DEPLOYMENTS_SUCCESS",
owner: deployment.address,
items: [
expect.objectContaining({
deployment: expect.objectContaining({
blockRate: expect.any(Number),
closedHeight: undefined,
denom: DEPLOYMENT_GRANT_DENOM,
dseq: Number(deployment.dseq),
id: deployment.id,
owner: deployment.address,
address: deployment.address,
predictedClosedHeight: index === 0 ? predictedClosedHeight1 : predictedClosedHeight2,
walletId: deployment.walletId
}),
input: expect.objectContaining({
amount: sufficientAmount,
denom: DEPLOYMENT_GRANT_DENOM,
depositor: MANAGED_MASTER_WALLET_ADDRESS,
dseq: Number(deployment.dseq),
owner: deployment.address
})
})
],
dryRun: false
})
);
});

expect(logger.info).toHaveBeenCalledWith(
expect.objectContaining({
event: "TOP_UP_DEPLOYMENTS_SUMMARY",
summary: expect.objectContaining({
startBlockHeight: CURRENT_BLOCK_HEIGHT,
endBlockHeight: CURRENT_BLOCK_HEIGHT,
deploymentCount: 2,
deploymentTopUpCount: 2,
deploymentTopUpErrorCount: 0,
insufficientBalanceCount: 0,
walletsCount: 2,
walletsTopUpCount: 2,
walletsTopUpErrorCount: 0,
minPredictedClosedHeight: predictedClosedHeight1,
maxPredictedClosedHeight: predictedClosedHeight2,
totalTopUpAmount: expect.any(Number)
}),
dryRun: false
})
);
});

it("should handle errors and continue processing", async () => {
Expand Down Expand Up @@ -184,5 +232,121 @@ describe(TopUpManagedDeploymentsService.name, () => {

expect(managedSignerService.executeManagedTx).not.toHaveBeenCalled();
});

it("should top up draining deployments for the same owner in the same tx", async () => {
const owner = AkashAddressSeeder.create();
const walletId = faker.number.int({ min: 1000000, max: 9999999 });
const deployments = [AutoTopUpDeploymentSeeder.create({ address: owner, walletId }), AutoTopUpDeploymentSeeder.create({ address: owner, walletId })];
const desiredAmount = faker.number.int({ min: 3500000, max: 4000000 });
const sufficientAmount = faker.number.int({ min: 1000000, max: 2000000 });
const predictedClosedHeight = CURRENT_BLOCK_HEIGHT + 1500;

(drainingDeploymentService.paginate as jest.Mock).mockImplementation(async (_, callback) => {
await callback(
deployments.map(deployment => ({
...deployment,
...DrainingDeploymentSeeder.create({
dseq: Number(deployment.dseq),
owner: deployment.address,
predictedClosedHeight
})
}))
);
});

(drainingDeploymentService.calculateTopUpAmount as jest.Mock).mockResolvedValue(desiredAmount);
(cachedBalanceService.get as jest.Mock).mockImplementation(async () => ({
reserveSufficientAmount: () => sufficientAmount
}));

await service.topUpDeployments({ concurrency: 10, dryRun: false });

expect(managedSignerService.executeManagedTx).toHaveBeenCalledTimes(1);
expect(managedSignerService.executeManagedTx).toHaveBeenCalledWith(walletId, [
{
typeUrl: "/akash.deployment.v1beta3.MsgDepositDeployment",
value: {
id: {
owner: owner,
dseq: { high: 0, low: Number(deployments[0].dseq), unsigned: true }
},
amount: {
denom: DEPLOYMENT_GRANT_DENOM,
amount: sufficientAmount.toString()
},
depositor: MANAGED_MASTER_WALLET_ADDRESS
}
},
{
typeUrl: "/akash.deployment.v1beta3.MsgDepositDeployment",
value: {
id: {
owner: owner,
dseq: { high: 0, low: Number(deployments[1].dseq), unsigned: true }
},
amount: {
denom: DEPLOYMENT_GRANT_DENOM,
amount: sufficientAmount.toString()
},
depositor: MANAGED_MASTER_WALLET_ADDRESS
}
}
]);
});

it("should log errors when message preparation fails", async () => {
const deployment = AutoTopUpDeploymentSeeder.create();
const error = new Error("Failed to calculate amount");

(drainingDeploymentService.paginate as jest.Mock).mockImplementation(async (_, callback) => {
await callback([
{
...deployment,
...DrainingDeploymentSeeder.create({
dseq: Number(deployment.dseq),
owner: deployment.address,
predictedClosedHeight: CURRENT_BLOCK_HEIGHT + 1500
})
}
]);
});

(drainingDeploymentService.calculateTopUpAmount as jest.Mock).mockRejectedValue(error);

await service.topUpDeployments({ concurrency: 10, dryRun: false });

expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({
event: "MESSAGE_PREPARATION_ERROR",
deployment: expect.objectContaining({
address: deployment.address,
walletId: deployment.walletId
}),
message: error.message,
dryRun: false,
stack: expect.any(String)
})
);
expect(logger.info).toHaveBeenCalledWith(
expect.objectContaining({
event: "TOP_UP_DEPLOYMENTS_SUMMARY",
summary: expect.objectContaining({
deploymentCount: 1,
deploymentTopUpCount: 0,
deploymentTopUpErrorCount: 1,
insufficientBalanceCount: 0,
walletsCount: 1,
walletsTopUpCount: 0,
walletsTopUpErrorCount: 1,
minPredictedClosedHeight: CURRENT_BLOCK_HEIGHT + 1500,
maxPredictedClosedHeight: CURRENT_BLOCK_HEIGHT + 1500,
totalTopUpAmount: 0,
startBlockHeight: CURRENT_BLOCK_HEIGHT,
endBlockHeight: CURRENT_BLOCK_HEIGHT
}),
dryRun: false
})
);
});
});
});
Loading

0 comments on commit c944de6

Please sign in to comment.