From 98e165a07a94ee72d4094028fc29b767c687b36e Mon Sep 17 00:00:00 2001 From: Jason Bruwer Date: Wed, 27 Nov 2024 17:45:11 +0000 Subject: [PATCH] feat(backend): caching on asset and wallet address including other enhancements (#3041) * feat(2981): initial caching updates for asset and wallet address. * feat(2981): compilation issues. * feat(2981): compilation issues. * feat(2981): remove interledgerjs and merge with main branch. * feat(2981): fix test cases for service layers without cache. * feat(2981): fix test cases for service layers without cache. * feat(2981): Blair review feedback. * feat(2981): Test case for cache on wallet service. * feat(2981): Test case for cache on wallet service and asset. * feat(2981): middleware stats. * feat(2981): comparable balance calculation. * feat(2981): remove header. * feat(2981): review feedback. * feat(2981): cache peer issue. * feat(2981): getCounter is getHistogram fix. * feat(2981): case fix. * feat(2981): fix review comments from Max. * feat(2981): fix review comments from Max. * feat(2981): make use of cache for outgoing payment. * feat(2981): fix quote. * feat(2981): caching in the outgoing payment. * feat(2981): fix caching misplacement. * feat(2981): fix case issue with lowercase. * feat(2981): review feedback from Max. * feat(2981): fix asset. * feat(2981): address review comments from Max. * feat(2981): address review comments from @mkurapov. Enhancements. * feat(2981): address review comments from @mkurapov. Enhancements. * feat(2981): address review comments from @mkurapov. Make use of cdi for cache. * feat(2981): address review comments from @mkurapov. Dashboard and object containing. * feat(2981): address review comments from @mkurapov. Remove timer on error handler. * feat(2981): address review comments from @mkurapov. Cache walletAddress on quote. * feat(2981): address review comments from @mkurapov. Formatting. --- .gitignore | 2 +- .../provisioning/dashboards/example.json | 86 ++++++++--- .../backend/src/accounting/psql/service.ts | 121 +++++++-------- .../src/accounting/tigerbeetle/service.ts | 140 ++++++++++-------- packages/backend/src/asset/service.test.ts | 96 ++++++++++++ packages/backend/src/asset/service.ts | 31 +++- packages/backend/src/config/app.ts | 7 +- .../src/graphql/middleware/index.test.ts | 5 +- .../backend/src/graphql/middleware/index.ts | 4 +- .../graphql/resolvers/wallet_address.test.ts | 5 +- packages/backend/src/index.ts | 17 ++- .../middleware/cache/data-stores/in-memory.ts | 17 ++- .../src/middleware/cache/data-stores/index.ts | 6 +- .../src/middleware/cache/data-stores/redis.ts | 2 +- .../src/middleware/cache/index.test.ts | 2 +- .../backend/src/middleware/cache/index.ts | 2 +- .../payment/incoming/service.test.ts | 5 +- .../open_payments/payment/incoming/service.ts | 135 ++++++++++++----- .../payment/outgoing/service.test.ts | 3 +- .../open_payments/payment/outgoing/service.ts | 60 ++++++-- .../src/open_payments/quote/service.test.ts | 5 +- .../src/open_payments/quote/service.ts | 37 ++++- .../wallet_address/service.test.ts | 98 +++++++++++- .../open_payments/wallet_address/service.ts | 75 +++++++--- .../ilp/connector/core/middleware/balance.ts | 16 +- .../core/middleware/stream-address.ts | 9 +- .../src/payment-method/ilp/peer/service.ts | 60 +++++--- packages/backend/src/rates/service.test.ts | 2 +- packages/backend/src/rates/service.ts | 2 +- packages/backend/src/tests/quote.ts | 8 +- packages/backend/src/tests/telemetry.ts | 1 + 31 files changed, 777 insertions(+), 282 deletions(-) diff --git a/.gitignore b/.gitignore index d083dacb07..10914889f5 100644 --- a/.gitignore +++ b/.gitignore @@ -62,4 +62,4 @@ build packages/**/src/openapi/specs/schemas.yaml packages/**/src/openapi/specs/auth-server.yaml packages/**/src/openapi/specs/resource-server.yaml -packages/**/src/openapi/specs/wallet-address-server.yaml \ No newline at end of file +packages/**/src/openapi/specs/wallet-address-server.yaml diff --git a/localenv/telemetry/grafana/provisioning/dashboards/example.json b/localenv/telemetry/grafana/provisioning/dashboards/example.json index 42ed96baaa..426b329919 100644 --- a/localenv/telemetry/grafana/provisioning/dashboards/example.json +++ b/localenv/telemetry/grafana/provisioning/dashboards/example.json @@ -72,7 +72,7 @@ "textMode": "auto", "wideLayout": true }, - "pluginVersion": "11.2.0", + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -142,7 +142,7 @@ "textMode": "auto", "wideLayout": true }, - "pluginVersion": "11.2.0", + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -238,7 +238,7 @@ "textMode": "auto", "wideLayout": true }, - "pluginVersion": "11.2.0", + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -322,6 +322,12 @@ "id": 6, "options": { "displayMode": "gradient", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, "maxVizHeight": 300, "minVizHeight": 16, "minVizWidth": 8, @@ -336,7 +342,7 @@ "sizing": "auto", "valueMode": "color" }, - "pluginVersion": "11.2.0", + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -412,7 +418,7 @@ } ] }, - "pluginVersion": "11.2.0", + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -540,6 +546,7 @@ "sort": "none" } }, + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -742,6 +749,7 @@ "sort": "none" } }, + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -906,6 +914,7 @@ "sort": "none" } }, + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -1031,6 +1040,7 @@ "sort": "none" } }, + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -1233,6 +1243,7 @@ "sort": "none" } }, + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -1375,7 +1386,30 @@ }, "unit": "ms" }, - "overrides": [] + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": ["AccountingService:Postgres:getAccountBalances"], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] }, "gridPos": { "h": 7, @@ -1396,6 +1430,7 @@ "sort": "none" } }, + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -1408,9 +1443,22 @@ "legendFormat": "__auto", "range": true, "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${PrometheusDS}" + }, + "editorMode": "code", + "expr": "sum by (callName) (rate(tb_get_account_balances_sum[$__rate_interval])/rate(tb_get_account_balances_count[$__rate_interval]))", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" } ], - "title": "PSQL Get Account Balance", + "title": "Get Account Balance", "type": "timeseries" }, { @@ -1462,7 +1510,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1493,6 +1542,7 @@ "sort": "none" } }, + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -1500,7 +1550,7 @@ "uid": "${PrometheusDS}" }, "editorMode": "code", - "expr": "(increase(packet_amount_fulfill_total[30s]) * 0.5 * 0.0001)", + "expr": "sum by (source) (increase(packet_amount_fulfill_total[30s]) * 0.0001)", "hide": false, "instant": false, "interval": "", @@ -1561,7 +1611,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1592,6 +1643,7 @@ "sort": "none" } }, + "pluginVersion": "11.3.1", "targets": [ { "datasource": { @@ -1599,7 +1651,7 @@ "uid": "${PrometheusDS}" }, "editorMode": "code", - "expr": "sum by (source) (round(increase(transactions_total[30s]) * 0.5))", + "expr": "sum by (source) (round(increase(transactions_total[30s])))", "hide": false, "instant": false, "interval": "", @@ -1612,46 +1664,38 @@ "type": "timeseries" } ], + "preload": false, "refresh": "15s", - "schemaVersion": 39, + "schemaVersion": 40, "tags": [], "templating": { "list": [ { "current": { - "selected": false, "text": "Prometheus", "value": "PBFA97CFB590B2093" }, "description": "The Prometheus data-source.", - "hide": 0, "includeAll": false, - "multi": false, "name": "PrometheusDS", "options": [], "query": "prometheus", - "queryValue": "", "refresh": 1, "regex": "", - "skipUrlSync": false, "type": "datasource" }, { "current": { - "selected": false, "text": "Tempo", "value": "P214B5B846CF3925F" }, "description": "The Tempo datasource uid.", - "hide": 0, "includeAll": false, - "multi": false, "name": "TempoDS", "options": [], "query": "tempo", "refresh": 1, "regex": "", - "skipUrlSync": false, "type": "datasource" } ] diff --git a/packages/backend/src/accounting/psql/service.ts b/packages/backend/src/accounting/psql/service.ts index e6af38d9f9..fe67500eb6 100644 --- a/packages/backend/src/accounting/psql/service.ts +++ b/packages/backend/src/accounting/psql/service.ts @@ -161,9 +161,7 @@ export async function getAccountTotalSent( } const totalsSent = (await getAccountBalances(deps, account)).debitsPosted - stopTimer() - return totalsSent } @@ -231,66 +229,73 @@ export async function createTransfer( deps: ServiceDependencies, args: TransferOptions ): Promise { - return createAccountToAccountTransfer(deps, { - transferArgs: args, - voidTransfers: async (transferRefs) => voidTransfers(deps, transferRefs), - postTransfers: async (transferRefs) => postTransfers(deps, transferRefs), - getAccountReceived: async (accountRef) => - getAccountTotalReceived(deps, accountRef), - getAccountBalance: async (accountRef) => - getLiquidityAccountBalance(deps, accountRef), - createPendingTransfers: async (transfersToCreate) => { - const [ - sourceAccount, - sourceAssetAccount, - destinationAccount, - destinationAssetAccount - ] = await Promise.all([ - getLiquidityAccount(deps, args.sourceAccount.id), - getLiquidityAccount(deps, args.sourceAccount.asset.id), - getLiquidityAccount(deps, args.destinationAccount.id), - getLiquidityAccount(deps, args.destinationAccount.asset.id) - ]) - - if (!sourceAccount || !sourceAssetAccount) { - return TransferError.UnknownSourceAccount - } - - if (!destinationAccount || !destinationAssetAccount) { - return TransferError.UnknownDestinationAccount - } + const stopTimer = deps.telemetry.startTimer('psql_create_transfer_ms', { + callName: 'AccountingService:Postgres:createTransfer' + }) + try { + return createAccountToAccountTransfer(deps, { + transferArgs: args, + voidTransfers: async (transferRefs) => voidTransfers(deps, transferRefs), + postTransfers: async (transferRefs) => postTransfers(deps, transferRefs), + getAccountReceived: async (accountRef) => + getAccountTotalReceived(deps, accountRef), + getAccountBalance: async (accountRef) => + getLiquidityAccountBalance(deps, accountRef), + createPendingTransfers: async (transfersToCreate) => { + const [ + sourceAccount, + sourceAssetAccount, + destinationAccount, + destinationAssetAccount + ] = await Promise.all([ + getLiquidityAccount(deps, args.sourceAccount.id), + getLiquidityAccount(deps, args.sourceAccount.asset.id), + getLiquidityAccount(deps, args.destinationAccount.id), + getLiquidityAccount(deps, args.destinationAccount.asset.id) + ]) + + if (!sourceAccount || !sourceAssetAccount) { + return TransferError.UnknownSourceAccount + } + + if (!destinationAccount || !destinationAssetAccount) { + return TransferError.UnknownDestinationAccount + } + + const accountMap = { + [sourceAccount.accountRef]: sourceAccount, + [sourceAssetAccount.accountRef]: sourceAssetAccount, + [destinationAccount.accountRef]: destinationAccount, + [destinationAssetAccount.accountRef]: destinationAssetAccount + } + + const pendingTransfersOrError = handleTransferCreateResults( + args, + transfersToCreate, + await createTransfers( + deps, + transfersToCreate.map((transfer) => ({ + transferRef: uuid(), + debitAccount: accountMap[transfer.sourceAccountId], + creditAccount: accountMap[transfer.destinationAccountId], + amount: transfer.amount, + timeoutMs: BigInt(args.timeout * 1000) + })) + ) + ) - const accountMap = { - [sourceAccount.accountRef]: sourceAccount, - [sourceAssetAccount.accountRef]: sourceAssetAccount, - [destinationAccount.accountRef]: destinationAccount, - [destinationAssetAccount.accountRef]: destinationAssetAccount - } + if (isTransferError(pendingTransfersOrError)) { + return pendingTransfersOrError + } - const pendingTransfersOrError = handleTransferCreateResults( - args, - transfersToCreate, - await createTransfers( - deps, - transfersToCreate.map((transfer) => ({ - transferRef: uuid(), - debitAccount: accountMap[transfer.sourceAccountId], - creditAccount: accountMap[transfer.destinationAccountId], - amount: transfer.amount, - timeoutMs: BigInt(args.timeout * 1000) - })) + return pendingTransfersOrError.map( + (pendingTransfer) => pendingTransfer.transferRef ) - ) - - if (isTransferError(pendingTransfersOrError)) { - return pendingTransfersOrError } - - return pendingTransfersOrError.map( - (pendingTransfer) => pendingTransfer.transferRef - ) - } - }) + }) + } finally { + stopTimer() + } } function handleTransferCreateResults( diff --git a/packages/backend/src/accounting/tigerbeetle/service.ts b/packages/backend/src/accounting/tigerbeetle/service.ts index b9a92b38ad..4bdf94c4a8 100644 --- a/packages/backend/src/accounting/tigerbeetle/service.ts +++ b/packages/backend/src/accounting/tigerbeetle/service.ts @@ -212,8 +212,15 @@ export async function getAccountBalance( deps: ServiceDependencies, id: string ): Promise { - const account = (await getAccounts(deps, [id]))[0] - if (account) return calculateBalance(account) + const stopTimer = deps.telemetry.startTimer('tb_get_account_balances', { + callName: 'AccountingService:Tigerbeetle:getAccountBalances' + }) + try { + const account = (await getAccounts(deps, [id]))[0] + if (account) return calculateBalance(account) + } finally { + stopTimer() + } } export async function getAccountTotalSent( @@ -282,69 +289,76 @@ export async function createTransfer( deps: ServiceDependencies, args: TransferOptions ): Promise { - return createAccountToAccountTransfer(deps, { - transferArgs: args, - voidTransfers: async (transferIds) => { - const error = await createTransfers( - deps, - transferIds.map((transferId) => ({ - voidId: transferId - })) - ) - if (error) return error.error - }, - postTransfers: async (transferIds) => { - const error = await createTransfers( - deps, - transferIds.map((transferId) => ({ - postId: transferId - })) - ) - if (error) return error.error - }, - getAccountReceived: async (accountRef) => - getAccountTotalReceived(deps, accountRef), - getAccountBalance: async (accountRef) => - getAccountBalance(deps, accountRef), - createPendingTransfers: async (transfersToCreate) => { - const tbTransfers: NewTransferOptions[] = transfersToCreate.map( - (transfer) => ({ - id: uuid(), - ledger: transfer.ledger, - sourceAccountId: transfer.sourceAccountId, - destinationAccountId: transfer.destinationAccountId, - amount: transfer.amount, - timeout: args.timeout, - transferRef: uuid(), - code: transferCodeFromType(transfer.transferType) - }) - ) - const error = await createTransfers(deps, tbTransfers) - if (error) { - switch (error.error) { - case TransferError.UnknownSourceAccount: - throw new TigerbeetleUnknownAccountError( - transfersToCreate[error.index].sourceAccountId - ) - case TransferError.UnknownDestinationAccount: - throw new TigerbeetleUnknownAccountError( - transfersToCreate[error.index].destinationAccountId - ) - case TransferError.InsufficientBalance: - if ( - transfersToCreate[error.index].sourceAccountId === - args.destinationAccount.asset.id - ) { - return TransferError.InsufficientLiquidity - } - return TransferError.InsufficientBalance - default: - throw new BalanceTransferError(error.error) + const stopTimer = deps.telemetry.startTimer('tb_create_transfer_ms', { + callName: 'AccountingService:Tigerbeetle:createTransfer' + }) + try { + return createAccountToAccountTransfer(deps, { + transferArgs: args, + voidTransfers: async (transferIds) => { + const error = await createTransfers( + deps, + transferIds.map((transferId) => ({ + voidId: transferId + })) + ) + if (error) return error.error + }, + postTransfers: async (transferIds) => { + const error = await createTransfers( + deps, + transferIds.map((transferId) => ({ + postId: transferId + })) + ) + if (error) return error.error + }, + getAccountReceived: async (accountRef) => + getAccountTotalReceived(deps, accountRef), + getAccountBalance: async (accountRef) => + getAccountBalance(deps, accountRef), + createPendingTransfers: async (transfersToCreate) => { + const tbTransfers: NewTransferOptions[] = transfersToCreate.map( + (transfer) => ({ + id: uuid(), + ledger: transfer.ledger, + sourceAccountId: transfer.sourceAccountId, + destinationAccountId: transfer.destinationAccountId, + amount: transfer.amount, + timeout: args.timeout, + transferRef: uuid(), + code: transferCodeFromType(transfer.transferType) + }) + ) + const error = await createTransfers(deps, tbTransfers) + if (error) { + switch (error.error) { + case TransferError.UnknownSourceAccount: + throw new TigerbeetleUnknownAccountError( + transfersToCreate[error.index].sourceAccountId + ) + case TransferError.UnknownDestinationAccount: + throw new TigerbeetleUnknownAccountError( + transfersToCreate[error.index].destinationAccountId + ) + case TransferError.InsufficientBalance: + if ( + transfersToCreate[error.index].sourceAccountId === + args.destinationAccount.asset.id + ) { + return TransferError.InsufficientLiquidity + } + return TransferError.InsufficientBalance + default: + throw new BalanceTransferError(error.error) + } } + return tbTransfers.map((transfer) => transfer.id.toString()) } - return tbTransfers.map((transfer) => transfer.id.toString()) - } - }) + }) + } finally { + stopTimer() + } } function transferCodeFromType(type?: TransferType): number { diff --git a/packages/backend/src/asset/service.test.ts b/packages/backend/src/asset/service.test.ts index ec30e34407..6c05b221a2 100644 --- a/packages/backend/src/asset/service.test.ts +++ b/packages/backend/src/asset/service.test.ts @@ -20,6 +20,7 @@ import { WalletAddressService } from '../open_payments/wallet_address/service' import { isWalletAddressError } from '../open_payments/wallet_address/errors' import { PeerService } from '../payment-method/ilp/peer/service' import { isPeerError } from '../payment-method/ilp/peer/errors' +import { CacheDataStore } from '../middleware/cache/data-stores' describe('Asset Service', (): void => { let deps: IocContract @@ -314,3 +315,98 @@ describe('Asset Service', (): void => { }) }) }) + +describe('Asset Service using Cache', (): void => { + let deps: IocContract + let appContainer: TestContainer + let assetService: AssetService + let assetCache: CacheDataStore + + beforeAll(async (): Promise => { + deps = initIocContainer({ + ...Config, + localCacheDuration: 5_000 // 5-second default. + }) + appContainer = await createTestApp(deps) + assetService = await deps.use('assetService') + assetCache = await deps.use('assetCache') + }) + + afterEach(async (): Promise => { + await truncateTables(appContainer.knex) + }) + + afterAll(async (): Promise => { + await appContainer.shutdown() + }) + + describe('create, update and retrieve asset using cache', (): void => { + test.each` + withdrawalThreshold | liquidityThreshold + ${undefined} | ${undefined} + ${BigInt(5)} | ${undefined} + ${undefined} | ${BigInt(5)} + ${BigInt(5)} | ${BigInt(5)} + `( + 'Asset can be created, updated and fetched', + async ({ withdrawalThreshold, liquidityThreshold }): Promise => { + const options = { + ...randomAsset(), + withdrawalThreshold, + liquidityThreshold + } + + const spyCacheSet = jest.spyOn(assetCache, 'set') + + const asset = await assetService.create(options) + assert.ok(!isAssetError(asset)) + expect(asset).toMatchObject({ + ...options, + id: asset.id, + ledger: asset.ledger, + withdrawalThreshold: withdrawalThreshold || null, + liquidityThreshold: liquidityThreshold || null + }) + // Ensure that the cache was set for create: + expect(spyCacheSet).toHaveBeenCalledTimes(1) + + const spyCacheGet = jest.spyOn(assetCache, 'get') + await expect(assetService.get(asset.id)).resolves.toEqual(asset) + + expect(spyCacheGet).toHaveBeenCalledTimes(1) + expect(spyCacheGet).toHaveBeenCalledWith(asset.id) + + // Update the asset: + const spyCacheUpdateSet = jest.spyOn(assetCache, 'set') + const assetUpdate = await assetService.update({ + id: asset.id, + withdrawalThreshold, + liquidityThreshold + }) + assert.ok(!isAssetError(asset)) + + await expect(assetService.get(asset.id)).resolves.toEqual(assetUpdate) + + expect(spyCacheUpdateSet).toHaveBeenCalledTimes(2) + expect(spyCacheUpdateSet).toHaveBeenCalledWith( + asset.id, + expect.objectContaining({ + id: asset.id, + code: asset.code, + deletedAt: null + }) + ) + + // Delete the asset, and ensure it is not cached: + const deletedAsset = await assetService.delete({ + id: asset.id, + deletedAt: new Date() + }) + assert.ok(!isAssetError(deletedAsset)) + expect(deletedAsset.deletedAt).not.toBeNull() + + await expect(assetService.get(asset.id)).resolves.toBeUndefined() + } + ) + }) +}) diff --git a/packages/backend/src/asset/service.ts b/packages/backend/src/asset/service.ts index 8967fc5d32..5dbe8b63c1 100644 --- a/packages/backend/src/asset/service.ts +++ b/packages/backend/src/asset/service.ts @@ -7,6 +7,7 @@ import { BaseService } from '../shared/baseService' import { AccountingService, LiquidityAccountType } from '../accounting/service' import { WalletAddress } from '../open_payments/wallet_address/model' import { Peer } from '../payment-method/ilp/peer/model' +import { CacheDataStore } from '../middleware/cache/data-stores' export interface AssetOptions { code: string @@ -40,21 +41,26 @@ export interface AssetService { interface ServiceDependencies extends BaseService { accountingService: AccountingService + assetCache: CacheDataStore } export async function createAssetService({ logger, knex, - accountingService + accountingService, + assetCache }: ServiceDependencies): Promise { const log = logger.child({ service: 'AssetService' }) + const deps: ServiceDependencies = { logger: log, knex, - accountingService + accountingService, + assetCache } + return { create: (options) => createAsset(deps, options), update: (options) => updateAsset(deps, options), @@ -82,9 +88,11 @@ async function createAsset( if (deletedAsset) { // if found, enable - return await Asset.query(deps.knex) + const reActivated = await Asset.query(deps.knex) .patchAndFetchById(deletedAsset.id, { deletedAt: null }) .throwIfNotFound() + await deps.assetCache.set(reActivated.id, reActivated) + return reActivated } // Asset rows include a smallserial 'ledger' column that would have sequence gaps @@ -100,6 +108,7 @@ async function createAsset( withdrawalThreshold, liquidityThreshold }) + await deps.assetCache.set(asset.id, asset) await deps.accountingService.createLiquidityAndLinkedSettlementAccount( asset, LiquidityAccountType.ASSET, @@ -123,9 +132,12 @@ async function updateAsset( throw new Error('Knex undefined') } try { - return await Asset.query(deps.knex) + const asset = await Asset.query(deps.knex) .patchAndFetchById(id, { withdrawalThreshold, liquidityThreshold }) .throwIfNotFound() + + await deps.assetCache.set(id, asset) + return asset } catch (err) { if (err instanceof NotFoundError) { return AssetError.UnknownAsset @@ -142,6 +154,8 @@ async function deleteAsset( if (!deps.knex) { throw new Error('Knex undefined') } + + await deps.assetCache.delete(id) try { // return error in case there is a peer or wallet address using the asset const peer = await Peer.query(deps.knex).where('assetId', id).first() @@ -155,7 +169,6 @@ async function deleteAsset( if (walletAddress) { return AssetError.CannotDeleteInUseAsset } - return await Asset.query(deps.knex) .patchAndFetchById(id, { deletedAt: deletedAt.toISOString() }) .throwIfNotFound() @@ -171,7 +184,13 @@ async function getAsset( deps: ServiceDependencies, id: string ): Promise { - return await Asset.query(deps.knex).whereNull('deletedAt').findById(id) + const inMem = await deps.assetCache.get(id) + if (inMem) return inMem + + const asset = await Asset.query(deps.knex).whereNull('deletedAt').findById(id) + if (asset) await deps.assetCache.set(asset.id, asset) + + return asset } async function getAssetByCodeAndScale( diff --git a/packages/backend/src/config/app.ts b/packages/backend/src/config/app.ts index 46e2d02dff..b266c062ac 100644 --- a/packages/backend/src/config/app.ts +++ b/packages/backend/src/config/app.ts @@ -127,8 +127,8 @@ export const Config = { authServerGrantUrl: envString('AUTH_SERVER_GRANT_URL'), authServerIntrospectionUrl: envString('AUTH_SERVER_INTROSPECTION_URL'), - outgoingPaymentWorkers: envInt('OUTGOING_PAYMENT_WORKERS', 4), - outgoingPaymentWorkerIdle: envInt('OUTGOING_PAYMENT_WORKER_IDLE', 200), // milliseconds + outgoingPaymentWorkers: envInt('OUTGOING_PAYMENT_WORKERS', 1), + outgoingPaymentWorkerIdle: envInt('OUTGOING_PAYMENT_WORKER_IDLE', 10), // milliseconds incomingPaymentWorkers: envInt('INCOMING_PAYMENT_WORKERS', 1), incomingPaymentWorkerIdle: envInt('INCOMING_PAYMENT_WORKER_IDLE', 200), // milliseconds @@ -191,7 +191,8 @@ export const Config = { maxOutgoingPaymentRetryAttempts: envInt( 'MAX_OUTGOING_PAYMENT_RETRY_ATTEMPTS', 5 - ) + ), + localCacheDuration: envInt('LOCAL_CACHE_DURATION_MS', 15_000) } function parseRedisTlsConfig( diff --git a/packages/backend/src/graphql/middleware/index.test.ts b/packages/backend/src/graphql/middleware/index.test.ts index 1c816dd1cd..6456b418df 100644 --- a/packages/backend/src/graphql/middleware/index.test.ts +++ b/packages/backend/src/graphql/middleware/index.test.ts @@ -24,7 +24,10 @@ describe('GraphQL Middleware', (): void => { let assetService: AssetService beforeAll(async (): Promise => { - deps = initIocContainer(Config) + deps = initIocContainer({ + ...Config, + localCacheDuration: 0 + }) appContainer = await createTestApp(deps) assetService = await deps.use('assetService') }) diff --git a/packages/backend/src/graphql/middleware/index.ts b/packages/backend/src/graphql/middleware/index.ts index fb5bcb1b66..d8e490e4c4 100644 --- a/packages/backend/src/graphql/middleware/index.ts +++ b/packages/backend/src/graphql/middleware/index.ts @@ -24,7 +24,9 @@ export function lockGraphQLMutationMiddleware(lock: Lock): { } } -export function idempotencyGraphQLMiddleware(dataStore: CacheDataStore): { +export function idempotencyGraphQLMiddleware( + dataStore: CacheDataStore +): { Mutation: IMiddleware } { return { diff --git a/packages/backend/src/graphql/resolvers/wallet_address.test.ts b/packages/backend/src/graphql/resolvers/wallet_address.test.ts index 12e55044ca..8f559d8a40 100644 --- a/packages/backend/src/graphql/resolvers/wallet_address.test.ts +++ b/packages/backend/src/graphql/resolvers/wallet_address.test.ts @@ -43,7 +43,10 @@ describe('Wallet Address Resolvers', (): void => { let walletAddressService: WalletAddressService beforeAll(async (): Promise => { - deps = await initIocContainer(Config) + deps = await initIocContainer({ + ...Config, + localCacheDuration: 0 + }) appContainer = await createTestApp(deps) knex = appContainer.knex walletAddressService = await deps.use('walletAddressService') diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index b746db97d2..91db346566 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -60,6 +60,7 @@ import { createNoopTelemetryService } from './telemetry/service' import { createWebhookService } from './webhook/service' +import { createInMemoryDataStore } from './middleware/cache/data-stores/in-memory' BigInt.prototype.toJSON = function () { return this.toString() @@ -205,13 +206,17 @@ export function initIocContainer( knex: knex }) }) + container.singleton('assetCache', async () => { + return createInMemoryDataStore(config.localCacheDuration) + }) container.singleton('assetService', async (deps) => { const logger = await deps.use('logger') const knex = await deps.use('knex') return await createAssetService({ logger: logger, knex: knex, - accountingService: await deps.use('accountingService') + accountingService: await deps.use('accountingService'), + assetCache: await deps.use('assetCache') }) }) @@ -278,6 +283,9 @@ export function initIocContainer( logger: await deps.use('logger') }) }) + container.singleton('walletAddressCache', async () => { + return createInMemoryDataStore(config.localCacheDuration) + }) container.singleton('walletAddressService', async (deps) => { const logger = await deps.use('logger') return await createWalletAddressService({ @@ -285,7 +293,9 @@ export function initIocContainer( knex: await deps.use('knex'), logger: logger, accountingService: await deps.use('accountingService'), - webhookService: await deps.use('webhookService') + webhookService: await deps.use('webhookService'), + assetService: await deps.use('assetService'), + walletAddressCache: await deps.use('walletAddressCache') }) }) container.singleton('spspRoutes', async (deps) => { @@ -303,6 +313,7 @@ export function initIocContainer( knex: await deps.use('knex'), accountingService: await deps.use('accountingService'), walletAddressService: await deps.use('walletAddressService'), + assetService: await deps.use('assetService'), config: await deps.use('config') }) }) @@ -472,6 +483,7 @@ export function initIocContainer( receiverService: await deps.use('receiverService'), feeService: await deps.use('feeService'), walletAddressService: await deps.use('walletAddressService'), + assetService: await deps.use('assetService'), paymentMethodHandlerService: await deps.use( 'paymentMethodHandlerService' ), @@ -500,6 +512,7 @@ export function initIocContainer( peerService: await deps.use('peerService'), walletAddressService: await deps.use('walletAddressService'), quoteService: await deps.use('quoteService'), + assetService: await deps.use('assetService'), telemetry: await deps.use('telemetry') }) }) diff --git a/packages/backend/src/middleware/cache/data-stores/in-memory.ts b/packages/backend/src/middleware/cache/data-stores/in-memory.ts index 769f302701..bb9b4beb4c 100644 --- a/packages/backend/src/middleware/cache/data-stores/in-memory.ts +++ b/packages/backend/src/middleware/cache/data-stores/in-memory.ts @@ -1,16 +1,17 @@ import { CacheDataStore } from '.' -interface Cached { +interface Cached { expiry: number - data: string + data: T } -export function createInMemoryDataStore(keyTtlMs: number): CacheDataStore { - const map = new Map() +export function createInMemoryDataStore( + keyTtlMs: number +): CacheDataStore { + const map = new Map>() - const getAndCheckExpiry = (key: string): Cached | undefined => { + const getAndCheckExpiry = (key: string): Cached | undefined => { const cached = map.get(key) - if (cached?.expiry && Date.now() >= cached.expiry) { deleteKey(key) return undefined @@ -22,7 +23,7 @@ export function createInMemoryDataStore(keyTtlMs: number): CacheDataStore { const deleteKey = (key: string) => map.delete(key) return { - async get(key): Promise { + async get(key): Promise { const cached = getAndCheckExpiry(key) return cached?.data @@ -35,7 +36,7 @@ export function createInMemoryDataStore(keyTtlMs: number): CacheDataStore { async delete(key): Promise { deleteKey(key) }, - async set(key: string, value: string): Promise { + async set(key: string, value: T): Promise { map.set(key, { expiry: Date.now() + keyTtlMs, data: value }) return true }, diff --git a/packages/backend/src/middleware/cache/data-stores/index.ts b/packages/backend/src/middleware/cache/data-stores/index.ts index 4c2d7e9e59..7c46433702 100644 --- a/packages/backend/src/middleware/cache/data-stores/index.ts +++ b/packages/backend/src/middleware/cache/data-stores/index.ts @@ -1,7 +1,7 @@ -export interface CacheDataStore { - get(key: string): Promise +export interface CacheDataStore { + get(key: string): Promise getKeyExpiry(key: string): Promise - set(key: string, value: string): Promise + set(key: string, value: T): Promise delete(key: string): Promise deleteAll(): Promise } diff --git a/packages/backend/src/middleware/cache/data-stores/redis.ts b/packages/backend/src/middleware/cache/data-stores/redis.ts index e62ef38ad8..55a1c1c405 100644 --- a/packages/backend/src/middleware/cache/data-stores/redis.ts +++ b/packages/backend/src/middleware/cache/data-stores/redis.ts @@ -4,7 +4,7 @@ import { CacheDataStore } from '.' export function createRedisDataStore( redisClient: Redis, keyTtlMs: number -): CacheDataStore { +): CacheDataStore { return { async get(key: string): Promise { return (await redisClient.get(key)) || undefined diff --git a/packages/backend/src/middleware/cache/index.test.ts b/packages/backend/src/middleware/cache/index.test.ts index 76e3a65709..2c6f4abfde 100644 --- a/packages/backend/src/middleware/cache/index.test.ts +++ b/packages/backend/src/middleware/cache/index.test.ts @@ -8,7 +8,7 @@ describe('Cache Middleware', (): void => { const defaultRequest = () => Promise.resolve('requestResult') const defaultOperationName = 'defaultOperationName' - const dataStore = createInMemoryDataStore(10000) + const dataStore = createInMemoryDataStore(10000) const handleParamMismatch = () => { throw new Error('Param mismatch') } diff --git a/packages/backend/src/middleware/cache/index.ts b/packages/backend/src/middleware/cache/index.ts index 6652c942c7..6b863cfc48 100644 --- a/packages/backend/src/middleware/cache/index.ts +++ b/packages/backend/src/middleware/cache/index.ts @@ -11,7 +11,7 @@ interface CachedRequest { type Request = () => Promise interface CacheMiddlewareArgs { - deps: { logger: Logger; dataStore: CacheDataStore } + deps: { logger: Logger; dataStore: CacheDataStore } idempotencyKey?: string request: Request requestParams: Record diff --git a/packages/backend/src/open_payments/payment/incoming/service.test.ts b/packages/backend/src/open_payments/payment/incoming/service.test.ts index 90465420ba..6826f46b00 100644 --- a/packages/backend/src/open_payments/payment/incoming/service.test.ts +++ b/packages/backend/src/open_payments/payment/incoming/service.test.ts @@ -40,7 +40,10 @@ describe('Incoming Payment Service', (): void => { let config: IAppConfig beforeAll(async (): Promise => { - deps = await initIocContainer(Config) + deps = initIocContainer({ + ...Config, + localCacheDuration: 0 + }) appContainer = await createTestApp(deps) accountingService = await deps.use('accountingService') knex = appContainer.knex diff --git a/packages/backend/src/open_payments/payment/incoming/service.ts b/packages/backend/src/open_payments/payment/incoming/service.ts index 2fac831885..6229c613fe 100644 --- a/packages/backend/src/open_payments/payment/incoming/service.ts +++ b/packages/backend/src/open_payments/payment/incoming/service.ts @@ -18,6 +18,7 @@ import { Amount } from '../../amount' import { IncomingPaymentError } from './errors' import { IAppConfig } from '../../../config/app' import { poll } from '../../../shared/utils' +import { AssetService } from '../../../asset/service' export const POSITIVE_SLIPPAGE = BigInt(1) // First retry waits 10 seconds @@ -57,6 +58,7 @@ export interface ServiceDependencies extends BaseService { knex: TransactionOrKnex accountingService: AccountingService walletAddressService: WalletAddressService + assetService: AssetService config: IAppConfig } @@ -86,20 +88,34 @@ async function getIncomingPayment( deps: ServiceDependencies, options: GetOptions ): Promise { - const incomingPayment = await IncomingPayment.query(deps.knex) - .get(options) - .withGraphFetched('[asset, walletAddress]') - if (incomingPayment) return await addReceivedAmount(deps, incomingPayment) - else return + const incomingPayment = await IncomingPayment.query(deps.knex).get(options) + if (incomingPayment) { + const asset = await deps.assetService.get(incomingPayment.assetId) + if (asset) incomingPayment.asset = asset + + incomingPayment.walletAddress = await deps.walletAddressService.get( + incomingPayment.walletAddressId + ) + + return await addReceivedAmount(deps, incomingPayment) + } else return } async function updateIncomingPayment( deps: ServiceDependencies, options: UpdateOptions ): Promise { - const incomingPayment = await IncomingPayment.query(deps.knex) - .patchAndFetchById(options.id, { metadata: options.metadata }) - .withGraphFetched('[asset, walletAddress]') + const incomingPayment = await IncomingPayment.query( + deps.knex + ).patchAndFetchById(options.id, { metadata: options.metadata }) + if (incomingPayment) { + const asset = await deps.assetService.get(incomingPayment.assetId) + if (asset) incomingPayment.asset = asset + + incomingPayment.walletAddress = await deps.walletAddressService.get( + incomingPayment.walletAddressId + ) + } return incomingPayment ? await addReceivedAmount(deps, incomingPayment) @@ -144,18 +160,25 @@ async function createIncomingPayment( } } - let incomingPayment = await IncomingPayment.query(trx || deps.knex) - .insertAndFetch({ - walletAddressId: walletAddressId, - client, - assetId: walletAddress.asset.id, - expiresAt, - incomingAmount, - metadata, - state: IncomingPaymentState.Pending, - processAt: expiresAt - }) - .withGraphFetched('[asset, walletAddress]') + let incomingPayment = await IncomingPayment.query( + trx || deps.knex + ).insertAndFetch({ + walletAddressId: walletAddressId, + client, + assetId: walletAddress.asset.id, + expiresAt, + incomingAmount, + metadata, + state: IncomingPaymentState.Pending, + processAt: expiresAt + }) + + const asset = await deps.assetService.get(incomingPayment.assetId) + if (asset) incomingPayment.asset = asset + + incomingPayment.walletAddress = await deps.walletAddressService.get( + incomingPayment.walletAddressId + ) await IncomingPaymentEvent.query(trx || deps.knex).insert({ incomingPaymentId: incomingPayment.id, @@ -207,11 +230,19 @@ async function getApprovedOrCanceledIncomingPayment( deps: ServiceDependencies, options: GetOptions ) { - return IncomingPayment.query(deps.knex) + const incomingPayment = await IncomingPayment.query(deps.knex) .get(options) - .withGraphFetched('[asset, walletAddress]') .whereNotNull('approvedAt') .orWhereNotNull('cancelledAt') + if (incomingPayment) { + const asset = await deps.assetService.get(incomingPayment.assetId) + if (asset) incomingPayment.asset = asset + + incomingPayment.walletAddress = await deps.walletAddressService.get( + incomingPayment.walletAddressId + ) + } + return incomingPayment } // Fetch (and lock) an incoming payment for work. @@ -228,11 +259,17 @@ async function processNextIncomingPayment( // If an incoming payment is locked, don't wait — just come back for it later. .skipLocked() .where('processAt', '<=', now) - .withGraphFetched('[asset, walletAddress]') const incomingPayment = incomingPayments[0] if (!incomingPayment) return + const asset = await deps_.assetService.get(incomingPayment.assetId) + if (asset) incomingPayment.asset = asset + + incomingPayment.walletAddress = await deps_.walletAddressService.get( + incomingPayment.walletAddressId + ) + const deps = { ...deps_, knex: trx, @@ -326,9 +363,16 @@ async function getWalletAddressPage( deps: ServiceDependencies, options: ListOptions ): Promise { - const page = await IncomingPayment.query(deps.knex) - .list(options) - .withGraphFetched('[asset, walletAddress]') + const page = await IncomingPayment.query(deps.knex).list(options) + for (const payment of page) { + const asset = await deps.assetService.get(payment.assetId) + if (asset) payment.asset = asset + + payment.walletAddress = await deps.walletAddressService.get( + payment.walletAddressId + ) + } + const amounts = await deps.accountingService.getAccountsTotalReceived( page.map((payment: IncomingPayment) => payment.id) ) @@ -358,12 +402,17 @@ async function approveIncomingPayment( id: string ): Promise { return deps.knex.transaction(async (trx) => { - const payment = await IncomingPayment.query(trx) - .findById(id) - .forUpdate() - .withGraphFetched('[asset, walletAddress]') + const payment = await IncomingPayment.query(trx).findById(id).forUpdate() if (!payment) return IncomingPaymentError.UnknownPayment + + const asset = await deps.assetService.get(payment.assetId) + if (asset) payment.asset = asset + + payment.walletAddress = await deps.walletAddressService.get( + payment.walletAddressId + ) + if (payment.state !== IncomingPaymentState.Pending) return IncomingPaymentError.WrongState @@ -390,12 +439,17 @@ async function cancelIncomingPayment( id: string ): Promise { return deps.knex.transaction(async (trx) => { - const payment = await IncomingPayment.query(trx) - .findById(id) - .forUpdate() - .withGraphFetched('[asset, walletAddress]') + const payment = await IncomingPayment.query(trx).findById(id).forUpdate() if (!payment) return IncomingPaymentError.UnknownPayment + + const asset = await deps.assetService.get(payment.assetId) + if (asset) payment.asset = asset + + payment.walletAddress = await deps.walletAddressService.get( + payment.walletAddressId + ) + if (payment.state !== IncomingPaymentState.Pending) return IncomingPaymentError.WrongState @@ -422,11 +476,16 @@ async function completeIncomingPayment( id: string ): Promise { return deps.knex.transaction(async (trx) => { - const payment = await IncomingPayment.query(trx) - .findById(id) - .forUpdate() - .withGraphFetched('[asset, walletAddress]') + const payment = await IncomingPayment.query(trx).findById(id).forUpdate() if (!payment) return IncomingPaymentError.UnknownPayment + + const asset = await deps.assetService.get(payment.assetId) + if (asset) payment.asset = asset + + payment.walletAddress = await deps.walletAddressService.get( + payment.walletAddressId + ) + if ( ![IncomingPaymentState.Pending, IncomingPaymentState.Processing].includes( payment.state diff --git a/packages/backend/src/open_payments/payment/outgoing/service.test.ts b/packages/backend/src/open_payments/payment/outgoing/service.test.ts index 87251cb1ff..b3e98b502c 100644 --- a/packages/backend/src/open_payments/payment/outgoing/service.test.ts +++ b/packages/backend/src/open_payments/payment/outgoing/service.test.ts @@ -254,7 +254,8 @@ describe('OutgoingPaymentService', (): void => { deps = await initIocContainer({ ...Config, exchangeRatesUrl, - enableTelemetry: true + enableTelemetry: true, + localCacheDuration: 0 }) appContainer = await createTestApp(deps) outgoingPaymentService = await deps.use('outgoingPaymentService') diff --git a/packages/backend/src/open_payments/payment/outgoing/service.ts b/packages/backend/src/open_payments/payment/outgoing/service.ts index 8887235edd..54ede1accb 100644 --- a/packages/backend/src/open_payments/payment/outgoing/service.ts +++ b/packages/backend/src/open_payments/payment/outgoing/service.ts @@ -43,6 +43,7 @@ import { isQuoteError } from '../../quote/errors' import { Pagination, SortOrder } from '../../../shared/baseModel' import { FilterString } from '../../../shared/filters' import { IAppConfig } from '../../../config/app' +import { AssetService } from '../../../asset/service' export interface OutgoingPaymentService extends WalletAddressSubresourceService { @@ -68,6 +69,7 @@ export interface ServiceDependencies extends BaseService { paymentMethodHandlerService: PaymentMethodHandlerService walletAddressService: WalletAddressService quoteService: QuoteService + assetService: AssetService telemetry: TelemetryService } @@ -107,9 +109,7 @@ async function getOutgoingPaymentsPage( ): Promise { const { filter, pagination, sortOrder } = options ?? {} - const query = OutgoingPayment.query(deps.knex).withGraphFetched( - '[quote.asset, walletAddress]' - ) + const query = OutgoingPayment.query(deps.knex).withGraphFetched('quote') if (filter?.receiver?.in && filter.receiver.in.length) { query @@ -126,6 +126,14 @@ async function getOutgoingPaymentsPage( } const page = await query.getPage(pagination, sortOrder) + for (const payment of page) { + payment.walletAddress = await deps.walletAddressService.get( + payment.walletAddressId + ) + const asset = await deps.assetService.get(payment.quote.assetId) + if (asset) payment.quote.asset = asset + } + const amounts = await deps.accountingService.getAccountsTotalSent( page.map((payment: OutgoingPayment) => payment.id) ) @@ -146,9 +154,14 @@ async function getOutgoingPayment( ): Promise { const outgoingPayment = await OutgoingPayment.query(deps.knex) .get(options) - .withGraphFetched('[quote.asset, walletAddress]') - + .withGraphFetched('quote') if (outgoingPayment) { + outgoingPayment.walletAddress = await deps.walletAddressService.get( + outgoingPayment.walletAddressId + ) + const asset = await deps.assetService.get(outgoingPayment.quote.assetId) + if (asset) outgoingPayment.quote.asset = asset + return addSentAmount(deps, outgoingPayment) } } @@ -207,7 +220,13 @@ async function cancelOutgoingPayment( ...(options.reason ? { cancellationReason: options.reason } : {}) } }) - .withGraphFetched('[quote.asset, walletAddress]') + .withGraphFetched('quote') + const asset = await deps.assetService.get(payment.quote.assetId) + if (asset) payment.quote.asset = asset + + payment.walletAddress = await deps.walletAddressService.get( + payment.walletAddressId + ) return addSentAmount(deps, payment) }) @@ -303,7 +322,13 @@ async function createOutgoingPayment( state: OutgoingPaymentState.Funding, grantId }) - .withGraphFetched('[quote.asset, walletAddress]') + .withGraphFetched('quote') + payment.walletAddress = await deps.walletAddressService.get( + payment.walletAddressId + ) + const asset = await deps.assetService.get(payment.quote.assetId) + if (asset) payment.quote.asset = asset + stopTimerInsertPayment() if ( @@ -528,7 +553,7 @@ async function validateGrantAndAddSpentAmountsToPayment( .andWhereNot({ id: payment.id }) - .withGraphFetched('quote.asset') + .withGraphFetched('quote') if (grantPayments.length === 0) { return true @@ -547,6 +572,9 @@ async function validateGrantAndAddSpentAmountsToPayment( } } for (const grantPayment of grantPayments) { + const asset = await deps.assetService.get(grantPayment.quote.assetId) + if (asset) grantPayment.quote.asset = asset + if ( validatePaymentInterval({ limits: paymentLimits, @@ -605,8 +633,12 @@ async function fundPayment( const payment = await OutgoingPayment.query(trx) .findById(id) .forUpdate() - .withGraphFetched('quote.asset') + .withGraphFetched('quote') if (!payment) return FundingError.UnknownPayment + + const asset = await deps.assetService.get(payment.quote.assetId) + if (asset) payment.quote.asset = asset + if (payment.state !== OutgoingPaymentState.Funding) { return FundingError.WrongState } @@ -649,7 +681,15 @@ async function getWalletAddressPage( ): Promise { const page = await OutgoingPayment.query(deps.knex) .list(options) - .withGraphFetched('[quote.asset, walletAddress]') + .withGraphFetched('quote') + for (const payment of page) { + payment.walletAddress = await deps.walletAddressService.get( + payment.walletAddressId + ) + const asset = await deps.assetService.get(payment.quote.assetId) + if (asset) payment.quote.asset = asset + } + const amounts = await deps.accountingService.getAccountsTotalSent( page.map((payment: OutgoingPayment) => payment.id) ) diff --git a/packages/backend/src/open_payments/quote/service.test.ts b/packages/backend/src/open_payments/quote/service.test.ts index 9e9c8712bf..3840212c2e 100644 --- a/packages/backend/src/open_payments/quote/service.test.ts +++ b/packages/backend/src/open_payments/quote/service.test.ts @@ -77,7 +77,10 @@ describe('QuoteService', (): void => { } beforeAll(async (): Promise => { - deps = initIocContainer(Config) + deps = initIocContainer({ + ...Config, + localCacheDuration: 0 + }) appContainer = await createTestApp(deps) knex = appContainer.knex diff --git a/packages/backend/src/open_payments/quote/service.ts b/packages/backend/src/open_payments/quote/service.ts index b8e96145af..957c4065f4 100644 --- a/packages/backend/src/open_payments/quote/service.ts +++ b/packages/backend/src/open_payments/quote/service.ts @@ -21,6 +21,7 @@ import { } from '../../payment-method/handler/errors' import { v4 as uuid } from 'uuid' import { TelemetryService } from '../../telemetry/service' +import { AssetService } from '../../asset/service' export interface QuoteService extends WalletAddressSubresourceService { create(options: CreateQuoteOptions): Promise @@ -31,6 +32,7 @@ export interface ServiceDependencies extends BaseService { knex: TransactionOrKnex receiverService: ReceiverService walletAddressService: WalletAddressService + assetService: AssetService feeService: FeeService paymentMethodHandlerService: PaymentMethodHandlerService telemetry: TelemetryService @@ -54,9 +56,18 @@ async function getQuote( deps: ServiceDependencies, options: GetOptions ): Promise { - return Quote.query(deps.knex) + const quote = await Quote.query(deps.knex) .get(options) - .withGraphFetched('[asset, fee, walletAddress]') + .withGraphFetched('fee') + if (quote) { + const asset = await deps.assetService.get(quote.assetId) + if (asset) quote.asset = asset + + quote.walletAddress = await deps.walletAddressService.get( + quote.walletAddressId + ) + } + return quote } interface QuoteOptionsBase { @@ -188,7 +199,14 @@ async function createQuote( feeId: sendingFee?.id, estimatedExchangeRate: quote.estimatedExchangeRate }) - .withGraphFetched('[asset, fee, walletAddress]') + .withGraphFetched('fee') + const asset = await deps.assetService.get(createdQuote.assetId) + if (asset) createdQuote.asset = asset + + createdQuote.walletAddress = await deps.walletAddressService.get( + createdQuote.walletAddressId + ) + stopQuoteCreate() const stopFinalize = deps.telemetry.startTimer( @@ -428,7 +446,16 @@ async function getWalletAddressPage( deps: ServiceDependencies, options: ListOptions ): Promise { - return await Quote.query(deps.knex) + const quotes = await Quote.query(deps.knex) .list(options) - .withGraphFetched('[asset, fee, walletAddress]') + .withGraphFetched('fee') + for (const quote of quotes) { + const asset = await deps.assetService.get(quote.assetId) + if (asset) quote.asset = asset + + quote.walletAddress = await deps.walletAddressService.get( + quote.walletAddressId + ) + } + return quotes } diff --git a/packages/backend/src/open_payments/wallet_address/service.test.ts b/packages/backend/src/open_payments/wallet_address/service.test.ts index 30db937dd3..b2b7245010 100644 --- a/packages/backend/src/open_payments/wallet_address/service.test.ts +++ b/packages/backend/src/open_payments/wallet_address/service.test.ts @@ -25,6 +25,7 @@ import { Pagination, SortOrder } from '../../shared/baseModel' import { sleep } from '../../shared/utils' import { withConfigOverride } from '../../tests/helpers' import { WalletAddressAdditionalProperty } from './additional_property/model' +import { CacheDataStore } from '../../middleware/cache/data-stores' describe('Open Payments Wallet Address Service', (): void => { let deps: IocContract @@ -35,7 +36,10 @@ describe('Open Payments Wallet Address Service', (): void => { let knex: Knex beforeAll(async (): Promise => { - deps = initIocContainer(Config) + deps = initIocContainer({ + ...Config, + localCacheDuration: 0 + }) config = await deps.use('config') appContainer = await createTestApp(deps) knex = appContainer.knex @@ -797,3 +801,95 @@ describe('Open Payments Wallet Address Service', (): void => { ) }) }) + +describe('Open Payments Wallet Address Service using Cache', (): void => { + let deps: IocContract + let appContainer: TestContainer + let walletAddressService: WalletAddressService + let walletAddressCache: CacheDataStore + let knex: Knex + + beforeAll(async (): Promise => { + deps = initIocContainer({ + ...Config, + localCacheDuration: 5_000 // 5-second default. + }) + appContainer = await createTestApp(deps) + knex = appContainer.knex + walletAddressService = await deps.use('walletAddressService') + walletAddressCache = await deps.use('walletAddressCache') + }) + + afterEach(async (): Promise => { + jest.useRealTimers() + await truncateTables(knex) + }) + + afterAll(async (): Promise => { + await appContainer.shutdown() + }) + + describe('Create, Update and Fetch Wallet Address with cache', (): void => { + test.each` + initialIsActive | status | expectedIsActive | expectedCallCount + ${true} | ${undefined} | ${true} | ${2} + ${true} | ${'INACTIVE'} | ${false} | ${2} + ${false} | ${'ACTIVE'} | ${true} | ${3} + ${false} | ${undefined} | ${false} | ${3} + `( + 'Wallet address with initial isActive of $initialIsActive can be updated with $status status and called $expectedCallCount', + async ({ + initialIsActive, + status, + expectedIsActive, + expectedCallCount + }): Promise => { + const spyCacheSet = jest.spyOn(walletAddressCache, 'set') + const walletAddress = await createWalletAddress(deps) + expect(spyCacheSet).toHaveBeenCalledTimes(1) + + if (!initialIsActive) { + // Only update the database: + await walletAddress.$query(knex).patch({ deactivatedAt: new Date() }) + const fromCacheActive = await walletAddressService.get( + walletAddress.id + ) + + // We don't expect a match here, since the cache and database is out-of-sync: + expect(fromCacheActive!.isActive).toEqual(false) + + // Update through the service, will also update the wallet-address cache: + await walletAddressService.update({ + id: walletAddress.id, + status: 'INACTIVE' + }) + } + + const updatedWalletAddress = await walletAddressService.update({ + id: walletAddress.id, + status + }) + assert.ok(!isWalletAddressError(updatedWalletAddress)) + expect(updatedWalletAddress.isActive).toEqual(expectedIsActive) + + // We expect the [set] to be called again with the new data: + expect(spyCacheSet).toHaveBeenCalledTimes(expectedCallCount) + expect(spyCacheSet).toHaveBeenCalledWith( + walletAddress.id, + expect.objectContaining({ + id: walletAddress.id, + url: walletAddress.url + }) + ) + + const spyCacheGet = jest.spyOn(walletAddressCache, 'get') + await expect( + walletAddressService.get(walletAddress.id) + ).resolves.toEqual(updatedWalletAddress) + + expect(spyCacheGet).toHaveBeenCalledTimes(expectedCallCount - 1) + expect(spyCacheGet).toHaveBeenCalledWith(walletAddress.id) + } + ) + }) +}) diff --git a/packages/backend/src/open_payments/wallet_address/service.ts b/packages/backend/src/open_payments/wallet_address/service.ts index ad14d8f882..b2b79bd5c3 100644 --- a/packages/backend/src/open_payments/wallet_address/service.ts +++ b/packages/backend/src/open_payments/wallet_address/service.ts @@ -26,6 +26,8 @@ import { Pagination, SortOrder } from '../../shared/baseModel' import { WebhookService } from '../../webhook/service' import { poll } from '../../shared/utils' import { WalletAddressAdditionalProperty } from './additional_property/model' +import { AssetService } from '../../asset/service' +import { CacheDataStore } from '../../middleware/cache/data-stores' interface Options { publicName?: string @@ -78,6 +80,8 @@ interface ServiceDependencies extends BaseService { knex: TransactionOrKnex accountingService: AccountingService webhookService: WebhookService + assetService: AssetService + walletAddressCache: CacheDataStore } export async function createWalletAddressService({ @@ -85,7 +89,9 @@ export async function createWalletAddressService({ config, knex, accountingService, - webhookService + webhookService, + assetService, + walletAddressCache }: ServiceDependencies): Promise { const log = logger.child({ service: 'WalletAddressService' @@ -95,7 +101,9 @@ export async function createWalletAddressService({ logger: log, knex, accountingService, - webhookService + webhookService, + assetService, + walletAddressCache } return { create: (options) => createWalletAddress(deps, options), @@ -165,21 +173,25 @@ async function createWalletAddress( ? cleanAdditionalProperties(options.additionalProperties) : undefined - return await WalletAddress.query(deps.knex) - .insertGraphAndFetch({ - url: options.url.toLowerCase(), - publicName: options.publicName, - assetId: options.assetId, - additionalProperties: additionalProperties - }) - .withGraphFetched('asset') + const walletAddress = await WalletAddress.query( + deps.knex + ).insertGraphAndFetch({ + url: options.url.toLowerCase(), + publicName: options.publicName, + assetId: options.assetId, + additionalProperties: additionalProperties + }) + const asset = await deps.assetService.get(walletAddress.assetId) + if (asset) walletAddress.asset = asset + + await deps.walletAddressCache.set(walletAddress.id, walletAddress) + return walletAddress } catch (err) { if (err instanceof ForeignKeyViolationError) { if (err.constraint === 'walletaddresses_assetid_foreign') { return WalletAddressError.UnknownAsset } - } - if (err instanceof UniqueViolationError) { + } else if (err instanceof UniqueViolationError) { if (err.constraint === 'walletaddresses_url_unique') { return WalletAddressError.DuplicateWalletAddress } @@ -209,8 +221,9 @@ async function updateWalletAddress( const updatedWalletAddress = await walletAddress .$query(trx) .patchAndFetch(update) - .withGraphFetched('asset') .throwIfNotFound() + const asset = await deps.assetService.get(updatedWalletAddress.assetId) + if (asset) updatedWalletAddress.asset = asset // Override all existing additional properties if new ones are provided if (additionalProperties) { @@ -231,6 +244,10 @@ async function updateWalletAddress( } await trx.commit() + await deps.walletAddressCache.set( + updatedWalletAddress.id, + updatedWalletAddress + ) return updatedWalletAddress } catch (err) { await trx.rollback() @@ -245,9 +262,16 @@ async function getWalletAddress( deps: ServiceDependencies, id: string ): Promise { - return await WalletAddress.query(deps.knex) - .findById(id) - .withGraphFetched('asset') + const walletAdd = await deps.walletAddressCache.get(id) + if (walletAdd) return walletAdd + + const walletAddress = await WalletAddress.query(deps.knex).findById(id) + if (walletAddress) { + const asset = await deps.assetService.get(walletAddress.assetId) + if (asset) walletAddress.asset = asset + await deps.walletAddressCache.set(id, walletAddress) + } + return walletAddress } async function getWalletAdditionalProperties( @@ -301,9 +325,13 @@ async function getWalletAddressByUrl( deps: ServiceDependencies, url: string ): Promise { - const walletAddress = await WalletAddress.query(deps.knex) - .findOne({ url: url.toLowerCase() }) - .withGraphFetched('asset') + const walletAddress = await WalletAddress.query(deps.knex).findOne({ + url: url.toLowerCase() + }) + if (walletAddress) { + const asset = await deps.assetService.get(walletAddress.assetId) + if (asset) walletAddress.asset = asset + } return walletAddress || undefined } @@ -348,7 +376,10 @@ async function processNextWalletAddresses( // If a wallet address is locked, don't wait — just come back for it later. .skipLocked() .where('processAt', '<=', now) - .withGraphFetched('asset') + for (const walletAddress of walletAddresses) { + const asset = await deps_.assetService.get(walletAddress.assetId) + if (asset) walletAddress.asset = asset + } const deps = { ...deps_, @@ -431,10 +462,6 @@ async function deactivateOpenIncomingPaymentsByWalletAddress( .where('expiresAt', '>', expiresAt) } -export interface CreateSubresourceOptions { - walletAddressId: string -} - export interface WalletAddressSubresourceService< M extends WalletAddressSubresource > { diff --git a/packages/backend/src/payment-method/ilp/connector/core/middleware/balance.ts b/packages/backend/src/payment-method/ilp/connector/core/middleware/balance.ts index f8beb0b487..5023b3d6cd 100644 --- a/packages/backend/src/payment-method/ilp/connector/core/middleware/balance.ts +++ b/packages/backend/src/payment-method/ilp/connector/core/middleware/balance.ts @@ -21,6 +21,9 @@ export function createBalanceMiddleware(): ILPMiddleware { }: ILPContext, next: () => Promise ): Promise => { + const stopTimer = services.telemetry.startTimer('balance_middleware_next', { + callName: 'balanceMiddleware:next' + }) const { amount } = request.prepare const logger = services.logger.child( { module: 'balance-middleware' }, @@ -32,6 +35,7 @@ export function createBalanceMiddleware(): ILPMiddleware { // Ignore zero amount packets if (amount === '0') { await next() + stopTimer() return } @@ -61,6 +65,7 @@ export function createBalanceMiddleware(): ILPMiddleware { if (state.unfulfillable) { await next() + stopTimer() return } @@ -92,17 +97,24 @@ export function createBalanceMiddleware(): ILPMiddleware { ctxThrow(500, destinationAmountOrError.toString()) } } else { + stopTimer() return trxOrError } } - if (state.streamDestination) await next() + if (state.streamDestination) { + await next() + stopTimer() + } if (!state.streamDestination || response.fulfill) { // TODO: make this single-phase if streamDestination === true const trx = await createPendingTransfer() - if (!state.streamDestination) await next() + if (!state.streamDestination) { + await next() + stopTimer() + } if (trx) { if (response.fulfill) { diff --git a/packages/backend/src/payment-method/ilp/connector/core/middleware/stream-address.ts b/packages/backend/src/payment-method/ilp/connector/core/middleware/stream-address.ts index a4b9158c63..17b7ba5869 100644 --- a/packages/backend/src/payment-method/ilp/connector/core/middleware/stream-address.ts +++ b/packages/backend/src/payment-method/ilp/connector/core/middleware/stream-address.ts @@ -2,12 +2,19 @@ import { ILPMiddleware, ILPContext } from '../rafiki' export function createStreamAddressMiddleware(): ILPMiddleware { return async ( - { request, services: { streamServer }, state }: ILPContext, + { request, services: { streamServer, telemetry }, state }: ILPContext, next: () => Promise ): Promise => { + const stopTimer = telemetry.startTimer( + 'create_stream_address_middleware_decode_tag', + { + callName: 'createStreamAddressMiddleware:decodePaymentTag' + } + ) const { destination } = request.prepare // To preserve sender privacy, the accountId wasn't included in the original destination address. state.streamDestination = streamServer.decodePaymentTag(destination) + stopTimer() await next() } } diff --git a/packages/backend/src/payment-method/ilp/peer/service.ts b/packages/backend/src/payment-method/ilp/peer/service.ts index be64dcc520..471e8f44d3 100644 --- a/packages/backend/src/payment-method/ilp/peer/service.ts +++ b/packages/backend/src/payment-method/ilp/peer/service.ts @@ -114,7 +114,12 @@ async function getPeer( deps: ServiceDependencies, id: string ): Promise { - return Peer.query(deps.knex).findById(id).withGraphFetched('asset') + const peer = await Peer.query(deps.knex).findById(id) + if (peer) { + const asset = await deps.assetService.get(peer.assetId) + if (asset) peer.asset = asset + } + return peer } async function createPeer( @@ -131,16 +136,16 @@ async function createPeer( try { return await Peer.transaction(deps.knex, async (trx) => { - const peer = await Peer.query(trx) - .insertAndFetch({ - assetId: options.assetId, - http: options.http, - maxPacketAmount: options.maxPacketAmount, - staticIlpAddress: options.staticIlpAddress, - name: options.name, - liquidityThreshold: options.liquidityThreshold - }) - .withGraphFetched('asset') + const peer = await Peer.query(trx).insertAndFetch({ + assetId: options.assetId, + http: options.http, + maxPacketAmount: options.maxPacketAmount, + staticIlpAddress: options.staticIlpAddress, + name: options.name, + liquidityThreshold: options.liquidityThreshold + }) + const asset = await deps.assetService.get(peer.assetId) + if (asset) peer.asset = asset if (options.http?.incoming) { const err = await addIncomingHttpTokens({ @@ -232,10 +237,13 @@ async function updatePeer( throw err } } - return await Peer.query(trx) + + const peer = await Peer.query(trx) .patchAndFetchById(options.id, options) - .withGraphFetched('asset') .throwIfNotFound() + const asset = await deps.assetService.get(peer.assetId) + if (asset) peer.asset = asset + return peer }) } catch (err) { if (err instanceof NotFoundError) { @@ -314,7 +322,6 @@ async function getPeerByDestinationAddress( // for `staticIlpAddress`s in the accounts table: // new RegExp('^' + staticIlpAddress + '($|\\.)')).test(destinationAddress) const peerQuery = Peer.query(deps.knex) - .withGraphJoined('asset') .where( raw('?', [destinationAddress]), 'like', @@ -344,7 +351,10 @@ async function getPeerByDestinationAddress( } const peer = await peerQuery.first() - + if (peer) { + const asset = await deps.assetService.get(peer.assetId) + if (asset) peer.asset = asset + } return peer || undefined } @@ -375,18 +385,22 @@ async function getPeersPage( pagination?: Pagination, sortOrder?: SortOrder ): Promise { - return await Peer.query(deps.knex) - .getPage(pagination, sortOrder) - .withGraphFetched('asset') + const peers = await Peer.query(deps.knex).getPage(pagination, sortOrder) + for (const peer of peers) { + const asset = await deps.assetService.get(peer.assetId) + if (asset) peer.asset = asset + } + return peers } async function deletePeer( deps: ServiceDependencies, id: string ): Promise { - return Peer.query(deps.knex) - .withGraphFetched('asset') - .deleteById(id) - .returning('*') - .first() + const peer = await Peer.query(deps.knex).deleteById(id).returning('*').first() + if (peer) { + const asset = await deps.assetService.get(peer.assetId) + if (asset) peer.asset = asset + } + return peer } diff --git a/packages/backend/src/rates/service.test.ts b/packages/backend/src/rates/service.test.ts index fb7a709cca..5107a30cb9 100644 --- a/packages/backend/src/rates/service.test.ts +++ b/packages/backend/src/rates/service.test.ts @@ -46,7 +46,7 @@ describe('Rates service', function () { beforeEach(async (): Promise => { // eslint-disable-next-line @typescript-eslint/no-explicit-any - ;((service as any).cachedRates as CacheDataStore).deleteAll() + ;((service as any).cachedRates as CacheDataStore).deleteAll() apiRequestCount = 0 }) diff --git a/packages/backend/src/rates/service.ts b/packages/backend/src/rates/service.ts index e7d55d6a72..759dd0e47b 100644 --- a/packages/backend/src/rates/service.ts +++ b/packages/backend/src/rates/service.ts @@ -54,7 +54,7 @@ export function createRatesService(deps: ServiceDependencies): RatesService { class RatesServiceImpl implements RatesService { private axios: AxiosInstance - private cachedRates: CacheDataStore + private cachedRates: CacheDataStore private inProgressRequests: Record> = {} constructor(private deps: ServiceDependencies) { diff --git a/packages/backend/src/tests/quote.ts b/packages/backend/src/tests/quote.ts index bf6e844d28..26c0928619 100644 --- a/packages/backend/src/tests/quote.ts +++ b/packages/backend/src/tests/quote.ts @@ -165,13 +165,17 @@ export async function createQuote( maxPacketAmount: BigInt('9223372036854775807') }) - const withGraphFetchedArray = ['asset', 'walletAddress'] + const withGraphFetchedArray = [ + 'asset', + 'walletAddress', + 'walletAddress.asset' + ] if (withFee) { withGraphFetchedArray.push('fee') } const withGraphFetchedExpression = `[${withGraphFetchedArray.join(', ')}]` - return await Quote.query() + return Quote.query() .insertAndFetch({ id: quoteId, walletAddressId, diff --git a/packages/backend/src/tests/telemetry.ts b/packages/backend/src/tests/telemetry.ts index d07ad46fc8..70aeca7a54 100644 --- a/packages/backend/src/tests/telemetry.ts +++ b/packages/backend/src/tests/telemetry.ts @@ -47,6 +47,7 @@ export class MockTelemetryService implements TelemetryService { public getInstanceName(): string | undefined { return 'serviceName' } + public async shutdown(): Promise {} public startTimer(): () => void { return () => undefined