
Commit 76ace06

GitHK and Andrei Neagu authored
✨ Streaming utils for zipping and reading/writing to S3 (#7186)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 8754b76 commit 76ace06

File tree

41 files changed (+862 −61 lines)


packages/aws-library/requirements/_base.txt (+4 lines)
@@ -244,6 +244,8 @@ protobuf==5.29.3
     #   opentelemetry-proto
 psutil==6.1.1
     # via -r requirements/../../../packages/service-library/requirements/_base.in
+pycryptodome==3.21.0
+    # via stream-zip
 pydantic==2.10.6
     # via
     #   -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
@@ -368,6 +370,8 @@ six==1.17.0
     # via python-dateutil
 sniffio==1.3.1
     # via anyio
+stream-zip==0.0.83
+    # via -r requirements/../../../packages/service-library/requirements/_base.in
 tenacity==9.0.0
     # via -r requirements/../../../packages/service-library/requirements/_base.in
 toolz==1.0.0
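
The two new pins come from the zipping side of this commit: stream-zip builds a zip archive as a lazy stream of bytes instead of materializing it on disk, and pycryptodome is pulled in by stream-zip. As a rough orientation, a minimal sketch of stream-zip's documented synchronous API follows (the member name, timestamp, mode, and chunk values are illustrative, not taken from this commit):

from datetime import datetime
from stream_zip import ZIP_32, stream_zip

def member_files():
    # each member is (name, modified_at, mode, zip_method, iterable of byte chunks)
    yield "hello.txt", datetime.now(), 0o600, ZIP_32, (b"hello ", b"world")

# the archive is produced chunk by chunk; nothing is ever buffered whole
for zipped_chunk in stream_zip(member_files()):
    print(len(zipped_chunk))

The service-library part of this PR presumably wires such a stream into the S3 upload helper added below.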

packages/aws-library/src/aws_library/s3/_client.py (+54 lines)
@@ -14,9 +14,12 @@
 from botocore import exceptions as botocore_exc
 from botocore.client import Config
 from models_library.basic_types import SHA256Str
+from models_library.bytes_iters import BytesIter, DataSize
 from models_library.storage_schemas import ETag, S3BucketName, UploadedPart
 from pydantic import AnyUrl, ByteSize, TypeAdapter
+from servicelib.bytes_iters import DEFAULT_READ_CHUNK_SIZE, BytesStreamer
 from servicelib.logging_utils import log_catch, log_context
+from servicelib.s3_utils import FileLikeReader
 from servicelib.utils import limited_gather
 from settings_library.s3 import S3Settings
 from types_aiobotocore_s3 import S3Client
@@ -470,6 +473,57 @@ async def copy_objects_recursively(
             limit=_MAX_CONCURRENT_COPY,
         )
 
+    async def get_bytes_streamer_from_object(
+        self,
+        bucket_name: S3BucketName,
+        object_key: S3ObjectKey,
+        *,
+        chunk_size: int = DEFAULT_READ_CHUNK_SIZE,
+    ) -> BytesStreamer:
+        """stream read an object from S3 chunk by chunk"""
+
+        # NOTE `download_fileobj` cannot be used to implement this because
+        # it will buffer the entire file in memory instead of reading it
+        # chunk by chunk
+
+        # below is a quick call
+        head_response = await self._client.head_object(
+            Bucket=bucket_name, Key=object_key
+        )
+        data_size = DataSize(head_response["ContentLength"])
+
+        async def _() -> BytesIter:
+            # Download the file in chunks
+            position = 0
+            while position < data_size:
+                # Calculate the range for this chunk
+                end = min(position + chunk_size - 1, data_size - 1)
+                range_header = f"bytes={position}-{end}"
+
+                # Download the chunk
+                response = await self._client.get_object(
+                    Bucket=bucket_name, Key=object_key, Range=range_header
+                )
+
+                chunk = await response["Body"].read()
+
+                # Yield the chunk for processing
+                yield chunk
+
+                position += chunk_size
+
+        return BytesStreamer(data_size, _)
+
+    @s3_exception_handler(_logger)
+    async def upload_object_from_file_like(
+        self,
+        bucket_name: S3BucketName,
+        object_key: S3ObjectKey,
+        file_like_reader: FileLikeReader,
+    ) -> None:
+        """streams write an object in S3 from an AsyncIterable[bytes]"""
+        await self._client.upload_fileobj(file_like_reader, bucket_name, object_key)  # type: ignore[arg-type]
+
     @staticmethod
     def is_multipart(file_size: ByteSize) -> bool:
         return file_size >= MULTIPART_UPLOADS_MIN_TOTAL_SIZE
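
`upload_fileobj` streams whatever it can read() from the object passed to it, so the FileLikeReader imported from servicelib.s3_utils is, by all appearances, a thin adapter that exposes an AsyncIterable[bytes] through an async read() interface. The sketch below only illustrates that idea; the class and its attributes are hypothetical and not the servicelib implementation:

from collections.abc import AsyncIterable

class AsyncIterReader:
    """Hypothetical adapter: expose an AsyncIterable[bytes] as an async, file-like reader."""

    def __init__(self, bytes_iter: AsyncIterable[bytes]) -> None:
        self._iterator = bytes_iter.__aiter__()
        self._buffer = bytearray()
        self._exhausted = False

    async def read(self, size: int = -1) -> bytes:
        # pull chunks from the async iterable until the request can be satisfied
        while not self._exhausted and (size < 0 or len(self._buffer) < size):
            try:
                self._buffer += await self._iterator.__anext__()
            except StopAsyncIteration:
                self._exhausted = True
        if size < 0:
            data, self._buffer = bytes(self._buffer), bytearray()
        else:
            data, self._buffer = bytes(self._buffer[:size]), self._buffer[size:]
        return data

With an adapter of this shape, the chunked byte stream produced by get_bytes_streamer_from_object (or the output of a stream-zip pipeline) can be handed to upload_object_from_file_like and copied to S3 in constant memory.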
