Skip to content

Commit

Permalink
singled threaded transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
aj-ya committed Jun 7, 2024
1 parent 509b655 commit a94a51e
Showing 1 changed file with 55 additions and 32 deletions.
87 changes: 55 additions & 32 deletions outpostcli/lfs/storage_class/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,38 +61,61 @@ def s3_multipart_upload(msg: Dict[str, Any]):
_log.info({"cores": cores})
bytes_so_far = 0
try:
with multimap(cores) as pmap:
for resp in pmap(
transfer_part,
(
PartInfo(
filepath,
part_no,
chunk_size,
signed_url,
)
for (part_no, signed_url) in pre_signed_urls
),
):
if isinstance(resp, ProxyLFSException):
raise LFSException(
code=resp.code,
message=resp.message,
doc_url=resp.doc_url,
)
else:
bytes_so_far += chunk_size
# Not precise but that's ok.
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": bytes_so_far,
"bytesSinceLast": chunk_size,
}
)
parts.append(resp)
pass
for part_no, signed_url in pre_signed_urls:
part_info = PartInfo(filepath, part_no, chunk_size, signed_url)
resp = transfer_part(part_info)
if isinstance(resp, ProxyLFSException):
raise LFSException(
code=resp.code,
message=resp.message,
doc_url=resp.doc_url,
)
else:
bytes_so_far += chunk_size
# Not precise but that's ok.
write_msg(
{
"event": "progress",
"oid": oid,
"bytesSoFar": bytes_so_far,
"bytesSinceLast": chunk_size,
}
)
parts.append(resp)
pass

# with multimap(cores) as pmap:
# for resp in pmap(
# transfer_part,
# (
# PartInfo(
# filepath,
# part_no,
# chunk_size,
# signed_url,
# )
# for (part_no, signed_url) in pre_signed_urls
# ),
# ):
# if isinstance(resp, ProxyLFSException):
# raise LFSException(
# code=resp.code,
# message=resp.message,
# doc_url=resp.doc_url,
# )
# else:
# bytes_so_far += chunk_size
# # Not precise but that's ok.
# write_msg(
# {
# "event": "progress",
# "oid": oid,
# "bytesSoFar": bytes_so_far,
# "bytesSinceLast": chunk_size,
# }
# )
# parts.append(resp)
# pass
complete_resp = complete_multipart_upload(complete_url, parts)
if isinstance(complete_resp, LFSException):
abort_resp = abort_multipart_upload(abort_url)
Expand Down

0 comments on commit a94a51e

Please sign in to comment.