From 861e0316662d32dc3a9529a60e43e30caa236daf Mon Sep 17 00:00:00 2001 From: Bo Bayles Date: Mon, 15 Apr 2024 15:46:32 -0500 Subject: [PATCH] Add dynamodb.batch_yield_items --- boto3_helpers/dynamodb.py | 57 ++++++++++++++++++++++++++ tests/test_dynamodb.py | 85 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 140 insertions(+), 2 deletions(-) diff --git a/boto3_helpers/dynamodb.py b/boto3_helpers/dynamodb.py index dbdab16..b1063c0 100644 --- a/boto3_helpers/dynamodb.py +++ b/boto3_helpers/dynamodb.py @@ -1,4 +1,5 @@ from boto3 import resource as boto3_resource +from time import sleep def _table_or_name(x): @@ -144,3 +145,59 @@ def update_attributes(ddb_table, key, update_map, **kwargs): ExpressionAttributeValues=attrib_values, **kwargs, ) + + +def batch_yield_items( + table_name, + all_keys, + ddb_resource=None, + batch_size=100, + backoff_base=0.1, + backoff_max=5, + **kwargs, +): + """Do a series a DyanmoDB ``batch_get_item`` queries against a single table, taking + care of retries and paging. Yield the returned items as they are available. + + * *table_name* is the name of the table. + * *all_keys* is an iterable of dictionaries with the keys for the + ``batch_get_item`` operation. + * *ddb_resource* is a ``boto3.resource('dynamodb')`` instance. If not supplied, one + will be created. + * *batch_size* is the number of items to request per page (default: 100). + * *backoff_base* is the value, in seconds, of the exponential backoff base for + retries. + * *backoff_max* is the value, in seconds, of the maximum time to wait between + retries. + * *kwargs* are passed directly to the the ``batch_get_item`` method. + + Usage: + + .. code-block:: python + + from boto3_helpers.dynamodb import batch_yield_items + + all_keys = [ + {'primary_key': '1', 'sort_key', 'a'}, + {'primary_key': '1', 'sort_key', 'b'}, + {'primary_key': '2', 'sort_key', 'a'}, + {'primary_key': '2', 'sort_key', 'b'}, + ] + all_items = list('example-table', all_keys) + """ + ddb_resource = ddb_resource or boto3_resource('dynamodb') + + i = 0 + unprocessed_keys = list(all_keys) + while True: + batch_keys = unprocessed_keys[:batch_size] + unprocessed_keys = unprocessed_keys[batch_size:] + resp = ddb_resource.batch_get_item( + RequestItems={table_name: {'Keys': batch_keys}}, **kwargs + ) + yield from resp['Responses'][table_name] + unprocessed_keys += resp.get('UnprocessedKeys', {}).get(table_name, []) + if not unprocessed_keys: + break + sleep(min(backoff_base * (2**i), backoff_max)) + i += 1 diff --git a/tests/test_dynamodb.py b/tests/test_dynamodb.py index c8781d6..1a58204 100644 --- a/tests/test_dynamodb.py +++ b/tests/test_dynamodb.py @@ -1,12 +1,17 @@ from decimal import Decimal from unittest import TestCase -from unittest.mock import patch +from unittest.mock import call as MockCall, patch from boto3.dynamodb.conditions import Attr as ddb_attr, Key as ddb_key from boto3 import resource as boto3_resource from botocore.stub import Stubber -from boto3_helpers.dynamodb import query_table, scan_table, update_attributes +from boto3_helpers.dynamodb import ( + batch_yield_items, + query_table, + scan_table, + update_attributes, +) class DynamoDBTests(TestCase): @@ -153,3 +158,79 @@ def test_update_attributes(self, mock_boto3_resource): } } self.assertEqual(actual, expected) + + @patch('boto3_helpers.dynamodb.sleep', autospec=True) + @patch('boto3_helpers.dynamodb.boto3_resource', autospec=True) + def test_batch_yield_items(self, mock_boto3_resource, mock_sleep): + table_name = 'test-table' + all_keys = [ + {'primary_key': '1', 'sort_key': 'a'}, + {'primary_key': '1', 'sort_key': 'b'}, + {'primary_key': '2', 'sort_key': 'a'}, + {'primary_key': '2', 'sort_key': 'b'}, + {'primary_key': '3', 'sort_key': 'a'}, + {'primary_key': '3', 'sort_key': 'b'}, + ] + + mock_boto3_resource.return_value.batch_get_item.side_effect = [ + { + 'Responses': {table_name: all_keys[:3]}, + 'UnprocessedKeys': {table_name: all_keys[3:]}, + }, + { + 'Responses': {table_name: all_keys[3:]}, + 'UnprocessedKeys': {}, + }, + ] + actual = list(batch_yield_items(table_name, all_keys[:], backoff_base=0.1)) + self.assertEqual(actual, all_keys) + + mock_sleep.assert_called_once_with(0.1) + mock_boto3_resource.assert_called_once_with('dynamodb') + self.assertEqual(mock_boto3_resource.return_value.batch_get_item.call_count, 2) + + @patch('boto3_helpers.dynamodb.sleep', autospec=True) + @patch('boto3_helpers.dynamodb.boto3_resource', autospec=True) + def test_batch_yield_items_batch_size(self, mock_boto3_resource, mock_sleep): + table_name = 'test-table' + all_keys = [ + {'primary_key': '1', 'sort_key': 'a'}, + {'primary_key': '1', 'sort_key': 'b'}, + {'primary_key': '2', 'sort_key': 'a'}, + {'primary_key': '2', 'sort_key': 'b'}, + {'primary_key': '3', 'sort_key': 'a'}, + {'primary_key': '3', 'sort_key': 'b'}, + {'primary_key': '4', 'sort_key': 'a'}, + {'primary_key': '4', 'sort_key': 'b'}, + ] + + mock_boto3_resource.return_value.batch_get_item.side_effect = [ + { + 'Responses': {table_name: all_keys[:2]}, + 'UnprocessedKeys': {}, + }, + { + 'Responses': {table_name: all_keys[2:4]}, + 'UnprocessedKeys': {}, + }, + { + 'Responses': {table_name: all_keys[4:6]}, + 'UnprocessedKeys': {}, + }, + { + 'Responses': {table_name: all_keys[6:8]}, + 'UnprocessedKeys': {}, + }, + ] + actual = list( + batch_yield_items( + table_name, all_keys[:], batch_size=2, backoff_base=0.1, backoff_max=0.2 + ) + ) + self.assertEqual(actual, all_keys) + + self.assertEqual( + mock_sleep.mock_calls, [MockCall(0.1), MockCall(0.2), MockCall(0.2)] + ) + mock_boto3_resource.assert_called_once_with('dynamodb') + self.assertEqual(mock_boto3_resource.return_value.batch_get_item.call_count, 4)