Skip to content

Commit

Permalink
feat: add new caching route lambda for async routing invocation (#887)
Browse files Browse the repository at this point in the history
* feat: add new caching route lambda for async routing invocation

* fix caching alarm and insights layer naming

* cleaner new caching routing lambda function name passing

* add test to assert against hitting new caching lambda
  • Loading branch information
jsy1218 authored Oct 31, 2024
1 parent d37d39d commit 3d46349
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 10 deletions.
112 changes: 112 additions & 0 deletions bin/stacks/routing-lambda-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,70 @@ export class RoutingLambdaStack extends cdk.NestedStack {

const region = cdk.Stack.of(this).region

const cachingRoutingLambda = new aws_lambda_nodejs.NodejsFunction(this, 'CachingRoutingLambda', {
role: lambdaRole,
runtime: aws_lambda.Runtime.NODEJS_18_X,
entry: path.join(__dirname, '../../lib/handlers/index.ts'),
handler: 'quoteHandler',
// 11/8/23: URA currently calls the Routing API with a timeout of 10 seconds.
// Set this lambda's timeout to be slightly lower to give them time to
// log the response in the event of a failure on our end.
timeout: cdk.Duration.seconds(9),
memorySize: 2560,
deadLetterQueueEnabled: true,
bundling: {
minify: true,
sourceMap: true,
},

awsSdkConnectionReuse: true,

description: 'Caching Routing Lambda',
environment: {
VERSION: '1',
NODE_OPTIONS: '--enable-source-maps',
POOL_CACHE_BUCKET: poolCacheBucket.bucketName,
POOL_CACHE_BUCKET_3: poolCacheBucket3.bucketName,
POOL_CACHE_GZIP_KEY: poolCacheGzipKey,
TOKEN_LIST_CACHE_BUCKET: tokenListCacheBucket.bucketName,
ETH_GAS_STATION_INFO_URL: ethGasStationInfoUrl,
TENDERLY_USER: tenderlyUser,
TENDERLY_PROJECT: tenderlyProject,
TENDERLY_ACCESS_KEY: tenderlyAccessKey,
TENDERLY_NODE_API_KEY: tenderlyNodeApiKey,
// WARNING: Dynamo table name should be the tableinstance.name, e.g. routesDynamoDb.tableName.
// But we tried and had seen lambd version error:
// The following resource(s) failed to create: [RoutingLambda2CurrentVersion49A1BB948389ce4f9c26b15e2ccb07b4c1bab726].
// 2023-09-01 10:22:43 UTC-0700RoutingLambda2CurrentVersion49A1BB948389ce4f9c26b15e2ccb07b4c1bab726CREATE_FAILED
// A version for this Lambda function exists ( 261 ). Modify the function to create a new version.
// Hence we do not want to modify the table name below.
ROUTES_TABLE_NAME: DynamoDBTableProps.RoutesDbTable.Name,
ROUTES_CACHING_REQUEST_FLAG_TABLE_NAME: DynamoDBTableProps.RoutesDbCachingRequestFlagTable.Name,
CACHED_ROUTES_TABLE_NAME: DynamoDBTableProps.CacheRouteDynamoDbTable.Name,
CACHING_REQUEST_FLAG_TABLE_NAME: DynamoDBTableProps.CachingRequestFlagDynamoDbTable.Name,
CACHED_V3_POOLS_TABLE_NAME: DynamoDBTableProps.V3PoolsDynamoDbTable.Name,
V2_PAIRS_CACHE_TABLE_NAME: DynamoDBTableProps.V2PairsDynamoCache.Name,
RPC_PROVIDER_HEALTH_TABLE_NAME: DynamoDBTableProps.RpcProviderHealthStateDbTable.Name,

// tokenPropertiesCachingDynamoDb.tableName is the correct format.
// we will start using the correct ones going forward
TOKEN_PROPERTIES_CACHING_TABLE_NAME: tokenPropertiesCachingDynamoDb.tableName,
UNICORN_SECRET: unicornSecret,
GQL_URL: uniGraphQLEndpoint,
GQL_H_ORGN: uniGraphQLHeaderOrigin,
...jsonRpcProviders,
},
layers: [
aws_lambda.LayerVersion.fromLayerVersionArn(
this,
'CachingInsightsLayer',
`arn:aws:lambda:${region}:580247275435:layer:LambdaInsightsExtension:14`
),
],
tracing: aws_lambda.Tracing.ACTIVE,
logRetention: RetentionDays.TWO_WEEKS,
})

this.routingLambda = new aws_lambda_nodejs.NodejsFunction(this, 'RoutingLambda2', {
role: lambdaRole,
runtime: aws_lambda.Runtime.NODEJS_18_X,
Expand Down Expand Up @@ -153,6 +217,7 @@ export class RoutingLambdaStack extends cdk.NestedStack {
UNICORN_SECRET: unicornSecret,
GQL_URL: uniGraphQLEndpoint,
GQL_H_ORGN: uniGraphQLHeaderOrigin,
CACHING_ROUTING_LAMBDA_FUNCTION_NAME: cachingRoutingLambda.functionName,
...jsonRpcProviders,
},
layers: [
Expand All @@ -166,6 +231,23 @@ export class RoutingLambdaStack extends cdk.NestedStack {
logRetention: RetentionDays.TWO_WEEKS,
})

const cachingLambdaAlarmErrorRate = new aws_cloudwatch.Alarm(this, 'CachingRoutingAPI-LambdaErrorRate', {
metric: new aws_cloudwatch.MathExpression({
expression: 'errors / invocations',
usingMetrics: {
errors: cachingRoutingLambda.metricErrors({
period: Duration.minutes(5),
statistic: 'avg',
}),
invocations: cachingRoutingLambda.metricInvocations({
period: Duration.minutes(5),
statistic: 'avg',
}),
},
}),
threshold: 0.05,
evaluationPeriods: 3,
})
const lambdaAlarmErrorRate = new aws_cloudwatch.Alarm(this, 'RoutingAPI-LambdaErrorRate', {
metric: new aws_cloudwatch.MathExpression({
expression: 'errors / invocations',
Expand All @@ -184,6 +266,14 @@ export class RoutingLambdaStack extends cdk.NestedStack {
evaluationPeriods: 3,
})

const cachingLambdaThrottlesErrorRate = new aws_cloudwatch.Alarm(this, 'CachingRoutingAPI-LambdaThrottles', {
metric: cachingRoutingLambda.metricThrottles({
period: Duration.minutes(5),
statistic: 'sum',
}),
threshold: 10,
evaluationPeriods: 3,
})
const lambdaThrottlesErrorRate = new aws_cloudwatch.Alarm(this, 'RoutingAPI-LambdaThrottles', {
metric: this.routingLambda.metricThrottles({
period: Duration.minutes(5),
Expand All @@ -196,20 +286,42 @@ export class RoutingLambdaStack extends cdk.NestedStack {
if (chatbotSNSArn) {
const chatBotTopic = aws_sns.Topic.fromTopicArn(this, 'ChatbotTopic', chatbotSNSArn)

cachingLambdaAlarmErrorRate.addAlarmAction(new aws_cloudwatch_actions.SnsAction(chatBotTopic))
lambdaAlarmErrorRate.addAlarmAction(new aws_cloudwatch_actions.SnsAction(chatBotTopic))

cachingLambdaThrottlesErrorRate.addAlarmAction(new aws_cloudwatch_actions.SnsAction(chatBotTopic))
lambdaThrottlesErrorRate.addAlarmAction(new aws_cloudwatch_actions.SnsAction(chatBotTopic))
}

const enableProvisionedConcurrency = provisionedConcurrency > 0

const cachingRoutingLambdaAlias = new aws_lambda.Alias(this, 'CachingRoutingLiveAlias', {
aliasName: 'live',
version: cachingRoutingLambda.currentVersion,
provisionedConcurrentExecutions: enableProvisionedConcurrency ? provisionedConcurrency : undefined,
})
this.routingLambdaAlias = new aws_lambda.Alias(this, 'RoutingLiveAlias', {
aliasName: 'live',
version: this.routingLambda.currentVersion,
provisionedConcurrentExecutions: enableProvisionedConcurrency ? provisionedConcurrency : undefined,
})

if (enableProvisionedConcurrency) {
const cachingTarget = new asg.ScalableTarget(this, 'CachingRoutingProvConcASG', {
serviceNamespace: asg.ServiceNamespace.LAMBDA,
maxCapacity: provisionedConcurrency * 10,
minCapacity: provisionedConcurrency,
resourceId: `function:${cachingRoutingLambdaAlias.lambda.functionName}:${cachingRoutingLambdaAlias.aliasName}`,
scalableDimension: 'lambda:function:ProvisionedConcurrency',
})

cachingTarget.node.addDependency(cachingRoutingLambdaAlias)

cachingTarget.scaleToTrackMetric('CachingRoutingProvConcTracking', {
targetValue: 0.7,
predefinedMetric: asg.PredefinedMetric.LAMBDA_PROVISIONED_CONCURRENCY_UTILIZATION,
})

const target = new asg.ScalableTarget(this, 'RoutingProvConcASG', {
serviceNamespace: asg.ServiceNamespace.LAMBDA,
maxCapacity: provisionedConcurrency * 10,
Expand Down
12 changes: 11 additions & 1 deletion lib/handlers/injector-sor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export abstract class InjectorSOR<Router, QueryParams> extends Injector<
CACHED_ROUTES_TABLE_NAME,
AWS_LAMBDA_FUNCTION_NAME,
V2_PAIRS_CACHE_TABLE_NAME,
CACHING_ROUTING_LAMBDA_FUNCTION_NAME,
} = process.env

const dependenciesByChain: {
Expand Down Expand Up @@ -467,13 +468,22 @@ export abstract class InjectorSOR<Router, QueryParams> extends Injector<
tenderlySimulator,
ethEstimateGasSimulator
)
const newCachedRoutesRolloutPercent = NEW_CACHED_ROUTES_ROLLOUT_PERCENT[chainId]

let routeCachingProvider: IRouteCachingProvider | undefined = undefined

// if the newCachedRoutesRolloutPercent is greater than the random number, use the new caching routing lambda function name,
// so that the caching intent quote handler will invoke the even to the newly created caching routing lambda
const cachingQuoteLambdaName =
Math.random() * 100 < (newCachedRoutesRolloutPercent ?? 0)
? CACHING_ROUTING_LAMBDA_FUNCTION_NAME
: AWS_LAMBDA_FUNCTION_NAME!

if (CACHED_ROUTES_TABLE_NAME && CACHED_ROUTES_TABLE_NAME !== '') {
routeCachingProvider = new DynamoRouteCachingProvider({
routesTableName: ROUTES_TABLE_NAME!,
routesCachingRequestFlagTableName: ROUTES_CACHING_REQUEST_FLAG_TABLE_NAME!,
cachingQuoteLambdaName: AWS_LAMBDA_FUNCTION_NAME!,
cachingQuoteLambdaName: cachingQuoteLambdaName,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ interface ConstructorParams {
/**
* The Lambda Function Name for the Lambda that will be invoked to fill the cache
*/
cachingQuoteLambdaName: string
cachingQuoteLambdaName?: string
}

export class DynamoRouteCachingProvider extends IRouteCachingProvider {
private readonly ddbClient: DynamoDB.DocumentClient
private readonly lambdaClient: Lambda
private readonly routesTableName: string
private readonly routesCachingRequestFlagTableName: string
private readonly cachingQuoteLambdaName: string
private readonly cachingQuoteLambdaName?: string

private readonly DEFAULT_CACHEMODE_ROUTES_DB = CacheMode.Livemode
private readonly ROUTES_DB_TTL = 24 * 60 * 60 // 24 hours
Expand Down Expand Up @@ -312,15 +312,22 @@ export class DynamoRouteCachingProvider extends IRouteCachingProvider {
},
}

const params = {
FunctionName: this.cachingQuoteLambdaName,
InvocationType: 'Event',
Payload: JSON.stringify(payload),
}
if (this.cachingQuoteLambdaName) {
const params = {
FunctionName: this.cachingQuoteLambdaName,
InvocationType: 'Event',
Payload: JSON.stringify(payload),
}

log.info(`[DynamoRouteCachingProvider] Sending async caching request to lambda ${JSON.stringify(params)}`)
log.info(`[DynamoRouteCachingProvider] Sending async caching request to lambda ${JSON.stringify(params)}`)
metric.putMetric(
`CachingQuoteForRoutesDbRequestSentToLambda${this.cachingQuoteLambdaName}`,
1,
MetricLoggerUnit.Count
)

this.lambdaClient.invoke(params).promise()
this.lambdaClient.invoke(params).promise()
}
}

private setRoutesDbCachingIntentFlag(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ import {
USDC_MAINNET,
V3Route,
nativeOnChain,
MetricLoggerUnit,
} from '@uniswap/smart-order-router'
import { DynamoDBTableProps } from '../../../../../../bin/stacks/routing-database-stack'
import { V4Route } from '@uniswap/smart-order-router/build/main/routers'
import { NEW_CACHED_ROUTES_ROLLOUT_PERCENT } from '../../../../../../lib/util/newCachedRoutesRolloutPercent'
import sinon, { SinonSpy } from 'sinon'
import { metric } from '@uniswap/smart-order-router/build/main/util/metric'

chai.use(chaiAsPromised)

Expand Down Expand Up @@ -152,13 +156,63 @@ const TEST_UNCACHED_ROUTES = new CachedRoutes({
})

describe('DynamoRouteCachingProvider', async () => {
let spy: SinonSpy

beforeEach(() => {
spy = sinon.spy(metric, 'putMetric')
})

afterEach(() => {
spy.restore()
})

setupTables(TEST_ROUTE_CACHING_TABLE, TEST_ROUTE_DB_TABLE)
const dynamoRouteCache = new DynamoRouteCachingProvider({
routesTableName: DynamoDBTableProps.RoutesDbTable.Name,
routesCachingRequestFlagTableName: DynamoDBTableProps.RoutesDbCachingRequestFlagTable.Name,
cachingQuoteLambdaName: 'test',
})

it('Cached routes hits new cached routes lambda', async () => {
spy.withArgs('CachingQuoteForRoutesDbRequestSentToLambdanewcachinglambda', 1, MetricLoggerUnit.Count)

// testnet rolls out at 100%
const newCachedRoutesRolloutPercent = NEW_CACHED_ROUTES_ROLLOUT_PERCENT[ChainId.SEPOLIA]

const dynamoRouteCache = new DynamoRouteCachingProvider({
routesTableName: DynamoDBTableProps.RoutesDbTable.Name,
routesCachingRequestFlagTableName: DynamoDBTableProps.RoutesDbCachingRequestFlagTable.Name,
cachingQuoteLambdaName: Math.random() * 100 < (newCachedRoutesRolloutPercent ?? 0) ? 'newcachinglambda' : 'test',
})

const currencyAmount = CurrencyAmount.fromRawAmount(WETH, JSBI.BigInt(1 * 10 ** WETH.decimals))

const cacheMode = await dynamoRouteCache.getCacheMode(
ChainId.MAINNET,
currencyAmount,
USDC_MAINNET,
TradeType.EXACT_INPUT,
[Protocol.V3, Protocol.V4]
)
expect(cacheMode).to.equal(CacheMode.Livemode)

const insertedIntoCache = await dynamoRouteCache.setCachedRoute(TEST_CACHED_ROUTES, currencyAmount)
expect(insertedIntoCache).to.be.true

// Fetches route successfully from cache when it has been cached.
const route = await dynamoRouteCache.getCachedRoute(
ChainId.MAINNET,
currencyAmount,
USDC_MAINNET,
TradeType.EXACT_INPUT,
[Protocol.V3],
TEST_CACHED_ROUTES.blockNumber
)
expect(route).to.not.be.undefined

sinon.assert.called(spy)
})

it('Caches routes properly for a token pair that has its cache configured to Livemode', async () => {
const currencyAmount = CurrencyAmount.fromRawAmount(WETH, JSBI.BigInt(1 * 10 ** WETH.decimals))
const currencyAmountETH = CurrencyAmount.fromRawAmount(
Expand Down

0 comments on commit 3d46349

Please sign in to comment.