New serverless pattern - Serverless Messaging Redrive #2543

Open · wants to merge 2 commits into main
127 changes: 127 additions & 0 deletions serverless-message-processing/functions/decision_maker/app.py

I couldn't see where you're making use of the original failure reason from the main queue to decide which branch of logic to take. Using your "email fix" as an example:

  1. An invalid email address is provided in the payload to API Gateway.
  2. The message is stored durably in the main SQS queue.
  3. A Lambda function polls the queue and attempts to pass the malformed email address to some imagined downstream service.
  4. The downstream service returns an error due to the malformed email.
  5. Your main Lambda poller sends a response to SQS for that message with a failure reason (a shared enum of failures, maybe), in this example "MALFORMED_EMAIL".
  6. When your decision maker picks that up, I presumed it would switch on those failure reasons and apply the relevant fix.

For this pattern it's fine to use the email scenario as a single example, with the view that the enum can be extended as more failure scenarios are encountered.
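Something like this, as a rough sketch (FailureReason, fix_email, and requeue_with_delay are placeholder names, not code from this PR):

from enum import Enum

class FailureReason(Enum):
    MALFORMED_EMAIL = "MALFORMED_EMAIL"
    DOWNSTREAM_ERROR = "DOWNSTREAM_ERROR"

def route_failed_message(message):
    # Switch on the failure reason recorded by the main queue's poller.
    try:
        reason = FailureReason(message.get("failureReason"))
    except ValueError:
        return None  # unknown reason -> fatal DLQ
    if reason is FailureReason.MALFORMED_EMAIL:
        return fix_email(message)           # placeholder: apply the email fix
    if reason is FailureReason.DOWNSTREAM_ERROR:
        return requeue_with_delay(message)  # placeholder: hold, then retry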

@@ -0,0 +1,127 @@
import json
import boto3
import os
import logging

# Set up logging
logger = logging.getLogger()

Is there an opportunity to use the PowerTools logging module here instead of Python's native logging?
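A minimal sketch, assuming the aws-lambda-powertools package is added to requirements.txt (the service name is arbitrary):

from aws_lambda_powertools import Logger

logger = Logger(service="decision-maker")  # structured JSON logs with Lambda context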

logger.setLevel(logging.INFO)

# Initialize AWS clients
sqs = boto3.client('sqs')

# Environment variables
MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL']
FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL']

def can_fix_message(message):

If you're using a function to determine whether a message can be fixed, its return value also needs to indicate which "fix strategy" to use, and that strategy needs to be a parameter to your fix_message function too, doesn't it, so that it knows which fix to apply.
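A sketch of the shape I'd expect (the strategy names are illustrative, not from this PR):

def can_fix_message(message):
    # Return the verdict together with the strategy to apply.
    fixable = {
        'MALFORMED_EMAIL': 'EMAIL_REPAIR',
        'DOWNSTREAM_ERROR': 'DELAYED_RETRY',
    }
    strategy = fixable.get(message.get('failureReason'))
    return strategy is not None, strategy

can_fix, strategy = can_fix_message(message)
if can_fix:
    fixed_message = fix_message(message, strategy)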

"""
Determine if a message can be fixed automatically.

Can you include a sample scenario for customers to reference as a model? That could be "bad character replacement", "downstream system temporarily unavailable", or anything else you choose.
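For instance, a "bad character replacement" fix could be sketched like this (illustrative only):

import re

def strip_bad_characters(email):
    # Remove whitespace and control characters that commonly
    # break downstream email validation.
    return re.sub(r'[\s\x00-\x1f]', '', email)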


    Extension points:
    1. Add validation for specific message formats
    2. Implement business-specific fix rules
    3. Add data transformation logic
    4. Implement retry strategies
    5. Add validation against external systems
    """
    try:
        # Basic message validation
        # Add your validation logic here
        return True
    except Exception as e:
        logger.error(f"Validation error: {str(e)}")
        return False

def fix_message(message):

The function definition needs to be expanded to take a fix strategy that you fork on within the function, depending on what type of "fix" is needed (character replacement, a delay, etc.).
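That is, something along these lines (the handler names are placeholders):

FIX_HANDLERS = {
    'EMAIL_REPAIR': fix_email_address,     # placeholder: character replacement
    'DELAYED_RETRY': send_to_delay_queue,  # placeholder: hold and retry later
}

def fix_message(message, strategy):
    handler = FIX_HANDLERS.get(strategy)
    return handler(message) if handler else None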

"""
Apply fixes to the message.

As with the other function, the wording needs to be expanded so that people with zero context can understand what's meant to happen here.


    Extension points:
    1. Add data normalization
    2. Implement field-specific fixes
    3. Add data enrichment
    4. Implement format conversion
    5. Add validation rules
    """
    try:
        fixed_message = message.copy()
        # Add your fix logic here
        fixed_message['wasFixed'] = True
        return fixed_message
    except Exception as e:
        logger.error(f"Fix error: {str(e)}")
        return None

def lambda_handler(event, context):
"""
Process messages and route them based on fixability.

You need to expand the wording and define what you mean by "fixability". I believe what you're aiming for is something like:

"Processes messages from a DLQ that have already failed to be automatically processed, and attempts automated remediation and redelivery of the messages back to the main queue. If no suitable fixes can be applied, messages end up in a fatal DLQ where the typical approach of human intervention is required."


    Flow:
    1. Attempt to fix message
    2. If fixable -> Main Queue

This isn't always true, is it? If a message needs to be held for a period before being retried, this function would deliver the message to a "delay queue" and end its responsibility there. The delay queue is then responsible for delivering the message back to the main queue.
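The hand-off to a delay queue could be as simple as this (a sketch; DELAY_QUEUE_URL is a hypothetical environment variable):

sqs.send_message(
    QueueUrl=os.environ['DELAY_QUEUE_URL'],  # hypothetical delay queue
    MessageBody=json.dumps(fixed_message),
    DelaySeconds=300  # SQS supports per-message delays of up to 900 seconds
)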

    3. If unfixable -> Fatal DLQ

    Extension points:
    1. Add more sophisticated routing logic
    2. Implement custom error handling
    3. Add message transformation
    4. Implement retry mechanisms
    5. Add monitoring and metrics
    """
    processed_count = 0

    for record in event['Records']:
        message_id = 'unknown'  # Initialize message_id with default value

        try:
            message = json.loads(record['body'])
            message_id = record.get('messageId', 'unknown')

            logger.info(f"Processing message: {message_id}")

            if can_fix_message(message):
                fixed_message = fix_message(message)
                if fixed_message:
                    # Send to main queue
                    sqs.send_message(
                        QueueUrl=MAIN_QUEUE_URL,
                        MessageBody=json.dumps(fixed_message)
                    )
                    logger.info(f"Fixed message sent to main queue: {message_id}")
                else:
                    raise ValueError("Message fix failed")
            else:
                # Send to fatal DLQ
                message['failureReason'] = 'Message cannot be automatically fixed'
                sqs.send_message(
                    QueueUrl=FATAL_DLQ_URL,
                    MessageBody=json.dumps(message)
                )
                logger.warning(f"Message sent to fatal DLQ: {message_id}")

            processed_count += 1

        except Exception as e:
            logger.error(f"Error processing message {message_id}: {str(e)}")
            try:
                error_message = {
                    'originalMessage': record['body'],
                    'failureReason': str(e),
                    'functionArn': context.invoked_function_arn
                }
                sqs.send_message(
                    QueueUrl=FATAL_DLQ_URL,
                    MessageBody=json.dumps(error_message)
                )
                logger.error(f"Error message sent to fatal DLQ: {message_id}")
            except Exception as fatal_e:
                logger.critical(f"Fatal DLQ error: {str(fatal_e)}")
                raise

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processedMessages': processed_count
        })
    }
2 changes: 2 additions & 0 deletions serverless-message-processing/functions/decision_maker/requirements.txt
@@ -0,0 +1,2 @@
boto3==1.26.137
jsonschema==4.17.3

Is this being used directly anywhere?
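If it is meant to be used, a minimal sketch of wiring it in (the schema shape is an assumption, mirroring the fields the processor checks):

from jsonschema import ValidationError, validate

MESSAGE_SCHEMA = {
    'type': 'object',
    'properties': {
        'messageType': {'type': 'string'},
        'payload': {'type': 'object'},
        'timestamp': {'type': 'string'},
    },
    'required': ['messageType', 'payload', 'timestamp'],
}

def is_valid(message):
    try:
        validate(instance=message, schema=MESSAGE_SCHEMA)
        return True
    except ValidationError as e:
        logger.error(f"Schema validation failed: {e.message}")
        return False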

121 changes: 121 additions & 0 deletions serverless-message-processing/functions/processor/app.py
@@ -0,0 +1,121 @@
import json
import logging

Could you use PowerTools logging implementation?


# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def validate_message_structure(message):
"""
Validate message structure and required fields.
Args:
message: Dictionary containing message data
Returns:
bool: True if valid message structure, False otherwise
"""
required_fields = ['messageType', 'payload', 'timestamp']
return all(field in message for field in required_fields)

def process_message(message):
"""
Process the message content.
Args:
message: Dictionary containing message data
Returns:
bool: True if processing successful, False otherwise
"""
try:
# Validate message structure
if not validate_message_structure(message):
logger.error("Message missing required fields")
raise ValueError("Invalid message structure")

message_type = message['messageType']
payload = message['payload']

# Validate message type
valid_types = ['TYPE_A', 'TYPE_B', 'TYPE_C']
if message_type not in valid_types:
logger.error(f"Invalid message type: {message_type}")
raise ValueError(f"Invalid message type: {message_type}")

# Check for downstream system status
if 'systemStatus' in message and message['systemStatus'].lower() == 'unavailable':
logger.error("Target system is unavailable")
raise ValueError("DOWNSTREAM_ERROR: Target system unavailable")

# Process the message based on type
logger.info(f"Processing message type: {message_type}")

# Add type-specific processing logic here
if message_type == 'TYPE_A':
# Process TYPE_A messages
pass
elif message_type == 'TYPE_B':
# Process TYPE_B messages
pass
elif message_type == 'TYPE_C':
# Process TYPE_C messages
pass

return True

except Exception as e:
logger.error(f"Error processing message: {str(e)}")
raise

def lambda_handler(event, context):
"""
Main Lambda handler function.
Args:
event: Lambda event object
context: Lambda context object
Returns:
dict: Response object
"""
logger.info(f"Processing {len(event['Records'])} messages")

processed_count = 0
failed_count = 0
downstream_errors = 0

for record in event['Records']:
try:
# Parse the message body
message = json.loads(record['body'])

# Process the message
if process_message(message):
processed_count += 1
logger.info(f"Successfully processed message: {message.get('messageId', 'unknown')}")
else:
failed_count += 1
logger.warning(f"Message processing returned False: {message.get('messageId', 'unknown')}")

except json.JSONDecodeError as e:
failed_count += 1
logger.error(f"Invalid JSON in message: {str(e)}")
raise

except ValueError as e:
if "DOWNSTREAM_ERROR" in str(e):
downstream_errors += 1
logger.error("Downstream error detected")
raise
failed_count += 1
logger.error(f"Validation error: {str(e)}")
raise

except Exception as e:
failed_count += 1
logger.error(f"Unexpected error processing message: {str(e)}")
raise

return {
'statusCode': 200,
'body': json.dumps({
'processed': processed_count,
'failed': failed_count,
'downstream_errors': downstream_errors
})
}
2 changes: 2 additions & 0 deletions serverless-message-processing/functions/processor/requirements.txt
@@ -0,0 +1,2 @@
boto3==1.26.137
jsonschema==4.17.3

Is this being used directly?
