diff --git a/infra/prover-cluster/components/DataServicesComponent.ts b/infra/prover-cluster/components/DataServicesComponent.ts index 825e8db30..de79f8e32 100644 --- a/infra/prover-cluster/components/DataServicesComponent.ts +++ b/infra/prover-cluster/components/DataServicesComponent.ts @@ -8,20 +8,15 @@ export interface DataServicesComponentConfig extends BaseComponentConfig { taskDBPassword: string; securityGroupId: pulumi.Output; rdsInstanceClass?: string; - redisNodeType?: string; } export class DataServicesComponent extends BaseComponent { public readonly rdsInstance: aws.rds.Instance; public readonly rdsEndpoint: pulumi.Output; - public readonly redisCluster: aws.elasticache.ReplicationGroup; - public readonly redisEndpoint: pulumi.Output; public readonly s3Bucket: aws.s3.Bucket; public readonly s3BucketName: pulumi.Output; public readonly dbSubnetGroup: aws.rds.SubnetGroup; public readonly rdsSecurityGroup: aws.ec2.SecurityGroup; - public readonly redisSecurityGroup: aws.ec2.SecurityGroup; - public readonly redisSubnetGroup: aws.elasticache.SubnetGroup; constructor(config: DataServicesComponentConfig) { super(config, "boundless-bento"); @@ -63,51 +58,6 @@ export class DataServicesComponent extends BaseComponent { }, }); - // Create Redis subnet group - // ElastiCache subnet group names must be unique across account and max 40 chars - // Use format: bb-redis-{stackName} (truncated to fit 40 chars) - const prefix = "bb-redis-"; - const maxStackNameLength = 40 - prefix.length; - const truncatedStackName = config.stackName.length > maxStackNameLength - ? config.stackName.substring(0, maxStackNameLength) - : config.stackName; - const redisSubnetGroupName = `${prefix}${truncatedStackName}`; - this.redisSubnetGroup = new aws.elasticache.SubnetGroup(`${config.stackName}-redis-subnet-group`, { - name: redisSubnetGroupName, - subnetIds: config.privateSubnetIds, - tags: { - Environment: config.environment, - Stack: config.stackName, - Component: "data-services", - }, - }); - - // Create Redis security group - this.redisSecurityGroup = new aws.ec2.SecurityGroup(`${config.stackName}-redis`, { - name: config.stackName, - vpcId: config.vpcId, - description: "Security group for ElastiCache Redis", - ingress: [{ - protocol: "tcp", - fromPort: 6379, - toPort: 6379, - securityGroups: [config.securityGroupId], - description: "Redis access from cluster instances", - }], - egress: [{ - protocol: "-1", - fromPort: 0, - toPort: 0, - cidrBlocks: ["0.0.0.0/0"], - description: "All outbound traffic", - }], - tags: { - Environment: config.environment, - Stack: config.stackName, - Component: "redis", - }, - }); - // Create RDS PostgreSQL instance this.rdsInstance = new aws.rds.Instance(`${config.stackName}`, { identifier: this.generateName("postgres"), @@ -137,37 +87,6 @@ export class DataServicesComponent extends BaseComponent { // RDS endpoint is just the hostname, need to append port this.rdsEndpoint = pulumi.interpolate`${this.rdsInstance.endpoint}:${this.rdsInstance.port}`; - // Create ElastiCache Redis replication group - // ElastiCache replication group IDs must be 1-40 characters - // Use a shorter name: stackName-redis (max 40 chars) - const redisGroupId = config.stackName.length > 35 - ? 
`${config.stackName.substring(0, 34)}-redis` - : `${config.stackName}-redis`; - this.redisCluster = new aws.elasticache.ReplicationGroup(`${config.stackName}`, { - replicationGroupId: redisGroupId, - description: `Redis cluster for ${config.stackName}`, - engine: "redis", - engineVersion: "7.1", - nodeType: config.redisNodeType || "cache.t4g.micro", - port: 6379, - parameterGroupName: "default.redis7", - numCacheClusters: 1, - subnetGroupName: this.redisSubnetGroup.name, - securityGroupIds: [this.redisSecurityGroup.id], - atRestEncryptionEnabled: true, - transitEncryptionEnabled: false, - automaticFailoverEnabled: false, - tags: { - Name: config.stackName, - Environment: config.environment, - Stack: config.stackName, - Component: "redis", - }, - }); - - // Redis endpoint is just the hostname, need to append port - this.redisEndpoint = pulumi.interpolate`${this.redisCluster.primaryEndpointAddress}:${this.redisCluster.port}`; - // Create S3 bucket for workflow storage const bucketName = this.generateName("bento-storage"); this.s3Bucket = new aws.s3.Bucket(`${config.stackName}-storage`, { diff --git a/infra/prover-cluster/components/LaunchTemplateComponent.ts b/infra/prover-cluster/components/LaunchTemplateComponent.ts index a3ab76573..3491d3f8a 100644 --- a/infra/prover-cluster/components/LaunchTemplateComponent.ts +++ b/infra/prover-cluster/components/LaunchTemplateComponent.ts @@ -22,7 +22,6 @@ export interface LaunchTemplateConfig extends BaseComponentConfig { componentType: "manager" | "prover" | "execution" | "aux"; volumeSize?: number; rdsEndpoint?: pulumi.Output; - redisEndpoint?: pulumi.Output; s3BucketName?: pulumi.Output; s3AccessKeyId?: pulumi.Output; s3SecretAccessKey?: pulumi.Output; @@ -112,7 +111,6 @@ export class LaunchTemplateComponent extends BaseComponent { this.config.stackName, config.componentType, config.rdsEndpoint!, - config.redisEndpoint!, config.s3BucketName!, config.s3AccessKeyId!, config.s3SecretAccessKey!, @@ -133,14 +131,14 @@ export class LaunchTemplateComponent extends BaseComponent { config.maxFetchRetries || 3, config.allowClientAddresses || "", config.lockinPriorityGas || "0", - ]).apply(([dbName, dbUser, dbPass, rpcUrl, privKey, orderStreamUrl, verifierAddress, boundlessMarketAddress, setVerifierAddress, collateralTokenAddress, chainId, stackName, componentType, rdsEndpoint, redisEndpoint, s3BucketName, s3AccessKeyId, s3SecretAccessKey, mcyclePrice, peakProveKhz, minDeadline, lookbackBlocks, maxCollateral, maxFileSize, maxMcycleLimit, maxConcurrentProofs, balanceWarnThreshold, balanceErrorThreshold, collateralBalanceWarnThreshold, collateralBalanceErrorThreshold, priorityRequestorAddresses, denyRequestorAddresses, maxFetchRetries, allowClientAddresses, lockinPriorityGas]) => { + ]).apply(([dbName, dbUser, dbPass, rpcUrl, privKey, orderStreamUrl, verifierAddress, boundlessMarketAddress, setVerifierAddress, collateralTokenAddress, chainId, stackName, componentType, rdsEndpoint, s3BucketName, s3AccessKeyId, s3SecretAccessKey, mcyclePrice, peakProveKhz, minDeadline, lookbackBlocks, maxCollateral, maxFileSize, maxMcycleLimit, maxConcurrentProofs, balanceWarnThreshold, balanceErrorThreshold, collateralBalanceWarnThreshold, collateralBalanceErrorThreshold, priorityRequestorAddresses, denyRequestorAddresses, maxFetchRetries, allowClientAddresses, lockinPriorityGas]) => { // Extract host from endpoints (format: host:port) const rdsEndpointStr = String(rdsEndpoint); - const redisEndpointStr = String(redisEndpoint); const rdsHost = rdsEndpointStr.split(':')[0]; 
const rdsPort = rdsEndpointStr.split(':')[1] || '5432'; - const redisHost = redisEndpointStr.split(':')[0]; - const redisPort = redisEndpointStr.split(':')[1] || '6379'; + // Manager runs Redis/Valkey locally, so use localhost + const redisHost = "127.0.0.1"; + const redisPort = "6379"; const brokerTomlContent = `[market] mcycle_price = "${mcyclePrice}" @@ -234,6 +232,29 @@ ${aggregationDimensionsJson.split('\n').map(line => ` ${line}`).join('\n')} runcmd: - | + # Install Valkey (Redis fork) as a systemd service + apt-get update + apt-get install -y software-properties-common + add-apt-repository -y ppa:valkey/valkey + apt-get update + apt-get install -y valkey-server + - | + # Configure Valkey to listen on all interfaces (for worker nodes to connect) + cat > /etc/valkey/valkey.conf << 'VALKEYEOF' + bind 0.0.0.0 + port 6379 + protected-mode no + maxmemory 12gb + maxmemory-policy allkeys-lru + save "" + VALKEYEOF + - | + # Enable and start Valkey service + systemctl enable valkey-server + systemctl restart valkey-server + - | + # Allow Redis/Valkey connections from worker nodes in the security group + # (Security group rules are managed by Pulumi, but we ensure the service is ready) cat /etc/environment.d/bento.conf >> /etc/environment - | /usr/bin/sed -i 's|group_name: "/boundless/bent.*"|group_name: "/boundless/bento/${stackName}/${componentType}"|g' /etc/vector/vector.yaml @@ -269,18 +290,17 @@ runcmd: this.config.stackName, config.componentType, config.rdsEndpoint!, - config.redisEndpoint!, config.s3BucketName!, config.s3AccessKeyId!, config.s3SecretAccessKey! - ]).apply(([managerIp, dbName, dbUser, dbPass, stackName, componentType, rdsEndpoint, redisEndpoint, s3BucketName, s3AccessKeyId, s3SecretAccessKey]) => { + ]).apply(([managerIp, dbName, dbUser, dbPass, stackName, componentType, rdsEndpoint, s3BucketName, s3AccessKeyId, s3SecretAccessKey]) => { // Extract host from endpoints (format: host:port) const rdsEndpointStr = String(rdsEndpoint); - const redisEndpointStr = String(redisEndpoint); const rdsHost = rdsEndpointStr.split(':')[0]; const rdsPort = rdsEndpointStr.split(':')[1] || '5432'; - const redisHost = redisEndpointStr.split(':')[0]; - const redisPort = redisEndpointStr.split(':')[1] || '6379'; + // Workers connect to Redis/Valkey on the manager node + const redisHost = managerIp; + const redisPort = "6379"; const commonEnvVars = this.generateCommonEnvVars(managerIp, dbName, dbUser, dbPass, stackName, componentType, rdsHost, rdsPort, redisHost, redisPort, s3BucketName, s3AccessKeyId, s3SecretAccessKey); let componentSpecificVars = ""; diff --git a/infra/prover-cluster/components/ManagerComponent.ts b/infra/prover-cluster/components/ManagerComponent.ts index 47c1e7dc7..72ba4a9b8 100644 --- a/infra/prover-cluster/components/ManagerComponent.ts +++ b/infra/prover-cluster/components/ManagerComponent.ts @@ -22,7 +22,6 @@ export interface ManagerComponentConfig extends BaseComponentConfig { chainId: string; alertsTopicArns: string[]; rdsEndpoint: pulumi.Output; - redisEndpoint: pulumi.Output; s3BucketName: pulumi.Output; s3AccessKeyId: pulumi.Output; s3SecretAccessKey: pulumi.Output; diff --git a/infra/prover-cluster/components/ScalerComponent.ts b/infra/prover-cluster/components/ScalerComponent.ts new file mode 100644 index 000000000..57e23beb5 --- /dev/null +++ b/infra/prover-cluster/components/ScalerComponent.ts @@ -0,0 +1,343 @@ +import * as pulumi from "@pulumi/pulumi"; +import * as aws from "@pulumi/aws"; +import * as path from "path"; +import * as fs from "fs"; +import * as 
child_process from "child_process"; +import { BaseComponent, BaseComponentConfig } from "./BaseComponent"; + +export interface ScalerComponentConfig extends BaseComponentConfig { + rdsEndpoint: pulumi.Output; + rdsSecurityGroupId: pulumi.Output; + taskDBName: string; + taskDBUsername: string; + taskDBPassword: string; + proverAsgName: pulumi.Output; + proverAsgArn: pulumi.Output; + scheduleExpression?: string; +} + +export class ScalerComponent extends BaseComponent { + public readonly lambdaFunction: aws.lambda.Function; + public readonly lambdaRole: aws.iam.Role; + public readonly eventRule: aws.cloudwatch.EventRule; + public readonly eventTarget: aws.cloudwatch.EventTarget; + + constructor(config: ScalerComponentConfig) { + super(config, "boundless-bento"); + + // Create IAM role for Lambda + this.lambdaRole = this.createLambdaRole(config); + + // Create security group for Lambda to access RDS + const lambdaSecurityGroup = this.createLambdaSecurityGroup(config); + + // Allow Lambda security group to access RDS + const rdsIngressRule = new aws.ec2.SecurityGroupRule( + `${config.stackName}-lambda-rds-ingress`, + { + type: "ingress", + fromPort: 5432, + toPort: 5432, + protocol: "tcp", + securityGroupId: config.rdsSecurityGroupId, + sourceSecurityGroupId: lambdaSecurityGroup.id, + description: "Allow Lambda to access RDS", + } + ); + + // Construct database URL + const databaseUrl = pulumi.all([config.rdsEndpoint, config.taskDBName, config.taskDBUsername, config.taskDBPassword]) + .apply(([endpoint, dbName, username, password]) => { + // RDS endpoint format: hostname:port + const [host, port] = endpoint.split(":"); + return `postgresql://${username}:${password}@${host}:${port}/${dbName}`; + }); + + // Create Lambda layer with Python dependencies + const lambdaLayer = this.createLambdaLayer(config); + + // Create CloudWatch log group + const logGroup = new aws.cloudwatch.LogGroup( + `${config.stackName}-scaler-log-group`, + { + name: `/aws/lambda/${this.generateName("scaler")}`, + retentionInDays: 7, + tags: { + Environment: config.environment, + Stack: config.stackName, + Component: "scaler", + }, + } + ); + + // Create Lambda deployment package (zip with just the Python handler) + const scalerDir = path.join(__dirname, "../../scaler"); + const lambdaFunctionPath = path.join(scalerDir, "lambda_function.py"); + + if (!fs.existsSync(lambdaFunctionPath)) { + throw new Error(`Lambda handler not found at ${lambdaFunctionPath}`); + } + + // Create a zip file with just lambda_function.py + const lambdaZipPath = path.join(scalerDir, "lambda.zip"); + try { + // Always remove old zip to ensure we get the latest code + if (fs.existsSync(lambdaZipPath)) { + fs.unlinkSync(lambdaZipPath); + } + // Create zip with lambda_function.py at the root + // Use -j to junk paths and put file at root of zip + child_process.execSync( + `cd ${scalerDir} && zip -j ${lambdaZipPath} lambda_function.py`, + { stdio: "inherit" } + ); + console.log(`Created Lambda zip at ${lambdaZipPath}`); + } catch (error) { + throw new Error(`Failed to create Lambda zip file: ${error}`); + } + + if (!fs.existsSync(lambdaZipPath)) { + throw new Error(`Lambda zip file was not created at ${lambdaZipPath}`); + } + + // Use the zip file for Lambda code + const lambdaCode = new pulumi.asset.FileArchive(lambdaZipPath); + + // Create Lambda function with Python code + this.lambdaFunction = new aws.lambda.Function( + `${config.stackName}-scaler-lambda`, + { + name: this.generateName("scaler"), + runtime: "python3.12", + handler: 
"lambda_function.lambda_handler", + role: this.lambdaRole.arn, + code: lambdaCode, + layers: [lambdaLayer.arn], + memorySize: 256, + timeout: 60, + environment: { + variables: { + DATABASE_URL: databaseUrl, + AUTO_SCALING_GROUP_NAME: config.proverAsgName, + // Note: AWS_REGION is automatically set by Lambda and cannot be overridden + }, + }, + vpcConfig: pulumi.all([config.privateSubnetIds, lambdaSecurityGroup.id]).apply(([subnets, sgId]) => ({ + subnetIds: subnets, + securityGroupIds: [sgId], + })), + tags: { + Environment: config.environment, + Stack: config.stackName, + Component: "scaler", + }, + }, + { + dependsOn: [logGroup, lambdaLayer, rdsIngressRule], + } + ); + + // Create EventBridge rule to trigger Lambda on schedule + const schedule = config.scheduleExpression || "rate(5 minutes)"; + this.eventRule = new aws.cloudwatch.EventRule( + `${config.stackName}-scaler-schedule`, + { + description: `Schedule for ${config.stackName} scaler Lambda`, + scheduleExpression: schedule, + tags: { + Environment: config.environment, + Stack: config.stackName, + Component: "scaler", + }, + } + ); + + // Add Lambda as target for EventBridge rule + this.eventTarget = new aws.cloudwatch.EventTarget( + `${config.stackName}-scaler-target`, + { + rule: this.eventRule.name, + arn: this.lambdaFunction.arn, + } + ); + + // Grant EventBridge permission to invoke Lambda + new aws.lambda.Permission( + `${config.stackName}-scaler-eventbridge-permission`, + { + statementId: "AllowExecutionFromEventBridge", + action: "lambda:InvokeFunction", + function: this.lambdaFunction.name, + principal: "events.amazonaws.com", + sourceArn: this.eventRule.arn, + } + ); + } + + private createLambdaRole(config: ScalerComponentConfig): aws.iam.Role { + const role = new aws.iam.Role( + `${config.stackName}-scaler-role`, + { + name: this.generateName("scaler-role"), + assumeRolePolicy: aws.iam.assumeRolePolicyForPrincipal({ + Service: "lambda.amazonaws.com", + }), + tags: { + Environment: config.environment, + Stack: config.stackName, + Component: "scaler", + }, + } + ); + + // Attach basic Lambda execution role (for CloudWatch Logs) + new aws.iam.RolePolicyAttachment( + `${config.stackName}-scaler-logs`, + { + role: role.name, + policyArn: aws.iam.ManagedPolicies.AWSLambdaBasicExecutionRole, + } + ); + + // Attach VPC access role + new aws.iam.RolePolicyAttachment( + `${config.stackName}-scaler-vpc`, + { + role: role.name, + policyArn: aws.iam.ManagedPolicies.AWSLambdaVPCAccessExecutionRole, + } + ); + + new aws.iam.RolePolicy( + `${config.stackName}-scaler-autoscaling-policy`, + { + role: role.id, + policy: pulumi.all([config.proverAsgArn]).apply(([asgArn]) => + JSON.stringify({ + Version: "2012-10-17", + Statement: [ + { + Effect: "Allow", + Action: [ + "autoscaling:DescribeAutoScalingGroups", + ], + Resource: "*", + }, + { + Effect: "Allow", + Action: [ + "autoscaling:UpdateAutoScalingGroup", + ], + Resource: asgArn, + }, + ], + }) + ), + } + ); + + return role; + } + + private createLambdaLayer(config: ScalerComponentConfig): aws.lambda.LayerVersion { + const scalerDir = path.join(__dirname, "../../scaler"); + const layerDir = path.join(scalerDir, "layer"); + const pythonDir = path.join(layerDir, "python"); + const requirementsPath = path.join(scalerDir, "requirements.txt"); + const layerZipPath = path.join(scalerDir, "layer.zip"); + + // Use Docker to build the layer in an environment matching Lambda's runtime + // This ensures psycopg2-binary is compiled for Amazon Linux, not the local OS + if 
(fs.existsSync(requirementsPath)) { + try { + console.log("Building Lambda layer using Docker (Amazon Linux environment)..."); + + // Clean up old layer directory and zip + if (fs.existsSync(layerDir)) { + fs.rmSync(layerDir, { recursive: true, force: true }); + } + if (fs.existsSync(layerZipPath)) { + fs.unlinkSync(layerZipPath); + } + + // Create layer directory structure + fs.mkdirSync(pythonDir, { recursive: true }); + + // Use Docker with Lambda Python runtime image to build the layer + // This ensures dependencies are compiled for Amazon Linux + // Override the entrypoint since Lambda images expect a handler as the first argument + const dockerCmd = `docker run --rm --entrypoint /bin/bash ` + + `-v "${scalerDir}:/var/task" -v "${layerDir}:/opt/layer" ` + + `public.ecr.aws/lambda/python:3.12 ` + + `-c "pip install -r /var/task/requirements.txt -t /opt/layer/python --platform manylinux2014_x86_64 --only-binary=:all: --no-cache-dir || pip install -r /var/task/requirements.txt -t /opt/layer/python --no-cache-dir"`; + + child_process.execSync(dockerCmd, { stdio: "inherit" }); + + // Create zip file for layer + child_process.execSync( + `cd ${layerDir} && zip -r ${layerZipPath} python/`, + { stdio: "inherit" } + ); + } catch (error) { + console.warn("Docker build failed, trying local pip install (may not work for psycopg2):", error); + + // Fallback: try local install (may fail for psycopg2 on non-Linux) + try { + if (!fs.existsSync(pythonDir)) { + fs.mkdirSync(pythonDir, { recursive: true }); + } + child_process.execSync( + `pip install -r ${requirementsPath} -t ${pythonDir} --platform manylinux2014_x86_64 --only-binary=:all: || pip install -r ${requirementsPath} -t ${pythonDir}`, + { stdio: "inherit" } + ); + child_process.execSync( + `cd ${layerDir} && zip -r ${layerZipPath} python/`, + { stdio: "inherit" } + ); + } catch (fallbackError) { + throw new Error(`Failed to build Lambda layer: ${error}. 
Fallback also failed: ${fallbackError}`); + } + } + } + + if (!fs.existsSync(layerZipPath)) { + throw new Error(`Lambda layer zip file was not created at ${layerZipPath}`); + } + + return new aws.lambda.LayerVersion( + `${config.stackName}-scaler-layer`, + { + layerName: this.generateName("scaler-layer"), + code: new pulumi.asset.FileArchive(layerZipPath), + compatibleRuntimes: ["python3.11", "python3.12"], + description: "Python dependencies for scaler Lambda", + } + ); + } + + private createLambdaSecurityGroup(config: ScalerComponentConfig): aws.ec2.SecurityGroup { + return new aws.ec2.SecurityGroup( + `${config.stackName}-scaler-sg`, + { + name: this.generateName("scaler-sg"), + vpcId: config.vpcId, + description: "Security group for scaler Lambda to access RDS", + egress: [ + { + protocol: "-1", + fromPort: 0, + toPort: 0, + cidrBlocks: ["0.0.0.0/0"], + description: "All outbound traffic", + }, + ], + tags: { + Environment: config.environment, + Stack: config.stackName, + Component: "scaler", + }, + } + ); + } +} + diff --git a/infra/prover-cluster/components/SecurityComponent.ts b/infra/prover-cluster/components/SecurityComponent.ts index 56373a3e6..d8d3db4da 100644 --- a/infra/prover-cluster/components/SecurityComponent.ts +++ b/infra/prover-cluster/components/SecurityComponent.ts @@ -74,6 +74,7 @@ export class SecurityComponent extends BaseComponent { { Effect: "Allow", Action: [ + "s3:CreateBucket", "s3:ListBucket", "s3:GetBucketLocation" ], @@ -140,6 +141,13 @@ export class SecurityComponent extends BaseComponent { cidrBlocks: ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"], description: "VPC Link access to Bento API" }, + { + protocol: "tcp", + fromPort: 6379, + toPort: 6379, + self: true, + description: "Redis/Valkey access from cluster instances" + }, ], egress: [ { @@ -174,6 +182,7 @@ export class SecurityComponent extends BaseComponent { { Effect: "Allow", Action: [ + "s3:CreateBucket", "s3:ListBucket", "s3:GetBucketLocation" ], diff --git a/infra/prover-cluster/components/WorkerClusterComponent.ts b/infra/prover-cluster/components/WorkerClusterComponent.ts index fd067c44b..198f8a017 100644 --- a/infra/prover-cluster/components/WorkerClusterComponent.ts +++ b/infra/prover-cluster/components/WorkerClusterComponent.ts @@ -21,7 +21,6 @@ export interface WorkerClusterConfig extends BaseComponentConfig { auxCount: number; alertsTopicArns: string[]; rdsEndpoint: pulumi.Output; - redisEndpoint: pulumi.Output; s3BucketName: pulumi.Output; s3AccessKeyId: pulumi.Output; s3SecretAccessKey: pulumi.Output; @@ -71,8 +70,8 @@ export class WorkerClusterComponent extends BaseComponent { ...config, launchTemplateId: launchTemplate.launchTemplate.id, launchTemplateUserData: pulumi.output(launchTemplate.launchTemplate.userData).apply(u => u || ""), - minSize: config.proverCount, - maxSize: config.proverCount, + minSize: 0, + maxSize: 100, desiredCapacity: config.proverCount, componentType: "prover", }; diff --git a/infra/prover-cluster/components/index.ts b/infra/prover-cluster/components/index.ts index 586d5972d..fc2ae31e6 100644 --- a/infra/prover-cluster/components/index.ts +++ b/infra/prover-cluster/components/index.ts @@ -6,3 +6,4 @@ export { ManagerComponent, ManagerComponentConfig } from "./ManagerComponent"; export { WorkerClusterComponent, WorkerClusterConfig } from "./WorkerClusterComponent"; export { ApiGatewayComponent, ApiGatewayComponentConfig } from "./ApiGatewayComponent"; export { DataServicesComponent, DataServicesComponentConfig } from "./DataServicesComponent"; +export { 
ScalerComponent, ScalerComponentConfig } from "./ScalerComponent"; diff --git a/infra/prover-cluster/index.ts b/infra/prover-cluster/index.ts index 0138cd0a9..780605e1d 100644 --- a/infra/prover-cluster/index.ts +++ b/infra/prover-cluster/index.ts @@ -6,6 +6,7 @@ import { WorkerClusterComponent, ApiGatewayComponent, DataServicesComponent, + ScalerComponent, BaseComponentConfig } from "./components"; @@ -96,7 +97,7 @@ const baseComponentConfig: BaseComponentConfig = { // Add security components const security = new SecurityComponent(baseComponentConfig); -// Create data services (RDS PostgreSQL and ElastiCache Redis) +// Create data services (RDS PostgreSQL) const dataServices = new DataServicesComponent({ ...baseComponentConfig, taskDBName, @@ -104,7 +105,6 @@ const dataServices = new DataServicesComponent({ taskDBPassword, securityGroupId: security.securityGroup.id, rdsInstanceClass: config.get("rdsInstanceClass") || "db.t4g.micro", - redisNodeType: config.get("redisNodeType") || "cache.t4g.micro", }); // Create manager instance @@ -127,7 +127,6 @@ const manager = new ManagerComponent({ chainId, alertsTopicArns: alertsTopicArns, rdsEndpoint: dataServices.rdsEndpoint, - redisEndpoint: dataServices.redisEndpoint, s3BucketName: dataServices.s3BucketName, s3AccessKeyId: security.s3AccessKeyId, s3SecretAccessKey: security.s3SecretAccessKey, @@ -166,7 +165,6 @@ const workerCluster = new WorkerClusterComponent({ auxCount, alertsTopicArns: alertsTopicArns, rdsEndpoint: dataServices.rdsEndpoint, - redisEndpoint: dataServices.redisEndpoint, s3BucketName: dataServices.s3BucketName, s3AccessKeyId: security.s3AccessKeyId, s3SecretAccessKey: security.s3SecretAccessKey, @@ -180,6 +178,19 @@ const apiGateway = new ApiGatewayComponent({ apiKey: apiKey.apply(key => key), }); +// Create scaler Lambda to auto-scale prover ASG based on queue depth +const scaler = new ScalerComponent({ + ...baseComponentConfig, + rdsEndpoint: dataServices.rdsEndpoint, + rdsSecurityGroupId: dataServices.rdsSecurityGroup.id, + taskDBName, + taskDBUsername, + taskDBPassword, + proverAsgName: workerCluster.proverAsg.autoScalingGroup.name, + proverAsgArn: workerCluster.proverAsg.autoScalingGroup.arn, + scheduleExpression: config.get("scalerSchedule") || "rate(5 minutes)", +}); + // Outputs export const managerInstanceId = manager.instance.id; export const managerPrivateIp = manager.instance.privateIp; @@ -214,20 +225,18 @@ export const auxMaxSize = workerCluster.auxAsg.autoScalingGroup.maxSize; // Data services outputs export const rdsEndpoint = dataServices.rdsEndpoint; -export const redisEndpoint = dataServices.redisEndpoint; export const s3BucketName = dataServices.s3BucketName; // Shared credentials for prover nodes -export const sharedCredentials = pulumi.all([dataServices.rdsEndpoint, dataServices.redisEndpoint, dataServices.s3BucketName]).apply(([rdsEp, redisEp, s3Bucket]) => { +export const sharedCredentials = pulumi.all([dataServices.rdsEndpoint, manager.instance.privateIp, dataServices.s3BucketName]).apply(([rdsEp, managerIp, s3Bucket]) => { const rdsHost = rdsEp.split(':')[0]; - const redisHost = redisEp.split(':')[0]; return { postgresHost: rdsHost, postgresPort: "5432", postgresDb: taskDBName, postgresUser: taskDBUsername, postgresPassword: taskDBPassword, - redisHost: redisHost, + redisHost: managerIp, redisPort: "6379", s3Bucket: s3Bucket, s3Region: "us-west-2", diff --git a/infra/scaler/lambda_function.py b/infra/scaler/lambda_function.py new file mode 100644 index 000000000..77ec77112 --- /dev/null +++ 
b/infra/scaler/lambda_function.py
@@ -0,0 +1,173 @@
+import os
+import json
+import math
+import logging
+from typing import Dict, Any
+
+import boto3
+import psycopg2
+from psycopg2.extras import RealDictCursor
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
+def get_queue_depth(conn) -> int:
+    with conn.cursor(cursor_factory=RealDictCursor) as cur:
+        cur.execute("""
+            SELECT COUNT(*) as count FROM
+            tasks
+            JOIN jobs ON tasks.job_id = jobs.id
+            CROSS JOIN LATERAL jsonb_object_keys(tasks.task_def) task_type
+            WHERE (
+                tasks.state = 'pending' OR tasks.state = 'ready'
+            ) AND (
+                task_type = 'Join' OR task_type = 'Prove' OR task_type = 'Resolve'
+            ) AND (
+                jobs.state = 'running'
+            )
+        """)
+        result = cur.fetchone()
+        return result['count'] if result else 0
+
+
+def required_workers(queue_depth: int) -> int:
+    """Calculate required workers as 2 * ceil(log2(queue_depth)); returns 0 for an empty queue."""
+    if queue_depth == 0:
+        return 0
+    return int(math.ceil(math.log2(queue_depth)) * 2)
+
+
+def get_current_workers(autoscaling_client, asg_name: str) -> int:
+    """Get the current desired capacity of the ASG."""
+    response = autoscaling_client.describe_auto_scaling_groups(
+        AutoScalingGroupNames=[asg_name]
+    )
+
+    if not response['AutoScalingGroups']:
+        raise ValueError(f"Auto scaling group '{asg_name}' not found")
+
+    group = response['AutoScalingGroups'][0]
+    desired_capacity = group.get('DesiredCapacity')
+
+    if desired_capacity is None:
+        raise ValueError(f"Desired capacity not set for ASG '{asg_name}'")
+
+    return desired_capacity
+
+
+def scale(
+    conn,
+    autoscaling_client,
+    asg_name: str
+) -> Dict[str, Any]:
+    # Get queue depth and calculate required workers
+    queue_depth = get_queue_depth(conn)
+    required_workers_count = required_workers(queue_depth)
+
+    # Get current ASG state
+    current_workers = get_current_workers(autoscaling_client, asg_name)
+
+    # Get ASG constraints
+    response = autoscaling_client.describe_auto_scaling_groups(
+        AutoScalingGroupNames=[asg_name]
+    )
+    group = response['AutoScalingGroups'][0]
+    min_size = group.get('MinSize', 0)
+    max_size = group.get('MaxSize')
+
+    if max_size is None:
+        raise ValueError(f"Max size not set for ASG '{asg_name}'")
+
+    # Clamp required workers to min/max bounds
+    target_capacity = max(min_size, min(required_workers_count, max_size))
+
+    result = {
+        'asg_name': asg_name,
+        'queue_depth': queue_depth,
+        'required_workers': required_workers_count,
+        'current_workers': current_workers,
+        'target_capacity': target_capacity,
+        'min_size': min_size,
+        'max_size': max_size,
+        'scaled': False
+    }
+
+    if current_workers != target_capacity:
+        logger.info(
+            f"Scaling ASG {asg_name} from {current_workers} to {target_capacity} workers "
+            f"(required: {required_workers_count}, min: {min_size}, max: {max_size})"
+        )
+
+        autoscaling_client.update_auto_scaling_group(
+            AutoScalingGroupName=asg_name,
+            DesiredCapacity=target_capacity
+        )
+
+        result['scaled'] = True
+    else:
+        logger.info(
+            f"ASG {asg_name} already at target capacity: {current_workers} workers"
+        )
+
+    return result
+
+
+def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
+    """Lambda handler entry point."""
+    try:
+        # Get configuration from environment variables
+        database_url = os.environ.get('DATABASE_URL')
+        if not database_url:
+            raise ValueError("DATABASE_URL environment variable is required")
+
+        asg_name = os.environ.get('AUTO_SCALING_GROUP_NAME')
+        if not asg_name:
+            raise ValueError("AUTO_SCALING_GROUP_NAME environment variable is required")
+
+        aws_region = 
os.environ.get('AWS_REGION', 'us-west-2') + + # Create database connection with timeout and retry + # Lambda in VPC may need a moment to establish network connectivity + import time + max_retries = 3 + retry_delay = 2 + + conn = None + for attempt in range(max_retries): + try: + conn = psycopg2.connect( + database_url, + connect_timeout=10 + ) + break + except psycopg2.OperationalError as e: + if attempt < max_retries - 1: + logger.warning(f"Connection attempt {attempt + 1} failed: {e}. Retrying in {retry_delay} seconds...") + time.sleep(retry_delay) + else: + raise + + try: + # Create AWS Auto Scaling client + autoscaling_client = boto3.client('autoscaling', region_name=aws_region) + + # Run scaling logic + result = scale(conn, autoscaling_client, asg_name) + + return { + 'statusCode': 200, + 'body': json.dumps(result) + } + finally: + conn.close() + + except Exception as e: + logger.error(f"Error scaling ASG: {str(e)}", exc_info=True) + return { + 'statusCode': 500, + 'body': json.dumps({ + 'error': str(e) + }) + } + diff --git a/infra/scaler/requirements.txt b/infra/scaler/requirements.txt new file mode 100644 index 000000000..719b9b15e --- /dev/null +++ b/infra/scaler/requirements.txt @@ -0,0 +1,3 @@ +boto3>=1.34.0 +psycopg2-binary>=2.9.9 +
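Sizing note: the scaler Lambda targets 2 * ceil(log2(queue_depth)) provers, clamped to the ASG's MinSize/MaxSize (0 and 100 after the WorkerClusterComponent change above). The following standalone sketch is illustrative only, not part of the deployed Lambda, and just replays the same formula to show how queue depth maps to a target capacity:

import math

def required_workers(queue_depth: int) -> int:
    # Same formula as infra/scaler/lambda_function.py
    if queue_depth == 0:
        return 0
    return int(math.ceil(math.log2(queue_depth)) * 2)

for depth in (0, 1, 2, 10, 100, 1000):
    target = max(0, min(required_workers(depth), 100))  # clamp to MinSize=0 / MaxSize=100
    print(f"queue_depth={depth:>4} -> target provers={target}")
# queue_depth=   0 -> target provers=0
# queue_depth=   1 -> target provers=0   (log2(1) == 0, so a single queued task does not scale the group up)
# queue_depth=   2 -> target provers=2
# queue_depth=  10 -> target provers=8
# queue_depth= 100 -> target provers=14
# queue_depth=1000 -> target provers=20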
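Connectivity note: with ElastiCache removed, workers reach Redis/Valkey at the manager's private IP on port 6379, opened to the cluster security group by the new self-referencing ingress rule in SecurityComponent. A quick reachability check from a worker node is sketched below; MANAGER_PRIVATE_IP is a placeholder for the managerPrivateIp stack output, not a value defined in this change:

import socket

# MANAGER_PRIVATE_IP is a placeholder; substitute the manager's private IP.
with socket.create_connection(("MANAGER_PRIVATE_IP", 6379), timeout=5) as sock:
    sock.sendall(b"PING\r\n")   # Redis/Valkey inline command
    print(sock.recv(64))        # expect b'+PONG\r\n' once valkey-server is running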