Skip to content

Commit

Permalink
Port newer get_sqs_batch_handler method from OCS
Browse files Browse the repository at this point in the history
The newer sqs batch handler, can deal with both standard and fifo
SQS queues.
  • Loading branch information
njmei committed May 9, 2024
1 parent c229c42 commit b201494
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ dynamic = [
"version",
]
dependencies = [
"aws-lambda-powertools~=2.35",
"pydantic>=2.0.3",
"aws-lambda-powertools ~= 2.35",
"pydantic >= 2.0.3, < 3",
"aws-lambda-typing",
"aibs-informatics-aws-utils @ git+ssh://[email protected]/AllenInstitute/aibs-informatics-aws-utils.git@main",
"aibs-informatics-core @ git+ssh://[email protected]/AllenInstitute/aibs-informatics-core.git@main",
Expand Down
42 changes: 32 additions & 10 deletions src/aibs_informatics_aws_lambda/common/handler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import json
import logging
from dataclasses import dataclass
from typing import Callable, Generic, Optional, TypeVar, Union
from typing import Callable, Generic, Literal, Optional, TypeVar, Union, cast

from aibs_informatics_aws_utils.s3 import download_to_json_object, upload_json
from aibs_informatics_core.executors.base import BaseExecutor
from aibs_informatics_core.models.aws.s3 import S3URI
from aibs_informatics_core.models.base import ModelProtocol
from aibs_informatics_core.utils.json import JSON
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.batch import (
BatchProcessor,
EventType,
SqsFifoPartialProcessor,
batch_processor,
process_partial_response,
)
from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext
Expand Down Expand Up @@ -115,24 +122,35 @@ def deserialize_sqs_record(cls, record: SQSRecord) -> REQUEST:
return cls.deserialize_request(json.loads(record["body"]))

@classmethod
def get_sqs_batch_handler(cls, *args, **kwargs) -> LambdaHandlerType:
def get_sqs_batch_handler(
cls, *args, queue_type: Literal["standard", "fifo"] = "standard", **kwargs
) -> LambdaHandlerType:
"""Creates a handler for processing SQS batch records
More info:
https://awslabs.github.io/aws-lambda-powertools-python/1.25.1/utilities/batch/#processing-messages-from-sqs
https://docs.powertools.aws.dev/lambda/python/2.26.1/utilities/batch/#processing-messages-from-sqs
Returns:
Callable[[LambdaEvent, LambdaContext], Optional[JSON]]: [description]
"""
processor = BatchProcessor(event_type=EventType.SQS)
if queue_type == "standard":
processor = BatchProcessor(event_type=EventType.SQS)
elif queue_type == "fifo":
processor = SqsFifoPartialProcessor()
else:
raise RuntimeError(
"An invalid SQS queue_type ({queue_type}) was provided to the "
"get_sqs_batch_handler() method. Valid values include: "
"[standard, fifo]"
)
logger = cls.get_logger(cls.service_name())

# Create a record handler for each record in batch.
def record_handler(record: SQSRecord) -> Optional[JSON]:
if not cls.should_process_sqs_record(record):
logger.info(f"SQS record {record} elected not to be processed.")
return None
lambda_handler = cls(*args, **kwargs) # type: ignore[call-arg]
lambda_handler = cls(*args, **kwargs)
lambda_handler.log = logger
lambda_handler.add_logger_to_root()

Expand All @@ -146,11 +164,15 @@ def record_handler(record: SQSRecord) -> Optional[JSON]:

# Now create top-level handler
@logger.inject_lambda_context(log_event=True)
@batch_processor(record_handler=record_handler, processor=processor) # type: ignore
def handler(event, context: LambdaContext):
return processor.response()
def handler(event: dict, context: LambdaContext) -> PartialItemFailureResponse:
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context,
)

return handler # type: ignore
return cast(LambdaHandlerType, handler)

@classmethod
def should_process_dynamodb_record(cls, record: DynamoDBRecord) -> bool:
Expand Down

0 comments on commit b201494

Please sign in to comment.