diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/.gitignore b/appsync-lambda-bedrock-async-stream-subscription-cdk/.gitignore new file mode 100644 index 000000000..f60797b6a --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/.gitignore @@ -0,0 +1,8 @@ +*.js +!jest.config.js +*.d.ts +node_modules + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/.npmignore b/appsync-lambda-bedrock-async-stream-subscription-cdk/.npmignore new file mode 100644 index 000000000..c1d6d45dc --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/.npmignore @@ -0,0 +1,6 @@ +*.ts +!*.d.ts + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/README.md b/appsync-lambda-bedrock-async-stream-subscription-cdk/README.md new file mode 100644 index 000000000..148672c83 --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/README.md @@ -0,0 +1,131 @@ +# Long running invocations of Amazon Bedrock using Amazon AppSync and AWS Lambda streaming + +This pattern demonstrates how to implement [long-running invocations](https://docs.aws.amazon.com/appsync/latest/devguide/resolver-reference-bedrock-js.html#long-running-invocations) with Amazon Bedrock using AWS AppSync subscriptions and AWS Lambda in Event Mode, following the official AWS AppSync documentation pattern. + +Learn more about this pattern at [Serverless Land Patterns](https://serverlessland.com/patterns/appsync-lambda-bedrock-async-stream-subscription-cdk). + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [Node and NPM](https://nodejs.org/en/download/) installed +* [AWS Cloud Development Kit](https://docs.aws.amazon.com/cdk/v2/guide/cli.html) (AWS CDK) installed +* Enable the **Anthropic - Claude Sonnet 3.5 V2** model in **us-east-1** region through the [Bedrock console](https://console.aws.amazon.com/bedrock/home#/modelaccess). This implementation uses the [cross-region inference profile](https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-support.html#inference-profiles-support-system) from us-east-1. + +## How it works + +The pattern implements an asynchronous [streaming architecture](https://docs.aws.amazon.com/appsync/latest/devguide/resolver-reference-bedrock-js.html#long-running-invocations) where: + +1. Client initiates a WebSocket subscription and makes a request to AppSync +2. AppSync invokes Lambda function in Event mode, enabling asynchronous processing +3. Lambda function streams responses from Bedrock using ConverseStream +4. Lambda sends updates via mutations to AppSync +5. Updates are delivered to client through WebSocket subscription + +![alt text](image.png) + +**Key Benefits** +- **Asynchronous Processing**: AppSync immediately returns a response while Lambda processes the request asynchronously, preventing timeouts for long-running operations +- **Real-time Updates**: Clients receive progressive updates through WebSocket subscriptions as the model generates responses +- **Scalable Architecture**: Event-driven design allows handling multiple concurrent requests without blocking +- **Enhanced User Experience**: Progressive updates enable responsive interfaces even during lengthy AI model invocations + +## Deployment Instructions + +1. Clone the repository: +```sh +git clone https://github.com/aws-samples/serverless-patterns +``` +2. Navigate to pattern directory: +```sh +cd appsync-lambda-bedrock-async-stream-subscription-cdk +``` + +3. Install dependencies: +```sh +npm install +``` + +4. Bootstrap CDK (if needed): +```sh +cdk bootstrap +``` + +5. Deploy stack: +```sh +npm run deploy +``` + +### Important: +Note the GraphQL API URL and API Key from the stack outputs - you'll need these for testing. + +## Testing + +After deployment, you can test the Bedrock streaming integration using the provided test script. The script demonstrates: +- WebSocket subscription initialization +- Conversation start with Bedrock +- Real-time streaming chunks display +- Graceful cleanup on exit + +1. Configure test credentials: +```sh + Open test/test.ts + Replace APPSYNC_API_URL with the API URL from stack outputs + Replace APPSYNC_API_KEY with the API Key from stack outputs +``` + +2. Run the test: +```sh +npx tsx test/test.ts +``` + +3. Expected Output: +```sh +Starting subscription... +Starting conversation... +StartConversation response: { +data: { +startConversation: { +conversationId: '123e4567-e89b-12d3-a456-426614174000', +status: 'STARTED' +} +} +} +Received chunk: { +conversationId: '123e4567-e89b-12d3-a456-426614174000', +chunk: "Here's a joke for you: Why don't scientists trust atoms? Because they make" +} +Received chunk: { +conversationId: '123e4567-e89b-12d3-a456-426614174000', +chunk: 'up everything!' +} +``` + +If you do not receive any response, please check your Bedrock Model access for Claude Sonnet 3.5 V2 in us-east-1 region. + +4. Stop the test: +```sh +Press Ctrl+C to terminate the process +``` + + +## Cleanup + +1. Delete the stack +```sh +cdk destroy --all +``` + +## Author bio +Kaustav Dey, +https://www.linkedin.com/in/kaustavbecs/ +Solution Architect + +---- +Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 + diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/appsync-lambda-bedrock-async-stream-subscription-cdk.json b/appsync-lambda-bedrock-async-stream-subscription-cdk/appsync-lambda-bedrock-async-stream-subscription-cdk.json new file mode 100644 index 000000000..26cec414c --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/appsync-lambda-bedrock-async-stream-subscription-cdk.json @@ -0,0 +1,81 @@ +{ + "title": "Amazon Bedrock calls via AppSync & Lambda streaming for long tasks", + "language": "TypeScript", + "level": "300", + "framework": "CDK", + "introBox": { + "headline": "How it works", + "text": [ + "The pattern implements an asynchronous streaming architecture.", + "Client initiates a WebSocket subscription and makes a request to AWS AppSync. AppSync invokes Lambda function in event mode, enabling asynchronous processing.", + "Lambda function streams responses from Amazon Bedrock using ConverseStream. Lambda function sends updates via mutations to AppSync. Updates are delivered to client through WebSocket subscription." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/appsync-lambda-bedrock-async-stream-subscription-cdk", + "templateURL": "serverless-patterns/appsync-lambda-bedrock-async-stream-subscription-cdk", + "projectFolder": "appsync-lambda-bedrock-async-stream-subscription-cdk", + "templateFile": "/lib/appsync-lambda-bedrock-async-stream-subscription-cdk-stack.ts" + } + }, + "resources": { + "bullets": [ + { + "text": "AWS AppSync JavaScript resolver and function reference for Amazon Bedrock runtime", + "link": "https://docs.aws.amazon.com/appsync/latest/devguide/resolver-reference-bedrock-js.html#long-running-invocations" + }, + { + "text": "Bedrock ConverseStream API", + "link": "https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html" + } + ] + }, + "deploy": { + "text": ["cdk deploy"] + }, + "testing": { + "text": ["See the GitHub repo for detailed testing instructions."] + }, + "cleanup": { + "text": ["Delete the stack: cdk destroy --all."] + }, + "authors": [ + { + "name": "Kaustav Dey", + "image": "https://avatars.githubusercontent.com/u/13236519", + "bio": "Solution Architect at AWS", + "linkedin": "kaustavbecs" + } + ], + "patternArch": { + "icon1": { + "x": 20, + "y": 50, + "service": "appsync", + "label": "AWS AppSync" + }, + "icon2": { + "x": 50, + "y": 50, + "service": "lambda", + "label": "AWS Lambda" + }, + "icon3": { + "x": 80, + "y": 50, + "service": "bedrock", + "label": "Amazon Bedrock" + }, + "line1": { + "from": "icon1", + "to": "icon2", + "label": "" + }, + "line2": { + "from": "icon2", + "to": "icon3", + "label": "" + } + } +} diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/bin/appsync-lambda-bedrock-async-stream-subscription-cdk.ts b/appsync-lambda-bedrock-async-stream-subscription-cdk/bin/appsync-lambda-bedrock-async-stream-subscription-cdk.ts new file mode 100644 index 000000000..4b42d5bb4 --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/bin/appsync-lambda-bedrock-async-stream-subscription-cdk.ts @@ -0,0 +1,20 @@ +#!/usr/bin/env node +import * as cdk from 'aws-cdk-lib'; +import { AppsyncLambdaBedrockAsyncStreamSubscriptionCdkStack } from '../lib/appsync-lambda-bedrock-async-stream-subscription-cdk-stack'; + +const app = new cdk.App(); +new AppsyncLambdaBedrockAsyncStreamSubscriptionCdkStack(app, 'AppsyncLambdaBedrockAsyncStreamSubscriptionCdkStack', { + /* If you don't specify 'env', this stack will be environment-agnostic. + * Account/Region-dependent features and context lookups will not work, + * but a single synthesized template can be deployed anywhere. */ + + /* Uncomment the next line to specialize this stack for the AWS Account + * and Region that are implied by the current CLI configuration. */ + // env: { account: process.env.CDK_DEFAULT_ACCOUNT, region: process.env.CDK_DEFAULT_REGION }, + + /* Uncomment the next line if you know exactly what Account and Region you + * want to deploy the stack to. */ + // env: { account: '123456789012', region: 'us-east-1' }, + + /* For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html */ +}); \ No newline at end of file diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/cdk.json b/appsync-lambda-bedrock-async-stream-subscription-cdk/cdk.json new file mode 100644 index 000000000..4c597d9f8 --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/cdk.json @@ -0,0 +1,81 @@ +{ + "app": "npx ts-node --prefer-ts-exts bin/appsync-lambda-bedrock-async-stream-subscription-cdk.ts", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "**/*.d.ts", + "**/*.js", + "tsconfig.json", + "package*.json", + "yarn.lock", + "node_modules", + "test" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true, + "@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true, + "@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true, + "@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true, + "@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true, + "@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": true, + "@aws-cdk/aws-eks:nodegroupNameAttribute": true, + "@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true, + "@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true, + "@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false, + "@aws-cdk/aws-s3:keepNotificationInImportedBucket": false, + "@aws-cdk/aws-ecs:reduceEc2FargateCloudWatchPermissions": true, + "@aws-cdk/aws-dynamodb:resourcePolicyPerReplica": true, + "@aws-cdk/aws-ec2:ec2SumTImeoutEnabled": true, + "@aws-cdk/aws-appsync:appSyncGraphQLAPIScopeLambdaPermission": true, + "@aws-cdk/aws-rds:setCorrectValueForDatabaseInstanceReadReplicaInstanceResourceId": true, + "@aws-cdk/core:cfnIncludeRejectComplexResourceUpdateCreatePolicyIntrinsics": true, + "@aws-cdk/aws-lambda-nodejs:sdkV3ExcludeSmithyPackages": true, + "@aws-cdk/aws-stepfunctions-tasks:fixRunEcsTaskPolicy": true, + "@aws-cdk/aws-ec2:bastionHostUseAmazonLinux2023ByDefault": true + } +} diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/example.pattern.json b/appsync-lambda-bedrock-async-stream-subscription-cdk/example.pattern.json new file mode 100644 index 000000000..651074020 --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/example.pattern.json @@ -0,0 +1,58 @@ +{ + "title": "Long running invocations of Amazon Bedrock using Amazon AppSync and AWS Lambda streaming", + "language": "Typescript", + "level": "300", + "framework": "CDK", + "introBox": { + "headline": "How it works", + "text": [ + "The pattern implements an asynchronous streaming architecture.", + "Client initiates a WebSocket subscription and makes a request to AppSync. AppSync invokes Lambda function in Event mode, enabling asynchronous processing.", + "Lambda function streams responses from Bedrock using ConverseStream. Lambda sends updates via mutations to AppSync. Updates are delivered to client through WebSocket subscription" + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/appsync-lambda-bedrock-async-stream-subscription-cdk", + "templateURL": "serverless-patterns/appsync-lambda-bedrock-async-stream-subscription-cdk", + "projectFolder": "appsync-lambda-bedrock-async-stream-subscription-cdk", + "templateFile": "lib/appsync-lambda-bedrock-async-stream-subscription-cdk-stack.ts" + } + }, + "resources": { + "bullets": [ + { + "text": "AWS AppSync JavaScript resolver and function reference for Amazon Bedrock runtime", + "link": "https://docs.aws.amazon.com/appsync/latest/devguide/resolver-reference-bedrock-js.html#long-running-invocations" + }, + { + "text": "Bedrock ConverseStream API", + "link": "https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html" + } + ] + }, + "deploy": { + "text": [ + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: cdk destroy --all." + ] + }, + "authors": [ + { + "name": "Kaustav Dey", + "image": "https://avatars.githubusercontent.com/u/13236519", + "bio": "Solution Architect at AWS", + "linkedin": "https://www.linkedin.com/in/kaustavbecs/" + } + ] + } + \ No newline at end of file diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/image.png b/appsync-lambda-bedrock-async-stream-subscription-cdk/image.png new file mode 100644 index 000000000..78960960f Binary files /dev/null and b/appsync-lambda-bedrock-async-stream-subscription-cdk/image.png differ diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/jest.config.js b/appsync-lambda-bedrock-async-stream-subscription-cdk/jest.config.js new file mode 100644 index 000000000..08263b895 --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/jest.config.js @@ -0,0 +1,8 @@ +module.exports = { + testEnvironment: 'node', + roots: ['/test'], + testMatch: ['**/*.test.ts'], + transform: { + '^.+\\.tsx?$': 'ts-jest' + } +}; diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/lib/appsync-lambda-bedrock-async-stream-subscription-cdk-stack.ts b/appsync-lambda-bedrock-async-stream-subscription-cdk/lib/appsync-lambda-bedrock-async-stream-subscription-cdk-stack.ts new file mode 100644 index 000000000..59a25375e --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/lib/appsync-lambda-bedrock-async-stream-subscription-cdk-stack.ts @@ -0,0 +1,207 @@ +import * as cdk from 'aws-cdk-lib'; +import * as appsync from 'aws-cdk-lib/aws-appsync'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import * as iam from 'aws-cdk-lib/aws-iam'; +import * as logs from 'aws-cdk-lib/aws-logs'; +import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'; +import { Construct } from 'constructs'; +import * as path from 'path'; + +export class AppsyncLambdaBedrockAsyncStreamSubscriptionCdkStack extends cdk.Stack { + constructor(scope: Construct, id: string, props?: cdk.StackProps) { + super(scope, id, { + ...props, + env: { + account: process.env.CDK_DEFAULT_ACCOUNT, + region: process.env.CDK_DEFAULT_REGION, + }, + }); + + const api = new appsync.GraphqlApi(this, 'Api', { + name: 'bedrock-streaming-api', + definition: appsync.Definition.fromFile(path.join(__dirname, '..', 'schema.graphql')), + authorizationConfig: { + defaultAuthorization: { + authorizationType: appsync.AuthorizationType.API_KEY, + }, + }, + logConfig: { + fieldLogLevel: appsync.FieldLogLevel.ALL, // Change to ALL to see resolver details + excludeVerboseContent: false, // Include verbose content + retention: logs.RetentionDays.ONE_WEEK + }, + xrayEnabled: false + }); + + + + const invocationHandler = new NodejsFunction(this, 'InvocationHandler', { + runtime: lambda.Runtime.NODEJS_18_X, + handler: 'handler', + entry: path.join(__dirname, 'lambda/invocation/index.ts'), + timeout: cdk.Duration.minutes(15), + environment: { + APPSYNC_ENDPOINT: api.graphqlUrl, + APPSYNC_API_KEY: api.apiKey || '', + }, + logRetention: logs.RetentionDays.ONE_WEEK, + tracing: lambda.Tracing.DISABLED, + bundling: { + minify: true, + sourceMap: false // Disable source maps to reduce log size + } + }); + + + // Add Bedrock permissions to Lambda. Add IAM policies for all regions covered under the Inference profile + invocationHandler.addToRolePolicy(new iam.PolicyStatement({ + actions: ['bedrock:InvokeModelWithResponseStream'], + resources: [ + 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-5-sonnet-20241022-v2:0', + 'arn:aws:bedrock:us-east-2::foundation-model/anthropic.claude-3-5-sonnet-20241022-v2:0', + 'arn:aws:bedrock:us-west-2::foundation-model/anthropic.claude-3-5-sonnet-20241022-v2:0', + `arn:aws:bedrock:us-east-1:${this.account}:inference-profile/us.anthropic.claude-3-5-sonnet-20241022-v2:0` + ] + })); + + // Add AppSync permissions to Lambda + invocationHandler.addToRolePolicy(new iam.PolicyStatement({ + actions: ['appsync:GraphQL'], + resources: [api.arn + '/types/Mutation/fields/sendChunk'], + })); + + // Add CloudWatch Logs permissions to Lambda + invocationHandler.addToRolePolicy(new iam.PolicyStatement({ + actions: [ + 'logs:CreateLogStream', + 'logs:PutLogEvents' + ], + resources: [ + `arn:aws:logs:${this.region}:${this.account}:log-group:/aws/lambda/*` + ] + })); + + + const invocationDS = api.addLambdaDataSource('InvocationDataSource', invocationHandler); + + + invocationDS.createResolver('StartConversationResolver', { + typeName: 'Mutation', + fieldName: 'startConversation', + requestMappingTemplate: appsync.MappingTemplate.fromString(` + { + "version": "2018-05-29", + "operation": "Invoke", + "invocationType": "Event", + "payload": $util.toJson($context.arguments) + } + `), + responseMappingTemplate: appsync.MappingTemplate.fromString(` + #if($context.error) + $util.error($context.error.message, $context.error.type) + #end + { + "conversationId": "$context.arguments.input.conversationId", + "status": "STARTED" + } + `) + }); + + const noneDS = api.addNoneDataSource('NoneDataSource'); + + noneDS.createResolver('SendChunkResolver', { + typeName: 'Mutation', + fieldName: 'sendChunk', + requestMappingTemplate: appsync.MappingTemplate.fromString(` + { + "version": "2018-05-29", + "payload": { + "conversationId": "$context.arguments.conversationId", + "chunk": "$context.arguments.chunk" + } + } + `), + responseMappingTemplate: appsync.MappingTemplate.fromString(` + #if($context.error) + $util.error($context.error.message, $context.error.type) + #end + $util.toJson({ + "conversationId": "$context.arguments.conversationId", + "chunk": "$context.arguments.chunk" + }) + `) + }); + + noneDS.createResolver('SubscriptionResolver', { + typeName: 'Subscription', + fieldName: 'onReceiveChunk', + requestMappingTemplate: appsync.MappingTemplate.fromString(` + { + "version": "2018-05-29", + "payload": $util.toJson($context.arguments) + } + `), + responseMappingTemplate: appsync.MappingTemplate.fromString( + '$util.toJson($context.result)' + ), + }); + + noneDS.createResolver('SendErrorResolver', { + typeName: 'Mutation', + fieldName: 'sendError', + requestMappingTemplate: appsync.MappingTemplate.fromString(` + #set($logMessage = "sendError invoked with arguments - Conversation ID: $context.arguments.conversationId, Error Message: $context.arguments.error") + $util.log($logMessage) + { + "version": "2018-05-29", + "payload": { + "conversationId": "$context.arguments.conversationId", + "error": "$context.arguments.error" + } + } + `), + responseMappingTemplate: appsync.MappingTemplate.fromString(` + #if($context.error) + $util.error($context.error.message, $context.error.type) + #end + $util.toJson($context.result) + `), + }); + + noneDS.createResolver('CompleteStreamResolver', { + typeName: 'Mutation', + fieldName: 'completeStream', + requestMappingTemplate: appsync.MappingTemplate.fromString(` + { + "version": "2017-02-28", + "payload": { + "conversationId": "$context.arguments.conversationId", + "status": "COMPLETED" + } + } + `), + responseMappingTemplate: appsync.MappingTemplate.fromString(` + #if($context.error) + $util.error($context.error.message, $context.error.type) + #end + $util.toJson($context.result) + `) + }); + + + + + // Add CloudWatch dashboard for monitoring + new cdk.CfnOutput(this, 'GraphQLAPIURL', { + value: api.graphqlUrl + }); + + new cdk.CfnOutput(this, 'GraphQLAPIKey', { + value: api.apiKey || '' + }); + + new cdk.CfnOutput(this, 'CloudWatchLogsURL', { + value: `https://console.aws.amazon.com/cloudwatch/home?region=${this.region}#logsV2:log-groups` + }); + } +} diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/lib/lambda/invocation/index.ts b/appsync-lambda-bedrock-async-stream-subscription-cdk/lib/lambda/invocation/index.ts new file mode 100644 index 000000000..cdfdfb7d3 --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/lib/lambda/invocation/index.ts @@ -0,0 +1,180 @@ +import { BedrockRuntimeClient, ConverseStreamCommand, ConverseStreamCommandInput } from '@aws-sdk/client-bedrock-runtime'; +import { GraphQLClient } from 'graphql-request'; + +interface Event { + input: { + conversationId: string; + prompt: string; + }; +} + +function sanitizeGraphQLString(text: string): string { + return text + .replace(/[\n\r]/g, ' ') + .replace(/\\/g, '\\\\') + .replace(/"/g, '\\"') + .replace(/\t/g, ' ') + .trim(); +} + +export const handler = async (event: Event) => { + console.log('Received event:', JSON.stringify(event)); + + const { conversationId, prompt } = event.input; + + if (!conversationId || !prompt) { + console.error('Invalid input: Missing conversationId or prompt'); + return; + } + + console.log(`Starting Bedrock stream for conversationId: ${conversationId}`); + + // Using the us-east-1 Bedrock Inference Profile for Claude Sonnet 3.5 V2 + const bedrockClient = new BedrockRuntimeClient({ + region: 'us-east-1' + }); + const graphQLClient = new GraphQLClient(process.env.APPSYNC_ENDPOINT!, { + headers: { 'x-api-key': process.env.APPSYNC_API_KEY! }, + }); + + try { + await processBedrockStream(bedrockClient, graphQLClient, prompt, conversationId); + console.log(`Successfully completed processing for conversationId: ${conversationId}`); + } catch (error) { + console.error(`Error during processing for conversationId ${conversationId}:`, error); + await notifyError(graphQLClient, conversationId, error); + } +}; + +async function processBedrockStream( + bedrockClient: BedrockRuntimeClient, + graphQLClient: GraphQLClient, + input: string, + conversationId: string +): Promise { + const params: ConverseStreamCommandInput = { + modelId: 'us.anthropic.claude-3-5-sonnet-20241022-v2:0', + messages: [ + { + role: 'user', + content: [ + { + text: input + } + ] + } + ], + inferenceConfig: { + temperature: 0.7, + maxTokens: 4096, + topP: 1 + } + }; + + + const command = new ConverseStreamCommand(params); + const response = await bedrockClient.send(command); + + if (!response.stream) { + throw new Error('No response stream received from Bedrock'); + } + + + let buffer = ''; + const chunkSize = 100; + + try { + for await (const event of response.stream) { + if (event.contentBlockDelta && event.contentBlockDelta.delta) { + const delta = event.contentBlockDelta.delta.text; + if (delta) { + buffer += delta; + + if (buffer.length >= chunkSize || buffer.match(/[.!?]\s/)) { + const sanitizedChunk = sanitizeGraphQLString(buffer); + if (sanitizedChunk) { + await sendChunkToAppSync(graphQLClient, conversationId, sanitizedChunk); + } + buffer = ''; + } + } + } + } + + if (buffer) { + const sanitizedChunk = sanitizeGraphQLString(buffer); + if (sanitizedChunk) { + await sendChunkToAppSync(graphQLClient, conversationId, sanitizedChunk); + } + } + + await completeStream(graphQLClient, conversationId); + } catch (error) { + console.error(`Error while processing stream for conversationId ${conversationId}:`, error); + throw error; + } +} + + +async function sendChunkToAppSync( + graphQLClient: GraphQLClient, + conversationId: string, + chunk: string +): Promise { + const mutation = ` + mutation SendChunk($conversationId: ID!, $chunk: String!) { + sendChunk(conversationId: $conversationId, chunk: $chunk) { + conversationId + chunk + } + } + `; + + try { + await graphQLClient.request(mutation, { conversationId, chunk }); + console.log(`Sent chunk to AppSync for conversationId ${conversationId}`); + } catch (error) { + console.error(`Failed to send chunk to AppSync for conversationId ${conversationId}:`, error); + throw error; + } +} + +async function completeStream( + graphQLClient: GraphQLClient, + conversationId: string +): Promise { + const mutation = ` + mutation Complete($conversationId: ID!) { + completeStream(conversationId: $conversationId) { + conversationId + status + } + } + `; + + await graphQLClient.request(mutation, { conversationId }); + console.log(`Stream completed for conversationId ${conversationId}`); +} + +async function notifyError( + graphQLClient: GraphQLClient, + conversationId: string, + error: unknown +): Promise { + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; + const sanitizedError = sanitizeGraphQLString(errorMessage); + + const mutation = ` + mutation SendError($conversationId: ID!, $error: String!) { + sendError(conversationId: $conversationId, error: $error) { + conversationId + error + } + } + `; + + await graphQLClient.request(mutation, { + conversationId, + error: sanitizedError, + }); +} diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/package.json b/appsync-lambda-bedrock-async-stream-subscription-cdk/package.json new file mode 100644 index 000000000..53a4c7113 --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/package.json @@ -0,0 +1,34 @@ +{ + "name": "appsync-lambda-bedrock-async-stream-subscription-cdk", + "version": "0.1.0", + "bin": { + "cdk": "bin/cdk.js" + }, + "scripts": { + "build": "tsc", + "watch": "tsc -w", + "test": "tsx test/test.ts", + "cdk": "cdk", + "deploy": "cdk deploy" + }, + "devDependencies": { + "@types/cookie": "^0.6.0", + "@types/node": "^20.7.1", + "aws-cdk": "^2.99.1", + "cookie": "^0.5.0", + "esbuild": "^0.19.4", + "ts-node": "^10.9.1", + "typescript": "~5.2.2", + "tsx": "^3.12.7" + }, + "dependencies": { + "@aws-sdk/client-bedrock-runtime": "^3.496.0", + "aws-amplify": "^6.0.12", + "aws-cdk-lib": "^2.99.1", + "constructs": "^10.0.0", + "cookie": "^0.5.0", + "graphql": "^16.8.1", + "graphql-request": "^6.1.0", + "source-map-support": "^0.5.21" + } +} diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/schema.graphql b/appsync-lambda-bedrock-async-stream-subscription-cdk/schema.graphql new file mode 100644 index 000000000..411d2309b --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/schema.graphql @@ -0,0 +1,43 @@ +type Mutation { + startConversation(input: StartConversationInput!): ConversationResponse! + sendChunk(conversationId: ID!, chunk: String!): ChunkResponse! + completeStream(conversationId: ID!): CompletionResponse! + + # Add this missing mutation + sendError(conversationId: ID!, error: String!): ErrorResponse! +} + +type Subscription { + onReceiveChunk(conversationId: ID!): ChunkResponse + @aws_subscribe(mutations: ["sendChunk"]) +} + +type Query { + dummyField: String +} + +input StartConversationInput { + prompt: String! + conversationId: ID! +} + +type ConversationResponse { + conversationId: ID! + status: String! +} + +type ChunkResponse { + conversationId: ID + chunk: String +} + +type CompletionResponse { + conversationId: ID! + status: String! +} + +# Add this type for errors +type ErrorResponse { + conversationId: ID! + error: String! +} diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/test/test.ts b/appsync-lambda-bedrock-async-stream-subscription-cdk/test/test.ts new file mode 100644 index 000000000..e030be89c --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/test/test.ts @@ -0,0 +1,100 @@ +import { Amplify } from 'aws-amplify'; +import { generateClient } from 'aws-amplify/api'; + +// AppSync API Configuration + +const APPSYNC_API_URL = ''; +const APPSYNC_API_KEY = ''; + +// Configure Amplify +Amplify.configure({ + API: { + GraphQL: { + endpoint: APPSYNC_API_URL, + defaultAuthMode: 'apiKey', + apiKey: APPSYNC_API_KEY + } + } +}); + +// Generate Amplify client +const client = generateClient(); + +async function startConversation(prompt: string, conversationId: string) { + const mutation = ` + mutation StartConversation($input: StartConversationInput!) { + startConversation(input: $input) { + conversationId + status + } + } + `; + + console.log('Starting conversation...'); + const response = await client.graphql({ + query: mutation, + variables: { input: { prompt, conversationId } } + }); + console.log('StartConversation response:', response); +} + +function subscribeToChunks(conversationId: string) { + const subscription = ` + subscription OnReceiveChunk($conversationId: ID!) { + onReceiveChunk(conversationId: $conversationId) { + conversationId + chunk + } + } + `; + + console.log('Starting subscription...'); + + // Explicitly cast to `Observable` + const observable = client.graphql({ + query: subscription, + variables: { conversationId } + }) as unknown as { subscribe: Function }; + + const subscriptionInstance = observable.subscribe({ + next: (data: { data?: { onReceiveChunk?: { conversationId: string; chunk: string } } }) => { + console.log('Received chunk:', data?.data?.onReceiveChunk); + }, + error: (error: Error) => { + console.error('Subscription error:', error); + }, + complete: () => { + console.log('Subscription completed'); + } + }); + + return subscriptionInstance; +} + +async function main() { + try { + const TEST_CONVERSATION_ID = '123e4567-e89b-12d3-a456-426614174000'; + const TEST_PROMPT = 'Tell me a joke'; + + // Step 1: Subscribe to chunks + const subscription = subscribeToChunks(TEST_CONVERSATION_ID); + + // Step 2: Start the long-running invocation (start conversation) + setTimeout(async () => { + await startConversation(TEST_PROMPT, TEST_CONVERSATION_ID); + }, 2000); + + // Cleanup on process exit + process.on('SIGINT', () => { + console.log('Cleaning up...'); + subscription.unsubscribe(); + process.exit(); + }); + + } catch (error) { + console.error('Error during test:', error); + process.exit(1); + } +} + +main(); diff --git a/appsync-lambda-bedrock-async-stream-subscription-cdk/tsconfig.json b/appsync-lambda-bedrock-async-stream-subscription-cdk/tsconfig.json new file mode 100644 index 000000000..bc46f183a --- /dev/null +++ b/appsync-lambda-bedrock-async-stream-subscription-cdk/tsconfig.json @@ -0,0 +1,32 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": [ + "es2020", + "dom" + ], + "declaration": true, + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "noImplicitThis": true, + "alwaysStrict": true, + "noUnusedLocals": false, + "noUnusedParameters": false, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": false, + "inlineSourceMap": true, + "inlineSources": true, + "experimentalDecorators": true, + "strictPropertyInitialization": false, + "esModuleInterop": true, + "typeRoots": [ + "./node_modules/@types" + ] + }, + "exclude": [ + "node_modules", + "cdk.out" + ] +}