Skip to content

Commit

Permalink
Upload build artifacts to blob with batching (#3403)
Browse files Browse the repository at this point in the history
* Upload build artifacts to blob with batching

* attempt to fix bad diff

* cleanup

* merge fix

* Adjust reporting timers

* thread safety

* fixes
  • Loading branch information
alex-peck authored Jun 4, 2021
1 parent 1be8295 commit 097ca60
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 60 deletions.
217 changes: 163 additions & 54 deletions src/Agent.Worker/Build/FileContainerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
using Microsoft.VisualStudio.Services.WebApi;
using System.Net.Http;
using System.Net;
using BuildXL.Cache.ContentStore.Hashing;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Microsoft.VisualStudio.Services.BlobStore.WebApi;

namespace Microsoft.VisualStudio.Services.Agent.Worker.Build
{
Expand All @@ -29,8 +27,6 @@ public class FileContainerServer
private readonly ConcurrentDictionary<string, ConcurrentQueue<string>> _fileUploadProgressLog = new ConcurrentDictionary<string, ConcurrentQueue<string>>();
private readonly FileContainerHttpClient _fileContainerHttpClient;
private readonly VssConnection _connection;
private DedupStoreClient _dedupClient;
private BlobStoreClientTelemetryTfs _blobTelemetry;

private CancellationTokenSource _uploadCancellationTokenSource;
private TaskCompletionSource<int> _uploadFinished;
Expand Down Expand Up @@ -98,8 +94,19 @@ public async Task<long> CopyToContainerAsync(

try
{
var uploadToBlob = String.Equals(context.GetVariableValueOrDefault(WellKnownDistributedTaskVariables.UploadBuildArtifactsToBlob), "true", StringComparison.InvariantCultureIgnoreCase)
&& !AgentKnobs.DisableBuildArtifactsToBlob.GetValue(context).AsBoolean();

// try upload all files for the first time.
UploadResult uploadResult = await ParallelUploadAsync(context, files, maxConcurrentUploads, _uploadCancellationTokenSource.Token);
UploadResult uploadResult;
if (uploadToBlob)
{
uploadResult = await BlobUploadAsync(context, files, maxConcurrentUploads, _uploadCancellationTokenSource.Token);
}
else
{
uploadResult = await ParallelUploadAsync(context, files, maxConcurrentUploads, _uploadCancellationTokenSource.Token);
}

if (uploadResult.FailedFiles.Count == 0)
{
Expand All @@ -121,7 +128,15 @@ public async Task<long> CopyToContainerAsync(

// Retry upload all failed files.
context.Output(StringUtil.Loc("FileUploadRetry", uploadResult.FailedFiles.Count));
UploadResult retryUploadResult = await ParallelUploadAsync(context, uploadResult.FailedFiles, maxConcurrentUploads, _uploadCancellationTokenSource.Token);
UploadResult retryUploadResult;
if (uploadToBlob)
{
retryUploadResult = await BlobUploadAsync(context, uploadResult.FailedFiles, maxConcurrentUploads, _uploadCancellationTokenSource.Token);
}
else
{
retryUploadResult = await ParallelUploadAsync(context, uploadResult.FailedFiles, maxConcurrentUploads, _uploadCancellationTokenSource.Token);
}

if (retryUploadResult.FailedFiles.Count == 0)
{
Expand Down Expand Up @@ -153,18 +168,6 @@ private async Task<UploadResult> ParallelUploadAsync(IAsyncCommandContext contex
return uploadResult;
}

var uploadToBlob = String.Equals(context.GetVariableValueOrDefault(WellKnownDistributedTaskVariables.UploadBuildArtifactsToBlob), "true", StringComparison.InvariantCultureIgnoreCase)
&& !AgentKnobs.DisableBuildArtifactsToBlob.GetValue(context).AsBoolean();
if (uploadToBlob)
{
var verbose = String.Equals(context.GetVariableValueOrDefault("system.debug"), "true", StringComparison.InvariantCultureIgnoreCase);
var (dedupClient, clientTelemetry) = await DedupManifestArtifactClientFactory.Instance
.CreateDedupClientAsync(verbose, (str) => context.Output(str), this._connection, token);

_dedupClient = dedupClient;
_blobTelemetry = clientTelemetry;
}

// ensure the file upload queue is empty.
if (!_fileUploadQueue.IsEmpty)
{
Expand All @@ -188,7 +191,7 @@ private async Task<UploadResult> ParallelUploadAsync(IAsyncCommandContext contex
List<Task<UploadResult>> parallelUploadingTasks = new List<Task<UploadResult>>();
for (int uploader = 0; uploader < concurrentUploads; uploader++)
{
parallelUploadingTasks.Add(UploadAsync(context, uploader, uploadToBlob, _uploadCancellationTokenSource.Token));
parallelUploadingTasks.Add(UploadAsync(context, uploader, _uploadCancellationTokenSource.Token));
}

// Wait for parallel upload finish.
Expand All @@ -203,24 +206,10 @@ private async Task<UploadResult> ParallelUploadAsync(IAsyncCommandContext contex
_uploadFinished.TrySetResult(0);
await uploadMonitor;

// report telemetry
if (uploadToBlob)
{
if (!Guid.TryParse(context.GetVariableValueOrDefault(WellKnownDistributedTaskVariables.PlanId), out var planId))
{
planId = Guid.Empty;
}
if (!Guid.TryParse(context.GetVariableValueOrDefault(WellKnownDistributedTaskVariables.JobId), out var jobId))
{
jobId = Guid.Empty;
}
await _blobTelemetry.CommitTelemetryUpload(planId, jobId);
}

return uploadResult;
}

private async Task<UploadResult> UploadAsync(IAsyncCommandContext context, int uploaderId, bool uploadToBlob, CancellationToken token)
private async Task<UploadResult> UploadAsync(IAsyncCommandContext context, int uploaderId, CancellationToken token)
{
List<string> failedFiles = new List<string>();
long uploadedSize = 0;
Expand All @@ -238,24 +227,10 @@ private async Task<UploadResult> UploadAsync(IAsyncCommandContext context, int u
long uploadLength = 0;
try
{
if (uploadToBlob)
using (FileStream fs = File.Open(fileToUpload, FileMode.Open, FileAccess.Read, FileShare.Read))
{
var result = await UploadToBlobStore(context, fileToUpload, token);
var retryHelper = new RetryHelper(context);

response = await retryHelper.Retry(async () => await _fileContainerHttpClient.CreateItemForArtifactUpload(_containerId, itemPath, _projectId,
result.dedupId.ValueString, (long) result.length, token),
(retryCounter) => (int) Math.Pow(retryCounter, 2) * 5,
(exception) => true);
uploadLength = (long) result.length;
}
else
{
using (FileStream fs = File.Open(fileToUpload, FileMode.Open, FileAccess.Read, FileShare.Read))
{
response = await _fileContainerHttpClient.UploadFileAsync(_containerId, itemPath, fs, _projectId, cancellationToken: token, chunkSize: 4 * 1024 * 1024);
uploadLength = fs.Length;
}
response = await _fileContainerHttpClient.UploadFileAsync(_containerId, itemPath, fs, _projectId, cancellationToken: token, chunkSize: 4 * 1024 * 1024);
uploadLength = fs.Length;
}
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
Expand Down Expand Up @@ -334,12 +309,137 @@ private async Task<UploadResult> UploadAsync(IAsyncCommandContext context, int u
return new UploadResult(failedFiles, uploadedSize);
}

private async Task<(DedupIdentifier dedupId, ulong length)> UploadToBlobStore(IAsyncCommandContext context, string itemPath, CancellationToken cancellationToken)
private async Task<UploadResult> BlobUploadAsync(IAsyncCommandContext context, IReadOnlyList<string> files, int concurrentUploads, CancellationToken token)
{
// return files that fail to upload and total artifact size
var uploadResult = new UploadResult();

// nothing needs to upload
if (files.Count == 0)
{
return uploadResult;
}

var verbose = String.Equals(context.GetVariableValueOrDefault("system.debug"), "true", StringComparison.InvariantCultureIgnoreCase);
var (dedupClient, clientTelemetry) = await DedupManifestArtifactClientFactory.Instance
.CreateDedupClientAsync(verbose, (str) => context.Output(str), this._connection, token);

// Upload to blobstore
var results = await BlobStoreUtils.UploadBatchToBlobstore(verbose, files, (level, uri, type) =>
new BuildArtifactActionRecord(level, uri, type, nameof(BlobUploadAsync), context), (str) => context.Output(str), dedupClient, clientTelemetry, token, enableReporting: true);

// Associate with TFS
context.Output(StringUtil.Loc("AssociateFiles"));
var queue = new ConcurrentQueue<BlobFileInfo>();
foreach (var file in results.fileDedupIds)
{
queue.Enqueue(file);
}

return await BlobStoreUtils.UploadToBlobStore(verbose, itemPath, (level, uri, type) =>
new BuildArtifactActionRecord(level, uri, type, nameof(UploadToBlobStore), context), (str) => context.Output(str), _dedupClient, _blobTelemetry, cancellationToken);
// Start associate monitor
var uploadFinished = new TaskCompletionSource<int>();
var associateMonitor = AssociateReportingAsync(context, files.Count(), uploadFinished, token);

// Start parallel associate tasks.
var parallelAssociateTasks = new List<Task<UploadResult>>();
for (int uploader = 0; uploader < concurrentUploads; uploader++)
{
parallelAssociateTasks.Add(AssociateAsync(context, queue, token));
}

// Wait for parallel associate tasks to finish.
await Task.WhenAll(parallelAssociateTasks);
foreach (var associateTask in parallelAssociateTasks)
{
// record all failed files.
uploadResult.AddUploadResult(await associateTask);
}

// Stop monitor task
uploadFinished.SetResult(0);
await associateMonitor;

// report telemetry
if (!Guid.TryParse(context.GetVariableValueOrDefault(WellKnownDistributedTaskVariables.PlanId), out var planId))
{
planId = Guid.Empty;
}
if (!Guid.TryParse(context.GetVariableValueOrDefault(WellKnownDistributedTaskVariables.JobId), out var jobId))
{
jobId = Guid.Empty;
}
await clientTelemetry.CommitTelemetryUpload(planId, jobId);

return uploadResult;
}

private async Task<UploadResult> AssociateAsync(IAsyncCommandContext context, ConcurrentQueue<BlobFileInfo> associateQueue, CancellationToken token)
{
var uploadResult = new UploadResult();

var retryHelper = new RetryHelper(context);
var uploadTimer = new Stopwatch();
while (associateQueue.TryDequeue(out var file))
{
uploadTimer.Restart();
string itemPath = (_containerPath.TrimEnd('/') + "/" + file.Path.Remove(0, _sourceParentDirectory.Length + 1)).Replace('\\', '/');
bool catchExceptionDuringUpload = false;
HttpResponseMessage response = null;
try
{
if (file.Success)
{
var length = (long) file.Node.TransitiveContentBytes;
response = await retryHelper.Retry(async () => await _fileContainerHttpClient.CreateItemForArtifactUpload(_containerId, itemPath, _projectId,
file.DedupId.ValueString, length, token),
(retryCounter) => (int) Math.Pow(retryCounter, 2) * 5,
(exception) => true);
uploadResult.TotalFileSizeUploaded += length;
}
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
context.Output(StringUtil.Loc("FileUploadCancelled", itemPath));
if (response != null)
{
response.Dispose();
}
throw;
}
catch (Exception ex)
{
catchExceptionDuringUpload = true;
context.Output(StringUtil.Loc("FileUploadFailed", itemPath, ex.Message));
context.Output(ex.ToString());
}
if (catchExceptionDuringUpload || (response != null && response.StatusCode != HttpStatusCode.Created) || !file.Success)
{
if (response != null)
{
context.Output(StringUtil.Loc("FileContainerUploadFailed", response.StatusCode, response.ReasonPhrase, file.Path, itemPath));
}
if (!file.Success)
{
context.Output(StringUtil.Loc("FileContainerUploadFailedBlob", file.Path, itemPath));
}

// tracking file that failed to upload.
uploadResult.FailedFiles.Add(file.Path);
}
else
{
context.Debug(StringUtil.Loc("FileUploadFinish", file.Path, uploadTimer.ElapsedMilliseconds));
}

if (response != null)
{
response.Dispose();
}

Interlocked.Increment(ref _filesProcessed);
}

return uploadResult;
}

private async Task ReportingAsync(IAsyncCommandContext context, int totalFiles, CancellationToken token)
Expand Down Expand Up @@ -368,6 +468,15 @@ private async Task ReportingAsync(IAsyncCommandContext context, int totalFiles,
}
}

private async Task AssociateReportingAsync(IAsyncCommandContext context, int totalFiles, TaskCompletionSource<int> uploadFinished, CancellationToken token)
{
while (!uploadFinished.Task.IsCompleted && !token.IsCancellationRequested)
{
context.Output(StringUtil.Loc("FileAssociateProgress", totalFiles, _filesProcessed, (_filesProcessed * 100) / totalFiles));
await Task.WhenAny(uploadFinished.Task, Task.Delay(10000, token));
}
}

private void UploadFileTraceReportReceived(object sender, ReportTraceEventArgs e)
{
ConcurrentQueue<string> logQueue = _fileUploadTraceLog.GetOrAdd(e.File, new ConcurrentQueue<string>());
Expand Down
21 changes: 21 additions & 0 deletions src/Microsoft.VisualStudio.Services.Agent/Blob/BlobFileInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using BuildXL.Cache.ContentStore.Hashing;

namespace Microsoft.VisualStudio.Services.Agent.Blob
{
public class BlobFileInfo
{
public DedupNode Node { get; set; }
public string Path { get; set; }
public DedupIdentifier DedupId
{
get
{
return Node.GetDedupIdentifier(HashType.Dedup64K);
}
}
public bool Success { get; set; }
}
}
Loading

0 comments on commit 097ca60

Please sign in to comment.