Skip to content

Commit d7e1c79

Browse files
committed
feat(parser): add model for the DynamoDB Stream Lambda invocation record
This commit adds a model for the invocation record that a Lambda with a DynamoDB event source sends to a SNS, SQS or S3 destination for failed invocations. The record format is described in https://docs.aws.amazon.com/lambda/latest/dg/services-dynamodb-errors.html The model can be used in the handler of a lambda that processes failed invocations from SNS or SQS by combining it with the source envelope model, e.g., from aws_lambda_powertools.utilities.parser import envelopes, event_parser from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamLambdaInvocationModel @event_parser(model=DDBStreamInvocationModel, envelope=envelopes.SqsEnvelope) def lambda_handler(event: list[DDBStreamInvocationModel], context: LambdaContext): ...
1 parent 0d97c70 commit d7e1c79

File tree

6 files changed

+134
-0
lines changed

6 files changed

+134
-0
lines changed

aws_lambda_powertools/utilities/parser/models/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
)
6767
from .dynamodb import (
6868
DynamoDBStreamChangedRecordModel,
69+
DynamoDBStreamLambdaInvocationModel,
6970
DynamoDBStreamModel,
7071
DynamoDBStreamRecordModel,
7172
)
@@ -173,6 +174,7 @@
173174
"DynamoDBStreamModel",
174175
"EventBridgeModel",
175176
"DynamoDBStreamChangedRecordModel",
177+
"DynamoDBStreamLambdaInvocationModel",
176178
"DynamoDBStreamRecordModel",
177179
"DynamoDBStreamChangedRecordModel",
178180
"KinesisDataStreamModel",

aws_lambda_powertools/utilities/parser/models/dynamodb.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,83 @@ class DynamoDBStreamModel(BaseModel):
110110
},
111111
],
112112
)
113+
114+
115+
class DDBStreamBatchInfo(BaseModel):
116+
approximateArrivalOfFirstRecord: datetime = Field(
117+
description="The approximate date and time when the first stream record from the batch was created"
118+
", in ISO-8601 format.",
119+
examples=["1970-01-01T00:00:00.000Z"],
120+
)
121+
approximateArrivalOfLastRecord: datetime = Field(
122+
description="The approximate date and time when the last stream record from the batch was created"
123+
", in ISO-8601 format.",
124+
examples=["1970-01-01T00:00:00.000Z"],
125+
)
126+
batchSize: int = Field(
127+
description="The size of the batch.",
128+
examples=[1],
129+
)
130+
endSequenceNumber: str = Field(
131+
description="The unique identifier of the last stream record from the batch.",
132+
examples=["222"],
133+
)
134+
shardId: str = Field(
135+
description="The unique identifier of the DynamoDB Stream shard that contains the records from the batch.",
136+
examples=["shardId-00000000000000000000-00000000"],
137+
)
138+
startSequenceNumber: str = Field(
139+
description="The unique identifier of the first stream record from the batch.",
140+
examples=["222"],
141+
)
142+
streamArn: str = Field(
143+
description="The Amazon Resource Name (ARN) of the DynamoDB stream.",
144+
examples=["arn:aws:dynamodb:us-west-2:123456789012:table/ExampleTable/stream/2021-01-01T00:00:00.000"],
145+
)
146+
147+
148+
class RequestContext(BaseModel):
149+
approximateInvokeCount: int = Field(
150+
description="The number of Lambda invocations for the record.",
151+
examples=[1],
152+
)
153+
condition: str = Field(
154+
description="The condition that caused the record to be discarded.",
155+
examples=["RetryAttemptsExhausted"],
156+
)
157+
functionArn: str = Field(
158+
description="The Amazon Resource Name (ARN) of the Lambda.",
159+
examples=["arn:aws:lambda:eu-west-1:809313241:function:test"],
160+
)
161+
requestId: str = Field(
162+
description="The unique identifier of the request.",
163+
)
164+
165+
166+
class ResponseContext(BaseModel):
167+
executedVersion: str = Field(
168+
description="The version of the Lambda executed",
169+
examples=["$LATEST"],
170+
)
171+
functionError: str = Field(
172+
description="",
173+
examples=["Unhandled"],
174+
)
175+
statusCode: int = Field(
176+
description="The status code returned by the Lambda",
177+
)
178+
179+
180+
# https://docs.aws.amazon.com/lambda/latest/dg/services-dynamodb-errors.html
181+
class DynamoDBStreamLambdaInvocationModel(BaseModel):
182+
DDBStreamBatchInfo: DDBStreamBatchInfo
183+
requestContext: RequestContext
184+
responseContext: ResponseContext
185+
timestamp: datetime = Field(
186+
description="The record time, in ISO-8601 format.",
187+
examples=["1970-01-01T00:00:00.000Z"],
188+
)
189+
version: str = Field(
190+
description="The version of the record format.",
191+
examples=["1.0"],
192+
)

docs/utilities/parser.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ The example above uses `SqsModel`. Other built-in models can be found below.
133133
| **CognitoCreateAuthChallengeTriggerModel** | Lambda User Pool Create Auth Challenge trigger event |
134134
| **CognitoVerifyAuthChallengeTriggerModel** | Lambda User Pool Verify Auth Challenge trigger event |
135135
| **DynamoDBStreamModel** | Lambda Event Source payload for Amazon DynamoDB Streams |
136+
| **DynamoDBStreamLambdaInvocationModel** | Lambda invocation record for Amazon DynamoDB Streams
136137
| **EventBridgeModel** | Lambda Event Source payload for Amazon EventBridge |
137138
| **IoTCoreThingEvent** | Lambda Event Source payload for IoT Core Thing created, updated, or deleted. |
138139
| **IoTCoreThingTypeEvent** | Lambda Event Source payload for IoT Core Thing Type events. |
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"requestContext": {
3+
"requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
4+
"functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
5+
"condition": "RetryAttemptsExhausted",
6+
"approximateInvokeCount": 1
7+
},
8+
"responseContext": {
9+
"statusCode": 200,
10+
"executedVersion": "$LATEST",
11+
"functionError": "Unhandled"
12+
},
13+
"version": "1.0",
14+
"timestamp": "2019-11-14T00:13:49Z",
15+
"DDBStreamBatchInfo": {
16+
"shardId": "shardId-00000001573689847184-864758bb",
17+
"startSequenceNumber": "800000000003126276362",
18+
"endSequenceNumber": "800000000003126276362",
19+
"approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
20+
"approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
21+
"batchSize": 1,
22+
"streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
23+
}
24+
}

tests/unit/parser/_pydantic/schemas.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from aws_lambda_powertools.utilities.parser.models import (
66
DynamoDBStreamChangedRecordModel,
7+
DynamoDBStreamLambdaInvocationModel,
78
DynamoDBStreamModel,
89
DynamoDBStreamRecordModel,
910
EventBridgeModel,

tests/unit/parser/_pydantic/test_dynamodb.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22

33
from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, parse
4+
from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamLambdaInvocationModel
45
from tests.functional.utils import load_event
56
from tests.unit.parser._pydantic.schemas import MyAdvancedDynamoBusiness, MyDynamoBusiness
67

@@ -83,3 +84,28 @@ def test_validate_event_does_not_conform_with_model():
8384
raw_event: dict = {"hello": "s"}
8485
with pytest.raises(ValidationError):
8586
parse(event=raw_event, model=MyDynamoBusiness, envelope=envelopes.DynamoDBStreamEnvelope)
87+
88+
89+
def test_dynamo_db_stream_lambda_invocation_event():
90+
raw_event = load_event("dynamoStreamLambdaInvocationEvent.json")
91+
parsed_event: DynamoDBStreamLambdaInvocationModel = parse(
92+
event=raw_event,
93+
model=DynamoDBStreamLambdaInvocationModel,
94+
)
95+
assert (
96+
parsed_event.DDBStreamBatchInfo.approximateArrivalOfFirstRecord.strftime("%Y-%m-%dT%H:%M:%SZ")
97+
== raw_event["DDBStreamBatchInfo"]["approximateArrivalOfFirstRecord"]
98+
)
99+
assert (
100+
parsed_event.DDBStreamBatchInfo.approximateArrivalOfLastRecord.strftime("%Y-%m-%dT%H:%M:%SZ")
101+
== raw_event["DDBStreamBatchInfo"]["approximateArrivalOfLastRecord"]
102+
)
103+
assert parsed_event.DDBStreamBatchInfo.batchSize == raw_event["DDBStreamBatchInfo"]["batchSize"]
104+
assert parsed_event.DDBStreamBatchInfo.endSequenceNumber == raw_event["DDBStreamBatchInfo"]["endSequenceNumber"]
105+
assert parsed_event.DDBStreamBatchInfo.shardId == raw_event["DDBStreamBatchInfo"]["shardId"]
106+
assert parsed_event.DDBStreamBatchInfo.startSequenceNumber == raw_event["DDBStreamBatchInfo"]["startSequenceNumber"]
107+
assert parsed_event.DDBStreamBatchInfo.streamArn == raw_event["DDBStreamBatchInfo"]["streamArn"]
108+
assert parsed_event.requestContext.model_dump() == raw_event["requestContext"]
109+
assert parsed_event.responseContext.model_dump() == raw_event["responseContext"]
110+
assert parsed_event.timestamp.strftime("%Y-%m-%dT%H:%M:%SZ") == raw_event["timestamp"]
111+
assert parsed_event.version == raw_event["version"]

0 commit comments

Comments
 (0)