Skip to content

Commit

Permalink
S3Transfer handle checksums on copy
Browse files Browse the repository at this point in the history
  • Loading branch information
aemous committed Sep 24, 2024
1 parent 5876d36 commit f60d1ad
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-copy-71252.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "bugfix",
"category": "copy",
"description": "Added support for ``ChecksumAlgorithm`` when uploading copy data in parts."
}
20 changes: 17 additions & 3 deletions awscli/s3transfer/copies.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ def _submit_multipart_request(
num_parts,
transfer_future.meta.size,
)
# Get the checksum algorithm of the multipart request.
checksum_algorithm = call_args.extra_args.get("ChecksumAlgorithm")
part_futures.append(
self._transfer_coordinator.submit(
request_executor,
Expand All @@ -240,6 +242,7 @@ def _submit_multipart_request(
'extra_args': extra_part_args,
'callbacks': progress_callbacks,
'size': size,
'checksum_algorithm': checksum_algorithm,
},
pending_main_kwargs={
'upload_id': create_multipart_future
Expand Down Expand Up @@ -337,6 +340,7 @@ def _main(
extra_args,
callbacks,
size,
checksum_algorithm=None,
):
"""
:param client: The client to use when calling PutObject
Expand All @@ -351,24 +355,34 @@ def _main(
:param callbacks: List of callbacks to call after copy part
:param size: The size of the transfer. This value is passed into
the callbacks
:param checksum_algorithm: The algorithm that was used to create the multipart
upload
:rtype: dict
:returns: A dictionary representing a part::
{'Etag': etag_value, 'PartNumber': part_number}
This value can be appended to a list to be used to complete
the multipart upload.
the multipart upload. If a checksum is in the response,
it will also be included.
"""
response = client.upload_part_copy(
CopySource=copy_source,
Bucket=bucket,
Key=key,
UploadId=upload_id,
PartNumber=part_number,
**extra_args
**extra_args,
)
for callback in callbacks:
callback(bytes_transferred=size)
etag = response['CopyPartResult']['ETag']
return {'ETag': etag, 'PartNumber': part_number}
part_metadata = {'ETag': etag, 'PartNumber': part_number}
if checksum_algorithm:
checksum_member = f'Checksum{checksum_algorithm.upper()}'
if checksum_member in response['CopyPartResult']:
part_metadata[checksum_member] = response['CopyPartResult'][
checksum_member
]
return part_metadata
160 changes: 154 additions & 6 deletions tests/functional/s3transfer/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,29 @@ def test_copy(self):
future.result()
self.stubber.assert_no_pending_responses()

def test_copy_with_checksum(self):
self.extra_args['ChecksumAlgorithm'] = 'crc32'
expected_head_params = {
'Bucket': 'mysourcebucket',
'Key': 'mysourcekey',
}
expected_copy_object = {
'Bucket': self.bucket,
'Key': self.key,
'CopySource': self.copy_source,
'ChecksumAlgorithm': 'crc32',
}
self.add_head_object_response(expected_params=expected_head_params)
self.add_successful_copy_responses(
expected_copy_params=expected_copy_object
)

call_kwargs = self.create_call_kwargs()
call_kwargs['extra_args'] = self.extra_args
future = self.manager.copy(**call_kwargs)
future.result()
self.stubber.assert_no_pending_responses()

def test_copy_with_extra_args(self):
self.extra_args['MetadataDirective'] = 'REPLACE'

Expand Down Expand Up @@ -302,6 +325,7 @@ def setUp(self):
multipart_chunksize=4,
)
self._manager = TransferManager(self.client, self.config)
self.multipart_id = 'my-upload-id'

def create_stubbed_responses(self):
return [
Expand All @@ -311,7 +335,7 @@ def create_stubbed_responses(self):
},
{
'method': 'create_multipart_upload',
'service_response': {'UploadId': 'my-upload-id'},
'service_response': {'UploadId': self.multipart_id},
},
{
'method': 'upload_part_copy',
Expand All @@ -328,6 +352,84 @@ def create_stubbed_responses(self):
{'method': 'complete_multipart_upload', 'service_response': {}},
]

def add_get_head_response_with_default_expected_params(
self, extra_expected_params=None
):
expected_params = {
'Bucket': 'mysourcebucket',
'Key': 'mysourcekey',
}
if extra_expected_params:
expected_params.update(extra_expected_params)
response = self.create_stubbed_responses()[0]
response['expected_params'] = expected_params
self.stubber.add_response(**response)

def add_create_multipart_response_with_default_expected_params(
self, extra_expected_params=None
):
expected_params = {'Bucket': self.bucket, 'Key': self.key}
if extra_expected_params:
expected_params.update(extra_expected_params)
response = self.create_stubbed_responses()[1]
response['expected_params'] = expected_params
self.stubber.add_response(**response)

def add_upload_part_copy_responses_with_default_expected_params(
self, extra_expected_params=None
):
ranges = [
'bytes=0-5242879',
'bytes=5242880-10485759',
'bytes=10485760-13107199',
]
upload_part_responses = self.create_stubbed_responses()[2:-1]
for i, range_val in enumerate(ranges):
upload_part_response = upload_part_responses[i]
expected_params = {
'Bucket': self.bucket,
'Key': self.key,
'CopySource': self.copy_source,
'UploadId': self.multipart_id,
'PartNumber': i + 1,
'CopySourceRange': range_val,
}
if extra_expected_params:
if 'ChecksumAlgorithm' in extra_expected_params:
name = extra_expected_params['ChecksumAlgorithm']
checksum_member = 'Checksum%s' % name.upper()
response = upload_part_response['service_response']
response['CopyPartResult'][checksum_member] = 'sum%s==' % (
i + 1
)
else:
expected_params.update(extra_expected_params)

upload_part_response['expected_params'] = expected_params
self.stubber.add_response(**upload_part_response)

def add_complete_multipart_response_with_default_expected_params(
self, extra_expected_params=None
):
expected_params = {
'Bucket': self.bucket,
'Key': self.key,
'UploadId': self.multipart_id,
'MultipartUpload': {
'Parts': [
{'ETag': 'etag-1', 'PartNumber': 1},
{'ETag': 'etag-2', 'PartNumber': 2},
{'ETag': 'etag-3', 'PartNumber': 3},
]
},
}
if extra_expected_params:
expected_params.update(extra_expected_params)

response = self.create_stubbed_responses()[-1]
response['expected_params'] = expected_params
self.stubber.add_response(**response)

def create_expected_progress_callback_info(self):
# Note that last read is from the empty sentinel indicating
# that the stream is done.
Expand All @@ -341,8 +443,6 @@ def add_create_multipart_upload_response(self):
self.stubber.add_response(**self.create_stubbed_responses()[1])

def _get_expected_params(self):
upload_id = 'my-upload-id'

# Add expected parameters to the head object
expected_head_params = {
'Bucket': 'mysourcebucket',
Expand All @@ -368,7 +468,7 @@ def _get_expected_params(self):
'Bucket': self.bucket,
'Key': self.key,
'CopySource': self.copy_source,
'UploadId': upload_id,
'UploadId': self.multipart_id,
'PartNumber': i + 1,
'CopySourceRange': range_val,
}
Expand All @@ -378,7 +478,7 @@ def _get_expected_params(self):
expected_complete_mpu_params = {
'Bucket': self.bucket,
'Key': self.key,
'UploadId': upload_id,
'UploadId': self.multipart_id,
'MultipartUpload': {
'Parts': [
{'ETag': 'etag-1', 'PartNumber': 1},
Expand Down Expand Up @@ -441,6 +541,54 @@ def test_copy_with_extra_args(self):
future.result()
self.stubber.assert_no_pending_responses()

def test_copy_passes_checksums(self):
# This extra argument should be added to the head object,
# the create multipart upload, and upload part copy.
self.extra_args['ChecksumAlgorithm'] = 'sha256'

self.add_get_head_response_with_default_expected_params()

# ChecksumAlgorithm should be passed on the create_multipart call
self.add_create_multipart_response_with_default_expected_params(
self.extra_args,
)

# ChecksumAlgorithm should be passed to the upload_part_copy calls
self.add_upload_part_copy_responses_with_default_expected_params(
self.extra_args,
)

# The checksums should be used in the complete call like etags
self.add_complete_multipart_response_with_default_expected_params(
extra_expected_params={
'MultipartUpload': {
'Parts': [
{
'ETag': 'etag-1',
'PartNumber': 1,
'ChecksumSHA256': 'sum1==',
},
{
'ETag': 'etag-2',
'PartNumber': 2,
'ChecksumSHA256': 'sum2==',
},
{
'ETag': 'etag-3',
'PartNumber': 3,
'ChecksumSHA256': 'sum3==',
},
]
}
}
)

call_kwargs = self.create_call_kwargs()
call_kwargs['extra_args'] = self.extra_args
future = self.manager.copy(**call_kwargs)
future.result()
self.stubber.assert_no_pending_responses()

def test_copy_blacklists_args_to_create_multipart(self):
# This argument can never be used for multipart uploads
self.extra_args['MetadataDirective'] = 'COPY'
Expand Down Expand Up @@ -530,7 +678,7 @@ def test_abort_on_failure(self):
expected_params={
'Bucket': self.bucket,
'Key': self.key,
'UploadId': 'my-upload-id',
'UploadId': self.multipart_id,
},
)

Expand Down
30 changes: 30 additions & 0 deletions tests/unit/s3transfer/test_copies.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def setUp(self):
self.upload_id = 'myuploadid'
self.part_number = 1
self.result_etag = 'my-etag'
self.checksum_sha1 = 'my-checksum_sha1'

def get_copy_task(self, **kwargs):
default_kwargs = {
Expand Down Expand Up @@ -133,6 +134,35 @@ def test_main(self):
)
self.stubber.assert_no_pending_responses()

def test_main_with_checksum(self):
self.stubber.add_response(
'upload_part_copy',
service_response={
'CopyPartResult': {
'ETag': self.result_etag,
'ChecksumSHA1': self.checksum_sha1,
}
},
expected_params={
'Bucket': self.bucket,
'Key': self.key,
'CopySource': self.copy_source,
'UploadId': self.upload_id,
'PartNumber': self.part_number,
'CopySourceRange': self.copy_source_range,
},
)
task = self.get_copy_task(checksum_algorithm="sha1")
self.assertEqual(
task(),
{
'PartNumber': self.part_number,
'ETag': self.result_etag,
'ChecksumSHA1': self.checksum_sha1,
},
)
self.stubber.assert_no_pending_responses()

def test_extra_args(self):
self.extra_args['RequestPayer'] = 'requester'
self.stubber.add_response(
Expand Down

0 comments on commit f60d1ad

Please sign in to comment.