Skip to content

Commit

Permalink
Merge pull request #17 from openwurl/ddb-batch-get
Browse files Browse the repository at this point in the history
Add dynamodb.batch_yield_items
  • Loading branch information
bbayles authored Apr 15, 2024
2 parents ea6534d + 861e031 commit 76faf04
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 2 deletions.
57 changes: 57 additions & 0 deletions boto3_helpers/dynamodb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from boto3 import resource as boto3_resource
from time import sleep


def _table_or_name(x):
Expand Down Expand Up @@ -144,3 +145,59 @@ def update_attributes(ddb_table, key, update_map, **kwargs):
ExpressionAttributeValues=attrib_values,
**kwargs,
)


def batch_yield_items(
    table_name,
    all_keys,
    ddb_resource=None,
    batch_size=100,
    backoff_base=0.1,
    backoff_max=5,
    **kwargs,
):
    """Do a series of DynamoDB ``batch_get_item`` queries against a single table,
    taking care of retries and paging. Yield the returned items as they are
    available.

    * *table_name* is the name of the table.
    * *all_keys* is an iterable of dictionaries with the keys for the
      ``batch_get_item`` operation. If it is empty, no request is made and
      nothing is yielded.
    * *ddb_resource* is a ``boto3.resource('dynamodb')`` instance. If not supplied, one
      will be created.
    * *batch_size* is the number of items to request per page (default: 100).
      It must be at least 1; otherwise ``ValueError`` is raised when iteration
      starts.
    * *backoff_base* is the value, in seconds, of the exponential backoff base for
      retries.
    * *backoff_max* is the value, in seconds, of the maximum time to wait between
      retries.
    * *kwargs* are passed directly to the ``batch_get_item`` method.

    Keys reported back under ``UnprocessedKeys`` are appended to the queue of
    pending keys and requested again. The wait is applied between every pair
    of consecutive requests (both retries and plain pages); it starts at
    *backoff_base* and doubles after each request until it reaches
    *backoff_max*.

    Usage:

    .. code-block:: python

        from boto3_helpers.dynamodb import batch_yield_items

        all_keys = [
            {'primary_key': '1', 'sort_key': 'a'},
            {'primary_key': '1', 'sort_key': 'b'},
            {'primary_key': '2', 'sort_key': 'a'},
            {'primary_key': '2', 'sort_key': 'b'},
        ]
        all_items = list(batch_yield_items('example-table', all_keys))
    """
    if batch_size < 1:
        # A non-positive batch size would never drain the pending keys.
        raise ValueError('batch_size must be at least 1')

    pending_keys = list(all_keys)
    if not pending_keys:
        # Nothing to fetch; avoid sending a request with an empty key list.
        return

    ddb_resource = ddb_resource or boto3_resource('dynamodb')

    # Track the delay incrementally instead of computing ``base * 2**i``:
    # the latter raises OverflowError for a float base once ``i`` gets large.
    delay = backoff_base
    while True:
        batch_keys = pending_keys[:batch_size]
        pending_keys = pending_keys[batch_size:]
        resp = ddb_resource.batch_get_item(
            RequestItems={table_name: {'Keys': batch_keys}}, **kwargs
        )
        # Tolerate a response that has no entry for this table.
        yield from resp.get('Responses', {}).get(table_name, [])

        # ``UnprocessedKeys`` maps the table name to a request-shaped dict
        # (``{'Keys': [...], ...}``); only its ``Keys`` list is re-queued.
        # A bare list of keys is also accepted for backward compatibility.
        leftover = resp.get('UnprocessedKeys', {}).get(table_name) or {}
        if isinstance(leftover, dict):
            leftover = leftover.get('Keys', [])
        pending_keys += leftover

        if not pending_keys:
            break
        sleep(min(delay, backoff_max))
        delay = min(delay * 2, backoff_max)
85 changes: 83 additions & 2 deletions tests/test_dynamodb.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from decimal import Decimal
from unittest import TestCase
from unittest.mock import patch
from unittest.mock import call as MockCall, patch

from boto3.dynamodb.conditions import Attr as ddb_attr, Key as ddb_key
from boto3 import resource as boto3_resource
from botocore.stub import Stubber

from boto3_helpers.dynamodb import query_table, scan_table, update_attributes
from boto3_helpers.dynamodb import (
batch_yield_items,
query_table,
scan_table,
update_attributes,
)


class DynamoDBTests(TestCase):
Expand Down Expand Up @@ -153,3 +158,79 @@ def test_update_attributes(self, mock_boto3_resource):
}
}
self.assertEqual(actual, expected)

@patch('boto3_helpers.dynamodb.sleep', autospec=True)
@patch('boto3_helpers.dynamodb.boto3_resource', autospec=True)
def test_batch_yield_items(self, mock_boto3_resource, mock_sleep):
    """Unprocessed keys from the first response are retried after one backoff."""
    table_name = 'test-table'
    all_keys = [
        {'primary_key': '1', 'sort_key': 'a'},
        {'primary_key': '1', 'sort_key': 'b'},
        {'primary_key': '2', 'sort_key': 'a'},
        {'primary_key': '2', 'sort_key': 'b'},
        {'primary_key': '3', 'sort_key': 'a'},
        {'primary_key': '3', 'sort_key': 'b'},
    ]

    mock_batch_get_item = mock_boto3_resource.return_value.batch_get_item
    mock_batch_get_item.side_effect = [
        {
            'Responses': {table_name: all_keys[:3]},
            # The stub mirrors the documented response shape: the table
            # name maps to a request-shaped dict with a ``Keys`` list.
            'UnprocessedKeys': {table_name: {'Keys': all_keys[3:]}},
        },
        {
            'Responses': {table_name: all_keys[3:]},
            'UnprocessedKeys': {},
        },
    ]
    actual = list(batch_yield_items(table_name, all_keys[:], backoff_base=0.1))
    self.assertEqual(actual, all_keys)

    mock_sleep.assert_called_once_with(0.1)
    mock_boto3_resource.assert_called_once_with('dynamodb')
    self.assertEqual(mock_batch_get_item.call_count, 2)
    # The first request carries every key, since they fit in one batch.
    self.assertEqual(
        mock_batch_get_item.call_args_list[0],
        MockCall(RequestItems={table_name: {'Keys': all_keys}}),
    )

@patch('boto3_helpers.dynamodb.sleep', autospec=True)
@patch('boto3_helpers.dynamodb.boto3_resource', autospec=True)
def test_batch_yield_items_batch_size(self, mock_boto3_resource, mock_sleep):
    """Eight keys with ``batch_size=2`` take four requests and three waits."""
    table_name = 'test-table'
    page_len = 2

    # Primary keys '1'..'4', each paired with sort keys 'a' and 'b'.
    all_keys = [
        {'primary_key': str(number), 'sort_key': letter}
        for number in range(1, 5)
        for letter in 'ab'
    ]

    # One stubbed response per page; nothing is ever left unprocessed.
    pages = [
        {
            'Responses': {table_name: all_keys[start : start + page_len]},
            'UnprocessedKeys': {},
        }
        for start in range(0, len(all_keys), page_len)
    ]
    batch_get_item = mock_boto3_resource.return_value.batch_get_item
    batch_get_item.side_effect = pages

    item_iter = batch_yield_items(
        table_name,
        list(all_keys),
        batch_size=page_len,
        backoff_base=0.1,
        backoff_max=0.2,
    )
    actual = list(item_iter)
    self.assertEqual(actual, all_keys)

    # The wait doubles from the base and is then capped at the maximum.
    expected_sleeps = [MockCall(delay) for delay in (0.1, 0.2, 0.2)]
    self.assertEqual(mock_sleep.mock_calls, expected_sleeps)
    mock_boto3_resource.assert_called_once_with('dynamodb')
    self.assertEqual(batch_get_item.call_count, 4)

0 comments on commit 76faf04

Please sign in to comment.