Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][DataMovement] Fixes to ensure we queue all work to chunk queue #48557

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 47 additions & 44 deletions sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,14 @@ public override async Task ProcessPartToChunkAsync()
return;
}
await OnTransferStateChangedAsync(TransferState.InProgress).ConfigureAwait(false);

if (!_sourceResource.Length.HasValue)
{
await UnknownDownloadInternal().ConfigureAwait(false);
await UnknownLengthDownloadInternal().ConfigureAwait(false);
}
else
{
await LengthKnownDownloadInternal().ConfigureAwait(false);
await KnownLengthDownloadInternal().ConfigureAwait(false);
}
}
catch (Exception ex)
Expand All @@ -220,36 +221,34 @@ public override async Task ProcessPartToChunkAsync()
}
}

internal async Task UnknownDownloadInternal()
internal async Task UnknownLengthDownloadInternal()
{
Task<StorageResourceReadStreamResult> initialTask = _sourceResource.ReadStreamAsync(
position: 0,
length: _initialTransferSize,
_cancellationToken);

try
{
StorageResourceReadStreamResult initialResult = default;
try
{
initialResult = await initialTask.ConfigureAwait(false);
initialResult = await _sourceResource.ReadStreamAsync(
position: 0,
length: _initialTransferSize,
_cancellationToken).ConfigureAwait(false);
}
catch
{
// Range not accepted, we need to attempt to use a default range
// This can happen if the source is empty.
initialResult = await _sourceResource.ReadStreamAsync(
cancellationToken: _cancellationToken)
.ConfigureAwait(false);
}
// If the initial request returned no content (i.e., a 304),
// we'll pass that back to the user immediately

long? initialLength = initialResult?.ContentLength;

// There needs to be at least 1 chunk to create the blob even if the
// length is 0 bytes.
if (initialResult == default || (initialLength ?? 0) == 0)
{
await CreateZeroLengthDownload().ConfigureAwait(false);
await QueueChunkToChannelAsync(CreateZeroLengthDownload).ConfigureAwait(false);
return;
}

Expand Down Expand Up @@ -285,41 +284,20 @@ internal async Task UnknownDownloadInternal()
}
}

internal async Task LengthKnownDownloadInternal()
internal async Task KnownLengthDownloadInternal()
{
long totalLength = _sourceResource.Length.Value;
if (totalLength == 0)
{
await CreateZeroLengthDownload().ConfigureAwait(false);
await QueueChunkToChannelAsync(CreateZeroLengthDownload).ConfigureAwait(false);
}
// Download with a single GET
else if (_initialTransferSize >= totalLength)
{
// To prevent requesting a range that is invalid when
// we already know the length we can just make one get blob request.
StorageResourceReadStreamResult result = await _sourceResource.
ReadStreamAsync(length: totalLength, cancellationToken: _cancellationToken)
await QueueChunkToChannelAsync(
async () =>
await DownloadSingle(totalLength).ConfigureAwait(false))
.ConfigureAwait(false);

long downloadLength = result.ContentLength.Value;
// This should not occur but add a check just in case
if (downloadLength != totalLength)
{
throw Errors.SingleDownloadLengthMismatch(totalLength, downloadLength);
}

bool successfulCopy = await CopyToStreamInternal(
offset: 0,
sourceLength: downloadLength,
source: result.Content,
expectedLength: totalLength,
initial: true).ConfigureAwait(false);
if (successfulCopy)
{
await ReportBytesWrittenAsync(downloadLength).ConfigureAwait(false);
// Queue the work to end the download
await QueueCompleteFileDownload().ConfigureAwait(false);
}
}
// Download in chunks
else
Expand All @@ -328,7 +306,6 @@ internal async Task LengthKnownDownloadInternal()
}
}

#region PartitionedDownloader
private async Task QueueChunksToChannel(long initialLength, long totalLength)
{
// Create Download Chunk event handler to manage when the ranges finish downloading
Expand All @@ -352,7 +329,7 @@ private async Task QueueChunksToChannel(long initialLength, long totalLength)
// return before it's completed downloading)
await QueueChunkToChannelAsync(
async () =>
await DownloadStreamingInternal(range: httpRange).ConfigureAwait(false))
await DownloadChunk(range: httpRange).ConfigureAwait(false))
.ConfigureAwait(false);
chunkCount++;
}
Expand Down Expand Up @@ -389,7 +366,35 @@ await _destinationResource.CompleteTransferAsync(
}
}

internal async Task DownloadStreamingInternal(HttpRange range)
private async Task DownloadSingle(long totalLength)
{
// To prevent requesting a range that is invalid when
// we already know the length we can just make one get blob request.
StorageResourceReadStreamResult result = await _sourceResource
.ReadStreamAsync(length: totalLength, cancellationToken: _cancellationToken)
.ConfigureAwait(false);

long downloadLength = result.ContentLength.Value;
// This should not occur but add a check just in case
if (downloadLength != totalLength)
{
throw Errors.SingleDownloadLengthMismatch(totalLength, downloadLength);
}

bool successfulCopy = await CopyToStreamInternal(
offset: 0,
sourceLength: downloadLength,
source: result.Content,
expectedLength: totalLength,
initial: true).ConfigureAwait(false);
if (successfulCopy)
{
await ReportBytesWrittenAsync(downloadLength).ConfigureAwait(false);
await CompleteFileDownload().ConfigureAwait(false);
}
}

private async Task DownloadChunk(HttpRange range)
{
try
{
Expand Down Expand Up @@ -488,7 +493,6 @@ private static IEnumerable<HttpRange> GetRanges(long initialLength, long totalLe
yield return new HttpRange(offset, Math.Min(totalLength - offset, rangeSize));
}
}
#endregion PartitionedDownloader

public override async Task InvokeSkippedArgAsync()
{
Expand Down Expand Up @@ -520,8 +524,7 @@ private async Task CreateZeroLengthDownload()
initial: true).ConfigureAwait(false);
if (successfulCreation)
{
// Queue the work to end the download
await QueueCompleteFileDownload().ConfigureAwait(false);
await CompleteFileDownload().ConfigureAwait(false);
}
else
{
Expand Down