Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing msk-lambda-iam-python-sam and msk-lambda-iam-node-sam projects by adding Cloudformation templates as previous CFT doesn't work anymore due to Cloud9 dependency #2524

Merged
merged 11 commits into from
Feb 4, 2025
Merged
Prev Previous commit
Next Next commit
Readme.md
indranilbanerjeeawssa committed Dec 5, 2024
commit 2313fb07b27fec44120fde3f3aad182c70fc3630
5 changes: 0 additions & 5 deletions msk-lambda-iam-python-sam/MSKAndKafkaClientEC2.yaml
Original file line number Diff line number Diff line change
@@ -20,10 +20,6 @@ Parameters:
- python3.11
- python3.12
Default: python3.12
EC2KeyPair:
Type: AWS::EC2::KeyPair::KeyName
Description: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/create-key-pairs.html
Default: my-key-pair
MSKKafkaVersion:
Type: String
Default: 3.5.1
@@ -288,7 +284,6 @@ Resources:
SubnetId: !Ref PublicSubnetOne
SecurityGroupIds: [!GetAtt KafkaClientInstanceSecurityGroup.GroupId]
ImageId: !Ref LatestAmiId
KeyName: !Ref EC2KeyPair
Tags:
- Key: 'Name'
Value: 'KafkaClientInstance'
211 changes: 0 additions & 211 deletions msk-lambda-iam-python-sam/README 2.md

This file was deleted.

186 changes: 119 additions & 67 deletions msk-lambda-iam-python-sam/README.md
Original file line number Diff line number Diff line change
@@ -1,99 +1,72 @@
# Python AWS Lambda Kafka consumer with IAM auth, using AWS SAM

This pattern is an example of a Lambda function that consumes messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic, where the MSK Cluster has been configured to use IAM authentication. This pattern assumes you already have an MSK cluster with a topic configured, if you need a sample pattern to deploy an MSK cluster either in Provisioned or Serverless modes please see the [msk-cfn-sasl-lambda pattern](https://serverlessland.com/patterns/msk-cfn-sasl-lambda).
This pattern is an example of a Lambda function that consumes messages from an Amazon Managed Streaming for Kafka (Amazon MSK) topic, where the MSK Cluster has been configured to use IAM authentication.

This project contains source code and supporting files for a serverless application that you can deploy with the AWS Serverless Application Model (AWS SAM) CLI. It includes the following files and folders.

- HandlerKafka - Code for the application's Lambda function.
- events - Invocation events that you can use to invoke the function.
- template.yaml - An AWS SAM template that defines the application's AWS resources.
- template_original.yaml - An AWS SAM template that defines the application's AWS resources.
- MSKAndKafkaClientEC2.yaml - A Cloudformation template file that can be used to deploy an MSK cluster and also deploy an EC2 machine with all pre-requisities already installed, so you can directly build and deploy the lambda function and test it out.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- MSKAndKafkaClientEC2.yaml - A Cloudformation template file that can be used to deploy an MSK cluster and also deploy an EC2 machine with all pre-requisities already installed, so you can directly build and deploy the lambda function and test it out.
- MSKAndKafkaClientEC2.yaml - A CloudFormation template file that can be used to deploy an MSK cluster and also deploy an EC2 instance with all prerequisites already installed, so you can directly build and deploy the Lambda function and test it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked in modified Readme.md files with changes incorporated


The application creates a Lambda function that listens to Kafka messages on a topic of an MSK Cluster. These resources are defined in the `template.yaml` file in this project. You can update the template to add AWS resources through the same deployment process that updates your application code.
The application creates a Lambda function that listens to Kafka messages on a topic of an MSK Cluster. These resources are defined in the `template_original.yaml` file in this project. You can update the template to add AWS resources through the same deployment process that updates your application code.

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
* Create MSK cluster and topic that will be used for testing. It is important to create the topic before deploying the Lambda function, otherwise the event source mapping will stay disabled.

## Deploy the sample application

The AWS SAM CLI is a serverless tool for building and testing Lambda applications. It uses Docker to locally test your functions in an Amazon Linux environment that resembles the Lambda execution environment. It can also emulate your application's build environment and API.

To use the AWS SAM CLI, you need the following tools.

* AWS SAM CLI - [Install the AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html)
* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community)

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns.git
```
1. Change directory to the pattern directory:
```
cd msk-lambda-iam-python-sam
```
1. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file:
```
sam build
sam deploy --guided
```
## Run the Cloudformation template to create the MSK Cluster and Client EC2 machine
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not going to mark every instance, but please double check the correct spelling:
Cloudformation -> CloudFormation
lambda -> Lambda
EC2 machine -> EC2 instance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked in modified Readme.md files with changes incorporated


1. During the prompts:
* **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name.
* **AWS Region**: The AWS region you want to deploy your app to.
* **Parameter MSKClusterName**: The name of the MSKCluster, eg. msk-test-cluster
* [Run the Cloudformation template using the file MSKAndKafkaClientEC2.yaml] - You can go to the AWS Cloudformation console, create a new stack by specifying the template file. You can keep the defaults for input parameters or modify them as necessary. Wait for the Cloudformation stack to be created. This Cloudformation template will create an MSK cluster (Provisioned or Serverless based on your selection). It will also create an EC2 machine that you can use as a client.

* **Parameter MSKClusterId**: The unique ID of the MSKCluster, eg. a4e132c8-6ad0-4334-a313-123456789012-s2
* **Parameter MSKTopic**: The Kafka topic on which the lambda function will listen on
* **Confirm changes before deploy**: If set to yes, any change sets will be shown to you before execution for manual review. If set to no, the AWS SAM CLI will automatically deploy application changes.
* **Allow SAM CLI IAM role creation**: Many AWS SAM templates, including this example, create AWS IAM roles required for the AWS Lambda function(s) included to access AWS services. By default, these are scoped down to minimum required permissions. To deploy an AWS CloudFormation stack which creates or modifies IAM roles, the `CAPABILITY_IAM` value for `capabilities` must be provided. If permission isn't provided through this prompt, to deploy this example you must explicitly pass `--capabilities CAPABILITY_IAM` to the `sam deploy` command.
* **Disable rollback**: Defaults to No and it preserves the state of previously provisioned resources when an operation fails
* **Save arguments to configuration file**: If set to yes, your choices will be saved to a configuration file inside the project, so that in the future you can just re-run `sam deploy` without parameters to deploy changes to your application.
* **SAM configuration file [samconfig.toml]**: Name of the configuration file to store configuration information locally
* **SAM configuration environment [default]**: Environment for storing deployment information locally
* [Connect to the EC2 machine] - Once the Cloudformation stack is created, you can go to the EC2 console and log into the machine using either "Connect using EC2 Instance Connect" or "Connect using EC2 Instance Connect Endpoint" option under the "EC2 Instance Connect" tab.
Note: You may need to wait for some time after the Cloudformation stack is created, as some UserData scripts continue running after the Cloudformation stack shows Created.

You should get a message "Successfully created/updated stack - <StackName> in <Region>" if all goes well.
* [Check if Kafka Topic has been created] - Once you are inside the EC2 machine, you should be in the /home/ec2-user folder. Check to see the contents of the file kafka_topic_creator_output.txt by running the command cat kafka_topic_creator_output.txt. You should see an output such as "Created topic MskIamPythonLambdaTopic."

Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults.
If you are not able to find the file kafka_topic_creator_output.txt or if it is blank or you see an error message, then you need to run the file ./kafka_topic_creator.sh. This file runs a script that goes and creates the Kafka topic that the Lambda function will subscribe to.

## How it works
## Pre-requisites to Deploy the sample Lambda function

This pattern creates a Lambda function along with a Lambda Event Source Mapping(ESM) resource. This maps a Kafka topic on an MSK Cluster as a trigger to a Lambda function. The ESM takes care of polling the Kafka topic and then invokes the Lambda function with a batch of messages.
The EC2 machine that was created by running the Cloudformation template has all the software that will be needed to deploy the Lambda function.

## Test the sample application
Once the Lambda function is deployed, send some Kafka messages to the topic that you configured in the Lambda function trigger.
Either send at least 10 messages or wait for 300 seconds (check the values of BatchSize: 10 and MaximumBatchingWindowInSeconds: 300 in the template.yaml file)
Then check Amazon CloudWatch logs and you should see messages in the CloudWatch Log Group with the name of the deployed Lambda function.
The Lambda code parses the Kafka messages and outputs the fields in the Kafka messages to CloudWatch logs.
The AWS SAM CLI is a serverless tool for building and testing Lambda applications. It uses Docker to locally test your functions in an Amazon Linux environment that resembles the Lambda execution environment. It can also emulate your application's build environment and API.

A single Lambda function receives a batch of messages. The messages are received as a map with each key being a combination of the topic and the partition, as a single batch can receive messages from multiple partitions.
* Python3 - We have installed the version of Python3 that you picked up at the time of specifying the input parameters to the Cloudformation template
* AWS SAM CLI - We have installed the AWS SAM CLI (https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html)
* Docker - We have installed the Docker Community Edition on the EC2 machine (https://hub.docker.com/search/?type=edition&offering=community)

Each key has a list of messages. Each Kafka message has the following properties - `Topic`, `Partition`, `Offset`, `TimeStamp`, `TimeStampType`, `Key`, and `Value`.
We have also cloned the Github repository for serverless-patterns on the EC2 machine already by running the below command
```
git clone https://github.com/aws-samples/serverless-patterns.git
```
Change directory to the pattern directory:
```
cd serverless-patterns/msk-lambda-iam-python-sam
```
## Use the SAM CLI to build and test locally

The `Key` and `Value` are base64 encoded and have to be decoded. A message can also have a list of headers, each header having a key and a value.
Build your application with the `sam build` command.

The code in this example prints out the fields in the Kafka message and also decrypts the key and the value and logs them to CloudWatch logs.
```bash
sam build
```

The SAM CLI creates a deployment package, and saves it in the `.aws-sam/build` folder.

### Local development
Test a single function by invoking it directly with a test event. An event is a JSON document that represents the input that the function receives from the event source. Test events are included in the `events` folder in this project.

**You can invoke the function locally using `sam local`**
Run functions locally and invoke them with the `sam local invoke` command.

```bash
sam local invoke --event=events/event.json
sam local invoke --event events/event.json
```

You should see a response similar to the below
You should see a response such as the below:
```
***** Begin sam local invoke response *****
`START RequestId: 5c10310a-abf9-416e-b017-697d2c3ba097 Version: $LATEST
Received an event: {'eventSource': 'aws:kafka', 'eventSourceArn': 'arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13', 'bootstrapServers': 'b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098', 'records': {'myTopic-0': [{'topic': 'myTopic', 'partition': 0, 'offset': 383, 'timestamp': 1678484822068, 'timestampType': 'CREATE_TIME', 'value': 'bTE=', 'headers': []}, {'topic': 'myTopic', 'partition': 0, 'offset': 384, 'timestamp': 1678484823448, 'timestampType': 'CREATE_TIME', 'value': 'bTI=', 'headers': []}, {'topic': 'myTopic', 'partition': 0, 'offset': 385, 'timestamp': 1678484824763, 'timestampType': 'CREATE_TIME', 'value': 'bTM=', 'headers': []}, {'topic': 'myTopic', 'partition': 0, 'offset': 386, 'timestamp': 1678484825902, 'timestampType': 'CREATE_TIME', 'value': 'bTQ=', 'headers': []}, {'topic': 'myTopic', 'partition': 0, 'offset': 387, 'timestamp': 1678484827810, 'timestampType': 'CREATE_TIME', 'value': 'bTU=', 'headers': []}]}}
@@ -165,12 +138,91 @@ Now finished printing details of record number: 5
END RequestId: 5c10310a-abf9-416e-b017-697d2c3ba097
REPORT RequestId: 5c10310a-abf9-416e-b017-697d2c3ba097 Init Duration: 6.68 ms Duration: 1502.83 ms Billed Duration: 1503 ms Memory Size: 128 MB Max Memory Used: 128 MB
***** End sam local invoke response *****
```

## Deploy the sample application


To deploy your application for the first time, run the following in your shell:

```bash
sam deploy --capabilities CAPABILITY_IAM --no-confirm-changeset --no-disable-rollback --region $AWS_REGION --stack-name msk-lambda-iam-python-sam --guided
```

The sam deploy command will package and deploy your application to AWS, with a series of prompts. You can accept all the defaults by hitting Enter:

* **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name.
* **AWS Region**: The AWS region you want to deploy your app to.
* **Parameter MSKClusterName**: The name of the MSKCluster
* **Parameter MSKClusterId**: The unique ID of the MSKCluster
* **Parameter MSKTopic**: The Kafka topic on which the lambda function will listen on
* **Confirm changes before deploy**: If set to yes, any change sets will be shown to you before execution for manual review. If set to no, the AWS SAM CLI will automatically deploy application changes.
* **Allow SAM CLI IAM role creation**: Many AWS SAM templates, including this example, create AWS IAM roles required for the AWS Lambda function(s) included to access AWS services. By default, these are scoped down to minimum required permissions. To deploy an AWS CloudFormation stack which creates or modifies IAM roles, the `CAPABILITY_IAM` value for `capabilities` must be provided. If permission isn't provided through this prompt, to deploy this example you must explicitly pass `--capabilities CAPABILITY_IAM` to the `sam deploy` command.
* **Disable rollback**: Defaults to No and it preserves the state of previously provisioned resources when an operation fails
* **Save arguments to configuration file**: If set to yes, your choices will be saved to a configuration file inside the project, so that in the future you can just re-run `sam deploy` without parameters to deploy changes to your application.
* **SAM configuration file [samconfig.toml]**: Name of the configuration file to store configuration information locally
* **SAM configuration environment [default]**: Environment for storing deployment information locally

You should get a message "Successfully created/updated stack - <StackName> in <Region>" if all goes well

Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults.

## How it works

This pattern creates a Lambda function along with a Lambda Event Source Mapping(ESM) resource. This maps a Kafka topic on an MSK Cluster as a trigger to a Lambda function. The ESM takes care of polling the Kafka topic and then invokes the Lambda function with a batch of messages.

## Test the sample application

Once the lambda function is deployed, send some Kafka messages on the topic that the lambda function is listening on, on the MSK server.

For your convenience, a script has been created on the EC2 machine that was provisioned using Cloudformation.

cd /home/ec2-user

You should see a script called kafka_message_sender.sh. Run that script and you should be able to send a new Kafka message in every line as shown below

```
[ec2-user@ip-10-0-0-126 ~]$ sh kafka_message_sender.sh
>My first message
>My second message
>My third message
>My fourth message
>My fifth message
>My sixth message
>My seventh message
>My eigth message
>My ninth message
>My tenth message
>Ctrl-C
```
Either send at least 10 messages or wait for 300 seconds (check the values of BatchSize: 10 and MaximumBatchingWindowInSeconds: 300 in the template.yaml file)

Then check Amazon CloudWatch logs and you should see messages in the CloudWatch Log Group with the name of the deployed Lambda function.

The Lambda code parses the Kafka messages and outputs the fields in the Kafka messages to CloudWatch logs.

A single Lambda function receives a batch of messages. The messages are received as a map with each key being a combination of the topic and the partition, as a single batch can receive messages from multiple partitions.

Each key has a list of messages. Each Kafka message has the following properties - `Topic`, `Partition`, `Offset`, `TimeStamp`, `TimeStampType`, `Key`, and `Value`.

The `Key` and `Value` are base64 encoded and have to be decoded. A message can also have a list of headers, each header having a key and a value.

The code in this example prints out the fields in the Kafka message and also decrypts the key and the value and logs them to CloudWatch logs.

## Cleanup

1. Delete the stack
```bash
sam delete
```
You can first clean-up the Lambda function by running the sam delete command

```
cd /home/ec2-user/serverless-patterns/msk-lambda-iam-python-sam
sam delete
```
confirm by pressing y for both the questions
You should see the lambda function getting deleted and a final confirmation "Deleted successfully" on the command-line

Next you need to delete the Cloudformation template that created the MSK Server and the EC2 machine by going to the Cloudformation console and selecting the stack and then hitting the "Delete" button. It will run for sometime but eventually you should see the stack getting cleaned up.

----
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.