diff --git a/README.md b/README.md index 2ba8b9c..0db25b2 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,116 @@ This is a base package that can be used standalone with some core lambda functionality or as a dependency. + +## Package Overview + +The package contains several classes and functions that make it easy to create strongly typed lambda functions with many nice-to-have features (serialization/deserialization, easy-to-add metrics, and utilities to create batch SQS and DynamoDB event bridge processing). In addition to these base classes, you can also use a collection of general-purpose lambda handler classes. + +### Base Classes and Functions + +#### `LambdaHandler` + +The [`LambdaHandler`](src/aibs_informatics_aws_lambda/common/handler.py) class provides a base class for creating strongly typed lambda functions with features like serialization/deserialization, logging, and metrics. + + +#### `ApiLambdaHandler` and `ApiResolverBuilder` + +These classes extend the `LambdaHandler` class and provide a way to create strongly typed lambda functions that can be used as API Gateway endpoints. + +- [`ApiLambdaHandler`](src/aibs_informatics_aws_lambda/common/api/handler.py): A base class for API Gateway handlers. +- [`ApiResolverBuilder`](src/aibs_informatics_aws_lambda/common/api/resolver.py): A utility class for building API Gateway resolvers. + + +### Standalone Lambda Classes in this Package + +Most of these lambda functions are found under [src/aibs_informatics_aws_lambda/handlers/](./src/aibs_informatics_aws_lambda/handlers/). + +#### AWS Batch Functions + +- [`CreateDefinitionAndPrepareArgsHandler`](src/aibs_informatics_aws_lambda/handlers/batch/create.py): Handles the creation and preparation of AWS Batch job definitions. +- [`PrepareBatchDataSyncHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py): Prepares data synchronization tasks for AWS Batch. 
+ +#### Data Sync Functions + +##### Data Sync Operations (Writing, Reading and Syncing Data) +- [`GetJSONFromFileHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py): Retrieves JSON data from a file. +- [`PutJSONToFileHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py): Writes JSON data to a file. +- [`DataSyncHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py): Performs a simple data sync task. +- [`BatchDataSyncHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py): Handles a batch of data sync tasks. +- [`PrepareBatchDataSyncHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/operations.py): Analyzes a data sync request and generates multiple batches of data sync tasks so that the load is evenly distributed across multiple batch data sync tasks. + + +##### Data Sync File System Functions (Managing Data Paths) +- [`GetDataPathStatsHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py): Retrieves statistics about data paths. +- [`ListDataPathsHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py): Lists data paths. +- [`OutdatedDataPathScannerHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py): Scans for outdated data paths. +- [`RemoveDataPathsHandler`](src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py): Removes data paths. + +#### Demand Functions + +- [`PrepareDemandScaffoldingHandler`](src/aibs_informatics_aws_lambda/handlers/demand/scaffolding.py): Prepares scaffolding for demand execution. + +#### Notification Functions + +- [`NotificationRouter`](src/aibs_informatics_aws_lambda/handlers/notifications/router.py): Routes notifications to the appropriate notifier. +- [`SESNotifier`](src/aibs_informatics_aws_lambda/handlers/notifications/notifiers/ses.py): Sends notifications via Amazon SES. 
+- [`SNSNotifier`](src/aibs_informatics_aws_lambda/handlers/notifications/notifiers/sns.py): Sends notifications via Amazon SNS. + + +#### ECR Image Replicator + +- [`ImageReplicatorHandler`](src/aibs_informatics_aws_lambda/handlers/ecr/replicate_image.py): Handles the replication of ECR images between repositories using the [`ECRImageReplicator`](https://github.com/AllenInstitute/aibs-informatics-aws-utils/tree/main/src/aibs_informatics_aws_utils/ecr/image_replicator.py). + + + +### CLI Invocation + +With this package, you can also invoke lambda functions from the command line. The CLI executable is installed as `handle-lambda-request` and can be used to invoke lambda functions with payloads that can be specified as JSON, files, or S3 objects. + +``` +usage: handle-lambda-request [-h] [--handler-qualified-name HANDLER_QUALIFIED_NAME] [--payload PAYLOAD] [--response-location RESPONSE_LOCATION] + +CLI AWS Lambda Handler + +options: + -h, --help show this help message and exit + --handler-qualified-name HANDLER_QUALIFIED_NAME, --handler-name HANDLER_QUALIFIED_NAME, --handler HANDLER_QUALIFIED_NAME + handler function qualified name. If not provided, will try to load from ('AWS_LAMBDA_FUNCTION_HANDLER', '_HANDLER') env variables + --payload PAYLOAD, --event PAYLOAD, -e PAYLOAD + event payload of function. If not provided, will try to load from AWS_LAMBDA_EVENT_PAYLOAD env variable + --response-location RESPONSE_LOCATION, -o RESPONSE_LOCATION + optional response location to store response at. can be S3 or local file. If not provided, will load from AWS_LAMBDA_EVENT_RESPONSE_LOCATION env variable. 
+``` + +#### Examples + +##### Invoking a Lambda Function with a JSON Payload + +```bash +handle-lambda-request --handler-qualified-name aibs_informatics_aws_lambda.handlers.data_sync.operations.GetJSONFromFileHandler --payload '{"path": "/path/to/file.json"}' --response-location /tmp/response.json +``` + +##### Invoking a Lambda Function with a JSON Payload from a File + +```bash +handle-lambda-request --handler-qualified-name aibs_informatics_aws_lambda.handlers.data_sync.operations.GetJSONFromFileHandler --payload /path/to/payload.json --response-location /tmp/response.json +``` + +##### Invoking a Lambda Function with a JSON Payload from S3 and Saving the Response to S3 + +```bash +handle-lambda-request --handler-qualified-name aibs_informatics_aws_lambda.handlers.data_sync.operations.GetJSONFromFileHandler --payload s3://bucket/key/payload.json --response-location s3://bucket/key/response.json +``` + +##### Invoking a Lambda Function with Environment Variables + +```bash +export AWS_LAMBDA_EVENT_PAYLOAD='{"path": "/path/to/file.json"}' +export AWS_LAMBDA_EVENT_RESPONSE_LOCATION='/tmp/response.json' +handle-lambda-request --handler-qualified-name aibs_informatics_aws_lambda.handlers.data_sync.operations.GetJSONFromFileHandler +``` + + +## Testing + +The package includes comprehensive tests for all handlers, which can be found under the [test](test) directory. 
\ No newline at end of file diff --git a/src/aibs_informatics_aws_lambda/common/api/handler.py b/src/aibs_informatics_aws_lambda/common/api/handler.py index 6dd83f6..a7bebe5 100644 --- a/src/aibs_informatics_aws_lambda/common/api/handler.py +++ b/src/aibs_informatics_aws_lambda/common/api/handler.py @@ -113,6 +113,8 @@ def gateway_handler(logger=logger, metrics=metrics, **route_parameters) -> Any: logger.info(f"Handling {router.current_event.raw_event} event.") + cls._parse_event_headers(router.current_event, logger) + request = cls._parse_event( router.current_event, route_parameters, cast(logging.Logger, logger) ) @@ -164,6 +166,18 @@ def _parse_event( request = cls.get_request_from_http_parameters(http_parameters) return request + @classmethod + def _parse_event_headers(cls, event: BaseProxyEvent, logger: logging.Logger): + logger.info("Parsing and validating event headers") + cls.validate_headers(event.headers) + config = cls.resolve_request_config(event.headers) + try: + if config.service_log_level: + logger.info(f"Setting log level to {config.service_log_level}") + logger.setLevel(config.service_log_level) + except Exception as e: + logger.warning(f"Failed to set log level to {config.service_log_level}: {e}") + def __repr__(self) -> str: return ( f"{self.__class__.__name__}(route={self.route_rule()}, method={self.route_method()})" diff --git a/src/aibs_informatics_aws_lambda/common/handler.py b/src/aibs_informatics_aws_lambda/common/handler.py index cac42b3..16ca539 100644 --- a/src/aibs_informatics_aws_lambda/common/handler.py +++ b/src/aibs_informatics_aws_lambda/common/handler.py @@ -25,7 +25,7 @@ from aibs_informatics_aws_lambda.common.metrics import MetricsMixins LambdaEvent = Union[JSON] # type: ignore # https://github.com/python/mypy/issues/7866 -LambdaHandlerType = Callable[[JSON, LambdaContext], Optional[JSON]] +LambdaHandlerType = Callable[[LambdaEvent, LambdaContext], Optional[JSON]] logger = logging.getLogger(__name__) REQUEST = TypeVar("REQUEST", 
bound=ModelProtocol) diff --git a/src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py b/src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py index 4b24b0d..1bcd89f 100644 --- a/src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py +++ b/src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py @@ -2,15 +2,21 @@ from datetime import timedelta from pathlib import Path from typing import List, TypeVar +from xml.etree.ElementInclude import include from aibs_informatics_aws_utils.data_sync.file_system import BaseFileSystem, Node, get_file_system from aibs_informatics_aws_utils.efs import detect_mount_points, get_local_path from aibs_informatics_core.models.aws.efs import EFSPath from aibs_informatics_core.models.aws.s3 import S3URI -from aibs_informatics_core.utils.file_operations import get_path_size_bytes, remove_path +from aibs_informatics_core.utils.file_operations import ( + get_path_size_bytes, + remove_path, + strip_path_root, +) from aibs_informatics_aws_lambda.common.handler import LambdaHandler from aibs_informatics_aws_lambda.handlers.data_sync.model import ( + DataPath, GetDataPathStatsRequest, GetDataPathStatsResponse, ListDataPathsRequest, @@ -43,9 +49,21 @@ def handle(self, request: GetDataPathStatsRequest) -> GetDataPathStatsResponse: class ListDataPathsHandler(LambdaHandler[ListDataPathsRequest, ListDataPathsResponse]): def handle(self, request: ListDataPathsRequest) -> ListDataPathsResponse: root = get_file_system(request.path) - return ListDataPathsResponse( - paths=sorted([n.path for n in root.node.list_nodes()]), - ) + paths: List[DataPath] = sorted([n.path for n in root.node.list_nodes()]) + + if request.include_patterns or request.exclude_patterns: + new_paths = [] + for path in paths: + rel_path = strip_path_root(path, root.node.path) + if request.include_patterns: + if not any([i.match(rel_path) for i in request.include_patterns]): + continue + if request.exclude_patterns: + if 
any([i.match(rel_path) for i in request.exclude_patterns]): + continue + new_paths.append(path) + paths = new_paths + return ListDataPathsResponse(paths=paths) class OutdatedDataPathScannerHandler( diff --git a/src/aibs_informatics_aws_lambda/handlers/data_sync/model.py b/src/aibs_informatics_aws_lambda/handlers/data_sync/model.py index 05652ec..03384e3 100644 --- a/src/aibs_informatics_aws_lambda/handlers/data_sync/model.py +++ b/src/aibs_informatics_aws_lambda/handlers/data_sync/model.py @@ -1,6 +1,9 @@ +import re from dataclasses import dataclass from datetime import datetime +from functools import cached_property from pathlib import Path +from re import Pattern from typing import Dict, List, Optional, Union from aibs_informatics_aws_utils.data_sync.file_system import PathStats @@ -57,7 +60,42 @@ def local_path(self) -> Optional[Path]: @dataclass class ListDataPathsRequest(WithDataPath): - pass + """List Data paths request + + Args: + path (DataPath): path under which to list files + include (Optional[str|list[str]]): Optionally can specify regex patterns to filter on what + to include. If providing multiple options, a path is returned if it matches *any* of + the include patterns. Exclude patterns override include patterns. + Defaults to None + exclude (Optional[str|list[str]]): Optionally can specify regex patterns to filter on what + to exclude. If providing multiple options, a path is omitted if it matches *any* of + the exclude patterns. Exclude patterns override include patterns. 
+ Defaults to None + """ + + include: Optional[Union[str, List[str]]] = custom_field( + default=None, + mm_field=UnionField([(str, StringField()), (list, ListField(StringField()))]), + ) + exclude: Optional[Union[str, List[str]]] = custom_field( + default=None, + mm_field=UnionField([(str, StringField()), (list, ListField(StringField()))]), + ) + + @cached_property + def include_patterns(self) -> Optional[List[Pattern]]: + return self._get_patterns(self.include) + + @cached_property + def exclude_patterns(self) -> Optional[List[Pattern]]: + return self._get_patterns(self.exclude) + + @staticmethod + def _get_patterns(value: Optional[Union[str, List[str]]]) -> Optional[List[Pattern]]: + if not value: + return None + return [re.compile(p) for p in ([value] if isinstance(value, str) else value)] @dataclass diff --git a/test/aibs_informatics_aws_lambda/common/api/test_resolver.py b/test/aibs_informatics_aws_lambda/common/api/test_resolver.py index 52a347c..956b6d6 100644 --- a/test/aibs_informatics_aws_lambda/common/api/test_resolver.py +++ b/test/aibs_informatics_aws_lambda/common/api/test_resolver.py @@ -69,6 +69,7 @@ def test__resolve__succeeds(self): self.builder.add_handlers(target_module=handlers_module) event = self.create_event(path="/health", method="GET", body="{}") + event["headers"]["X-Client-Version"] = "1.0.0" context = DefaultLambdaContext() lambda_handler = self.builder.get_lambda_handler() response = lambda_handler(event, context) @@ -90,6 +91,7 @@ def test__resolve__updates_logging(self): self.builder.add_handlers(target_module=handlers_module) event = self.create_event(path="/health", method="GET", body="{}") event["headers"]["X-Log-Level"] = "DEBUG" + event["headers"]["X-Client-Version"] = "1.0.0" context = DefaultLambdaContext() lambda_handler = self.builder.get_lambda_handler() response = lambda_handler(event, context) @@ -102,6 +104,7 @@ def test__resolve__handles_invalid_logging_level(self): 
self.builder.add_handlers(target_module=handlers_module) event = self.create_event(path="/health", method="GET", body="{}") event["headers"]["X-Log-Level"] = "DOES_NOT_DEBUG" + event["headers"]["X-Client-Version"] = "1.0.0" context = DefaultLambdaContext() lambda_handler = self.builder.get_lambda_handler() response = lambda_handler(event, context) @@ -163,6 +166,7 @@ def create_event( "X-Forwarded-For": "163.123.189.8, 15.158.4.70", "X-Forwarded-Port": "443", "X-Forwarded-Proto": "https", + "x-client-version": "1.0.0", }, "multiValueHeaders": { "Accept-Encoding": ["identity"], diff --git a/test/aibs_informatics_aws_lambda/handlers/data_sync/test_file_system.py b/test/aibs_informatics_aws_lambda/handlers/data_sync/test_file_system.py index f859390..bb680c6 100644 --- a/test/aibs_informatics_aws_lambda/handlers/data_sync/test_file_system.py +++ b/test/aibs_informatics_aws_lambda/handlers/data_sync/test_file_system.py @@ -53,6 +53,139 @@ def test__handles__simple__fetches_all_paths(self): response = ListDataPathsResponse(paths=[f"{root}/", root / "x", root / "y"]) self.assertHandles(self.handler, request.to_dict(), response=response.to_dict()) + def test__handles__nested_paths(self): + root = self.tmp_path() + self.add_files_to_file_system( + root, + ("x1/y1/z1", 1), + ("x1/y1/z2", 1), + ("x1/y2/z1", 1), + ("x1/y2/z2", 1), + ("x2/y1/z1", 1), + ("x2/y2/z2", 1), + ) + + request = ListDataPathsRequest(path=root / "x1") + response = ListDataPathsResponse( + paths=[ + f"{root}/x1/", + f"{root}/x1/y1/", + f"{root}/x1/y1/z1", + f"{root}/x1/y1/z2", + f"{root}/x1/y2/", + f"{root}/x1/y2/z1", + f"{root}/x1/y2/z2", + ] + ) + self.assertHandles(self.handler, request.to_dict(), response=response.to_dict()) + + def test__handles__include_regex_patterns(self): + root = self.prep_regex_pattern_test() + + # Single include pattern + self.assertHandles( + self.handler, + ListDataPathsRequest(path=root, include=".*A.*").to_dict(), + ListDataPathsResponse( + paths=[ + f"{root}/A/", + 
f"{root}/A/B/", + f"{root}/A/B/C", + f"{root}/A/B/D", + f"{root}/A/C/", + f"{root}/A/C/E", + f"{root}/A/C/F", + f"{root}/C/B/A", + ] + ).to_dict(), + ) + + # Multiple include patterns + self.assertHandles( + self.handler, + ListDataPathsRequest(path=root, include=[f"A.*", f".*B.*"]).to_dict(), + ListDataPathsResponse( + paths=[ + f"{root}/A/", + f"{root}/A/B/", + f"{root}/A/B/C", + f"{root}/A/B/D", + f"{root}/A/C/", + f"{root}/A/C/E", + f"{root}/A/C/F", + f"{root}/B/", + f"{root}/B/C/", + f"{root}/B/C/G", + f"{root}/B/D/", + f"{root}/B/D/H", + f"{root}/C/B/", + f"{root}/C/B/A", + ] + ).to_dict(), + ) + + def test__handles__regex_exclude_patterns(self): + root = self.prep_regex_pattern_test() + + # single exclude pattern + self.assertHandles( + self.handler, + ListDataPathsRequest(path=root, exclude=".*B.*").to_dict(), + ListDataPathsResponse( + paths=[ + f"{root}/", + f"{root}/A/", + f"{root}/A/C/", + f"{root}/A/C/E", + f"{root}/A/C/F", + f"{root}/C/", + ] + ).to_dict(), + ) + + # multiple exclude patterns + self.assertHandles( + self.handler, + ListDataPathsRequest(path=root, exclude=[".*A.*", ".*B.*"]).to_dict(), + ListDataPathsResponse( + paths=[ + f"{root}/", + f"{root}/C/", + ] + ).to_dict(), + ) + + def test__handles__include_exclude_patterns(self): + root = self.prep_regex_pattern_test() + + # include and exclude patterns + self.assertHandles( + self.handler, + ListDataPathsRequest(path=root, include=".*A.*", exclude=".*B.*").to_dict(), + ListDataPathsResponse( + paths=[ + f"{root}/A/", + f"{root}/A/C/", + f"{root}/A/C/E", + f"{root}/A/C/F", + ] + ).to_dict(), + ) + + def prep_regex_pattern_test(self): + root = self.tmp_path() + self.add_files_to_file_system( + root, + ("A/B/C", 1), + ("A/B/D", 1), + ("A/C/E", 1), + ("A/C/F", 1), + ("B/C/G", 1), + ("B/D/H", 1), + ("C/B/A", 1), + ) + return root + class GetDataPathStatsHandlerTests(BaseFileSystemHandlerTestCase): @property