New serverless pattern - Serverless Messaging Redrive #2543

Open · wants to merge 2 commits into base: main
58 changes: 58 additions & 0 deletions serverless-message-processing/README.md
@@ -0,0 +1,58 @@
# Serverless Message Processing Pattern

## Overview
An adaptable pattern for message processing using AWS serverless services, featuring error handling and automatic recovery mechanisms.

## Core Components
- API Gateway (message ingestion)
- SQS Queues (main + DLQs)
- Lambda Functions (processing + recovery)

## Basic Flow
1. Messages enter through API Gateway
2. Main queue receives messages
3. Processor Lambda handles messages


It would be worth stating that this Lambda function polls the main queue using an Event Source Mapping (ESM) and handles the messages. Include links to docs/blogs/posts where readers can build up an understanding.

4. Failed messages route to DLQs


If you're going to use terse statements in a numbered or bulleted list, it would be good to include a fuller explanation alongside them. In this context, what's a "failed message"? See if you can go a level deeper to explain what's going on, as it may be confusing to somebody unfamiliar with the pattern or with certain service features, like DLQs.

5. Decision maker attempts an automated recovery
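
The main-queue-to-DLQ hop in steps 4–5 is driven by an SQS redrive policy. As a minimal sketch (the queue name, DLQ ARN, and `maxReceiveCount` below are illustrative assumptions, not values from this pattern's template):

```python
import json

def redrive_policy(dlq_arn: str, max_receives: int = 3) -> str:
    """Build the RedrivePolicy attribute SQS expects, as a JSON string.

    After max_receives failed receives, SQS moves the message to the DLQ.
    """
    return json.dumps(
        {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": str(max_receives)}
    )

# Attaching it to a queue (requires AWS credentials; shown for context only):
# boto3.client("sqs").create_queue(
#     QueueName="processing-main",
#     Attributes={"RedrivePolicy": redrive_policy(dlq_arn)},
# )

policy = redrive_policy("arn:aws:sqs:us-east-1:123456789012:processing-dlq")
print(policy)
```

In the actual pattern, the same wiring is declared in `template.yaml` rather than made via API calls.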

## Deployment


Likely to come up in @ellisms review, but we typically include a statement on users being responsible for spinning up / tearing down / paying for resources in their own AWS accounts. We'd also typically mention having the AWS CLI installed and configured as a prerequisite to using SAM. Take a look at some other SAM examples to get a feel for the norm.

### Build the SAM application
```bash
sam build
```

### Deploy the application
```bash
sam deploy --guided
```

## Key Features


These may make sense to you in the context of being the author, but somebody coming to this as a new idea will need each of them explained.

- Automatic retry mechanism
- Segregation of recoverable/fatal errors
- Extensible processing logic

## API Reference
### Send Message


Good idea to include a copy of the API contract, but it would be nice to see commands that users can run in their terminal, e.g. `curl https://${endpoint}/message ...` with the relevant params to send a sample message.

```
POST /message
Content-Type: application/json
```
```json
{
  "messageType": "TYPE_A|TYPE_B|TYPE_C",
  "payload": {},
  "timestamp": "ISO8601_TIMESTAMP"
}
```
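
A request matching this contract can be assembled and sent as sketched below. The endpoint URL is a placeholder: substitute the API Gateway URL that `sam deploy --guided` prints as a stack output.

```python
import json
import urllib.request
from datetime import datetime, timezone

# Placeholder endpoint -- replace with the URL output by `sam deploy`.
ENDPOINT = "https://example.execute-api.us-east-1.amazonaws.com/Prod/message"

def build_message(message_type: str, payload: dict) -> dict:
    """Assemble a message matching the POST /message contract above."""
    return {
        "messageType": message_type,
        "payload": payload,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

def send(message: dict) -> None:
    """POST the message as JSON (needs a real, deployed endpoint)."""
    req = urllib.request.Request(
        ENDPOINT,
        data=json.dumps(message).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        print(resp.status)

# Build (but don't send) a sample TYPE_A message:
msg = build_message("TYPE_A", {"email": "user@example.com"})
print(json.dumps(msg))
```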


## Adaptation Points


If you want to keep a terse list, you might need a sentence or two before it to explain what you mean by Adaptation Points (earlier you called it extensible; pick one way of describing this concept).

- Message validation rules
- Processing logic
- Error handling strategies
- Recovery mechanisms
- Monitoring requirements
- API Design
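
As one way to picture the first adaptation point, validation rules could be made pluggable so they can be swapped without touching the processing loop. This is a sketch under assumed names, not code from the pattern:

```python
# Hypothetical pluggable validation: each rule is a predicate over the message.
RULES = [
    lambda m: "messageType" in m,
    lambda m: isinstance(m.get("payload"), dict),
    lambda m: "timestamp" in m,
]

def is_valid(message: dict) -> bool:
    """A message is valid only if every registered rule passes."""
    return all(rule(message) for rule in RULES)

# Adapting the pattern then means appending rules, e.g.:
# RULES.append(lambda m: m.get("messageType") in {"TYPE_A", "TYPE_B", "TYPE_C"})
```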

## Note


What's this section, or is it a note to self that should have been deleted?

This is a sample pattern. Adapt security, scaling, and processing logic according to your requirements.
60 changes: 60 additions & 0 deletions serverless-message-processing/example-pattern.json


@zalilias you need to rename the file to be in line with the name of your pattern and replace the contents of the template / example with information about your pattern.

@@ -0,0 +1,60 @@
{
  "title": "Step Functions to Athena",
  "description": "Create a Step Functions workflow to query Amazon Athena.",
  "language": "Python",
  "level": "200",
  "framework": "SAM",
  "introBox": {
    "headline": "How it works",
    "text": [
      "This sample project demonstrates how to use a serverless solution for processing and fixing malformed messages using SQS queues and Lambda functions.",
      "The system automatically handles message validation, applies fixes where possible, and routes messages to appropriate queues based on their fixability.",
      "With built-in error handling and detailed logging, it provides a robust framework for message processing that can be easily extended for specific business needs.",
      "This pattern uses AWS Lambda for processing and multiple SQS queues for message routing, including two dead-letter queues (DLQs): one for auto-remediation and one for messages requiring human intervention."
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/serverless-messaging-processing",
      "templateURL": "serverless-patterns/serverless-messaging-processing",
      "projectFolder": "serverless-messaging-processing",
      "templateFile": "template.yaml"
    }
  },
  "resources": {
    "bullets": [
      {
        "text": "Amazon SQS Docs",
        "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html"
      },
      {
        "text": "Using dead-letter queues in Amazon SQS",
        "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html"
      }
    ]
  },
  "deploy": {
    "text": [
      "sam build",
      "sam deploy --guided"
    ]
  },
  "testing": {
    "text": [
      "See the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "text": [
      "Delete the stack: <code>sam delete</code>."
    ]
  },
  "authors": [
    {
      "name": "Ilias Ali",
      "image": "link-to-your-photo.jpg",
      "bio": "I am a Solutions Architect working at AWS based in the UK.",
      "linkedin": "ilias-ali-0849991a4"
    }
  ]
}
152 changes: 152 additions & 0 deletions serverless-message-processing/functions/decision_maker/app.py


I couldn't see where you were making use of the original failure message from the main queue to decide which branching logic to take. Using your "email fix" as an example:

1. Invalid email address provided in the payload to API GW.
2. Message stored durably in the SQS main queue.
3. Lambda function polls the queue and attempts to use that malformed email address with some imagined downstream service.
4. Downstream service returns an error due to the malformed email.
5. Your main Lambda poller would send a response to SQS for that message and give a failure reason (a shared enum of failures, maybe), in this example "MALFORMED_EMAIL".
6. When your decision maker picks that up, I presumed it would switch on those failure reasons and apply the relevant fix.

For this example it's fine to just use the email scenario as one example, with the view that the enum can be extended as more failure scenarios are encountered.
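
The switch the reviewer describes could look roughly like this. The enum values, fix function, and destinations are illustrative assumptions, not code from the PR:

```python
from enum import Enum

class FailureReason(Enum):
    MALFORMED_EMAIL = "MALFORMED_EMAIL"
    DOWNSTREAM_TIMEOUT = "DOWNSTREAM_TIMEOUT"

def fix_malformed_email(message: dict) -> dict:
    """Placeholder for the real email-repair logic."""
    message["email"] = message["email"].strip().replace(" ", "")
    return message

# Map each known failure reason to its remediation; reasons without a
# registered fix (or unknown reasons) fall through to the fatal DLQ.
FIXES = {
    FailureReason.MALFORMED_EMAIL: fix_malformed_email,
}

def remediate(message: dict):
    """Return (destination, message): 'main' if a fix applied, else 'fatal'."""
    try:
        reason = FailureReason(message.get("failureReason", ""))
    except ValueError:
        return "fatal", message
    fix = FIXES.get(reason)
    if fix is None:
        return "fatal", message
    return "main", fix(message)
```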

@@ -0,0 +1,152 @@
import json
import re
import boto3
import os
import logging

# Set up logging
logger = logging.getLogger()


Is there an opportunity to use the Powertools for AWS Lambda logging module here instead of Python's native logging?

logger.setLevel(logging.INFO)

# Initialize AWS clients
sqs = boto3.client('sqs')

# Environment variables
MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL']
FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL']

# Email validation pattern
EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

def fix_email(email):
    """
    Attempt to fix common email format issues.
    Can be extended to other scenarios, e.g. downstream issues.
    """
    # Remove multiple @ symbols, keep the last one
    if email.count('@') > 1:
        parts = email.split('@')
        email = f"{parts[0]}@{parts[-1]}"

    # Remove spaces
    email = email.strip().replace(' ', '')

    # Fix common typos in domain extensions
    common_fixes = {
        '.con': '.com',
        '.vom': '.com',
        '.comm': '.com',
        '.orgg': '.org',
        '.nett': '.net'
    }

    for wrong, right in common_fixes.items():
        if email.endswith(wrong):
            email = email[:-len(wrong)] + right

    return email

def can_fix_email(message):
    """
    Check whether the email in the message can be fixed.
    """
    if 'email' not in message:
        return False

    email = message['email']
    fixed_email = fix_email(email)

    return bool(re.match(EMAIL_PATTERN, fixed_email))


def lambda_handler(event, context):
    """
    Processes messages from a DLQ that have already failed automatic processing,
    attempts automated remediation, and redelivers the messages back to the main
    queue. If no suitable fix can be applied, messages end up in a fatal DLQ,
    where the typical approach of human intervention is required.

    Flow:
    1. Attempt to fix the message
    2. If fixable -> Main Queue


This isn't always true, is it? If a message needs to be held for a period before being retried, this function would deliver the message to a "delay queue" and its responsibility would end there. The delay queue is then responsible for delivering the message back to the main queue.

    3. If unfixable -> Fatal DLQ

    Extension points:
    1. Add more sophisticated routing logic, including a delay queue
    2. Implement custom error handling
    3. Add message transformation
    4. Implement retry mechanisms
    5. Add monitoring and metrics
    """
    processed_count = 0

    for record in event['Records']:
        try:
            # Parse the message body
            message = json.loads(record['body'])
            original_message_id = record.get('messageId', 'unknown')

            logger.info(f"Processing failed message: {original_message_id}")

            # Option A: Try to fix malformed email
            if can_fix_email(message) and not re.match(EMAIL_PATTERN, message['email']):


Your function can_fix_email() itself makes a call (line 57) to fix_email(), and then you call the same function again on the next line (line 98). Why does it need to be called twice? It seems like you have functions with mixed responsibilities.
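
One way to remove the double call, as a sketch: have a single fix step return both the candidate email and its validity, so each function has one responsibility and callers never re-run the fix. The function name `try_fix_email` is an assumption; the simplified fix logic below omits the PR's domain-typo table:

```python
import re

EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

def try_fix_email(email: str):
    """Apply the fixes once and report whether the result is valid.

    Returns (fixed_email, is_valid), so the handler can branch on is_valid
    without calling any fix function a second time.
    """
    # Keep only the first part and the last domain if there are several '@'s,
    # mirroring the original fix.
    if email.count('@') > 1:
        parts = email.split('@')
        email = f"{parts[0]}@{parts[-1]}"
    # Remove surrounding and embedded spaces.
    fixed = email.strip().replace(' ', '')
    return fixed, bool(re.match(EMAIL_PATTERN, fixed))
```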

                fixed_email = fix_email(message['email'])
                logger.info(f"Fixed email from '{message['email']}' to '{fixed_email}'")

                # Update the message with the fixed email
                message['email'] = fixed_email
                message['emailWasFixed'] = True

                # Send back to the main queue
                sqs.send_message(
                    QueueUrl=MAIN_QUEUE_URL,
                    MessageBody=json.dumps(message)
                )

                logger.info(f"Sent fixed message back to main queue: {original_message_id}")

            # Option B: Cannot fix - send to fatal DLQ
            else:
                logger.warning(f"Message cannot be fixed, sending to fatal DLQ: {original_message_id}")

                # Add a failure reason if not present
                if 'failureReason' not in message:
                    message['failureReason'] = 'Unrecoverable error - could not fix message'

                # Send to the fatal DLQ
                sqs.send_message(
                    QueueUrl=FATAL_DLQ_URL,
                    MessageBody=json.dumps(message)
                )

            processed_count += 1

        except Exception as e:
            # Fall back to record metadata: original_message_id may not be
            # bound if json.loads failed above.
            message_id = record.get('messageId', 'unknown')
            logger.error(f"Error processing message {message_id}: {str(e)}")
            # If we can't process the decision, send to the fatal DLQ
            try:
                error_message = {
                    'originalMessage': record['body'],
                    'failureReason': f"Decision maker error: {str(e)}",
                    'functionArn': context.invoked_function_arn
                }
                sqs.send_message(
                    QueueUrl=FATAL_DLQ_URL,
                    MessageBody=json.dumps(error_message)
                )

            except Exception as fatal_e:
                logger.critical(f"Could not send to fatal DLQ: {str(fatal_e)}")
                raise

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processedMessages': processed_count
        })
    }
@@ -0,0 +1 @@
boto3==1.26.137
121 changes: 121 additions & 0 deletions serverless-message-processing/functions/processor/app.py
@@ -0,0 +1,121 @@
import json
import logging


Could you use the Powertools logging implementation here as well?


# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def validate_message_structure(message):
    """
    Validate message structure and required fields.
    Args:
        message: Dictionary containing message data
    Returns:
        bool: True if valid message structure, False otherwise
    """
    required_fields = ['messageType', 'payload', 'timestamp']
    return all(field in message for field in required_fields)

def process_message(message):
    """
    Process the message content.
    Args:
        message: Dictionary containing message data
    Returns:
        bool: True if processing successful, False otherwise
    """
    try:
        # Validate message structure
        if not validate_message_structure(message):
            logger.error("Message missing required fields")
            raise ValueError("Invalid message structure")

        message_type = message['messageType']
        payload = message['payload']

        # Validate message type
        valid_types = ['TYPE_A', 'TYPE_B', 'TYPE_C']
        if message_type not in valid_types:
            logger.error(f"Invalid message type: {message_type}")
            raise ValueError(f"Invalid message type: {message_type}")

        # Check for downstream system status
        if 'systemStatus' in message and message['systemStatus'].lower() == 'unavailable':
            logger.error("Target system is unavailable")
            raise ValueError("DOWNSTREAM_ERROR: Target system unavailable")

        # Process the message based on type
        logger.info(f"Processing message type: {message_type}")

        # Add type-specific processing logic here
        if message_type == 'TYPE_A':
            # Process TYPE_A messages
            pass
        elif message_type == 'TYPE_B':
            # Process TYPE_B messages
            pass
        elif message_type == 'TYPE_C':
            # Process TYPE_C messages
            pass

        return True

    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")
        raise

def lambda_handler(event, context):
    """
    Main Lambda handler function.
    Args:
        event: Lambda event object
        context: Lambda context object
    Returns:
        dict: Response object
    """
    logger.info(f"Processing {len(event['Records'])} messages")

    processed_count = 0
    failed_count = 0
    downstream_errors = 0

    for record in event['Records']:
        try:
            # Parse the message body
            message = json.loads(record['body'])

            # Process the message
            if process_message(message):
                processed_count += 1
                logger.info(f"Successfully processed message: {message.get('messageId', 'unknown')}")
            else:
                failed_count += 1
                logger.warning(f"Message processing returned False: {message.get('messageId', 'unknown')}")

        except json.JSONDecodeError as e:
            failed_count += 1
            logger.error(f"Invalid JSON in message: {str(e)}")
            raise

        except ValueError as e:
            if "DOWNSTREAM_ERROR" in str(e):
                downstream_errors += 1
                logger.error("Downstream error detected")
                raise
            failed_count += 1
            logger.error(f"Validation error: {str(e)}")
            raise

        except Exception as e:
            failed_count += 1
            logger.error(f"Unexpected error processing message: {str(e)}")
            raise

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed': processed_count,
            'failed': failed_count,
            'downstream_errors': downstream_errors
        })
    }