Skip to content

Commit

Permalink
fix(UploadDirectoryAsync): batch upload to prevent timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
MingboPeng committed Apr 17, 2024
1 parent aab69e8 commit cfeba48
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 24 deletions.
6 changes: 1 addition & 5 deletions src/PollinationSDK/Client/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,7 @@ public virtual string AccessToken
{
get
{
if (TokenRepo != null)
{
return TokenRepo.GetToken();
}
return "";
return TokenRepo?.GetToken();
}
}

Expand Down
1 change: 1 addition & 0 deletions src/PollinationSDK/Helper/AuthHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ private static bool CheckGetUser(AuthResult auth, out string errorMessage, bool
refreshURL: devEnv ? RefreshURL_Dev : RefreshURL,
idToken: auth.IDToken,
expiresInSeconds: auth.ExpiresInSeconds,
//expiresInSeconds: 62, // for testing. expires in 1 minute, but it should be refreshed in 2 seconds.
refreshToken: auth.RefreshToken
);
Helper.CurrentUser = Helper.GetUser();
Expand Down
53 changes: 41 additions & 12 deletions src/PollinationSDK/Helper/Helper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,49 @@ public static async Task<bool> UploadDirectoryAsync(Project project, string dire

var files = Directory.GetFiles(directory, "*", SearchOption.AllDirectories);
var api = new ArtifactsApi();


var tasks = files.Select(_ => UploadArtifactAsync(api, project, _, _.Replace(directory, ""))).ToList();
var total = files.Count();

LogHelper.LogInfo($"Uploading {total} assets for project {project.Name}");


var finished = 0;
var finishedPercent = 0;
reportProgressAction?.Invoke(finishedPercent);

var chunkSize = 20;
var subLists = files.Select((x, i) => new { Index = i, Value = x })
.GroupBy(x => x.Index / chunkSize)
.Select(x => x.Select(v => v.Value).ToList())
.ToList();

Action<int> reportPercent = (c) =>
{
var p = finished + c;
finishedPercent = (int)(p * 100 / total);
reportProgressAction?.Invoke(finishedPercent);
};

foreach (var chunk in subLists)
{
var chunkFiles = chunk.Select(_ => (_, _.Replace(directory, ""))).ToList();
await BatchExecute(api, project, chunkFiles, reportPercent, cancellationToken);
// auto refresh token between each chunk run
api.Configuration.TokenRepo?.CheckToken();
finished += chunk.Count;
}

LogHelper.LogInfo($"Finished uploading assets for project {project.Name}");

// canceled by user
if (cancellationToken.IsCancellationRequested) return false;

return true;
}

private static async Task<bool> BatchExecute(ArtifactsApi api, Project project, List<(string path, string relativePath)> files , Action<int> finishedCountProgressAction = default, CancellationToken cancellationToken = default)
{
var tasks = files.Select(_=> UploadArtifactAsync(api, project, _.path, _.relativePath)).ToList();

var finished = 0;
while (tasks.Count() > 0)
{
// canceled by user
Expand All @@ -116,25 +148,21 @@ public static async Task<bool> UploadDirectoryAsync(Project project, string dire
}

var finishedTask = await Task.WhenAny(tasks);
tasks.Remove(finishedTask);

if (finishedTask.IsFaulted || finishedTask.Exception != null)
{
LogHelper.LogError($"Upload exception: {finishedTask.Exception}");
throw finishedTask.Exception;
}

tasks.Remove(finishedTask);

var unfinishedUploadTasksCount = tasks.Count();
finishedPercent = (int)((total - unfinishedUploadTasksCount) / (double)total * 100);
reportProgressAction?.Invoke(finishedPercent);
finished++;
finishedCountProgressAction?.Invoke(finished);

}
LogHelper.LogInfo($"Finished uploading assets for project {project.Name}");

// canceled by user
if (cancellationToken.IsCancellationRequested) return false;

return true;
}

Expand All @@ -151,6 +179,7 @@ public static async Task<bool> UploadArtifactAsync(ArtifactsApi api, Project pro
if (fileRelativePath.StartsWith("/"))
fileRelativePath = fileRelativePath.Substring(1);


var artf = await api.CreateArtifactAsync(project.Owner.Name, project.Name, new KeyRequest(fileRelativePath));

//Use RestSharp
Expand All @@ -171,7 +200,7 @@ public static async Task<bool> UploadArtifactAsync(ArtifactsApi api, Project pro

restRequest.AddFile("file", filePath);

LogHelper.LogInfo($"Started upload of {relativePath}");
LogHelper.LogInfo($"Started upload of {fileRelativePath}");
var response = await restClient.ExecuteAsync(restRequest);

if (response.StatusCode == HttpStatusCode.NoContent)
Expand Down
22 changes: 15 additions & 7 deletions src/PollinationSDK/ManuallyAdded/Client/TokenRepo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public TokenRepo(string refreshURL, string idToken, int expiresInSeconds, string
{
RefreshURL = refreshURL;
IDToken = idToken;
ExpiresAt = DateTime.Now.AddSeconds(expiresInSeconds);

// make it expire 1 mins early so that it refresh token before expiration.
ExpiresAt = DateTime.Now.AddSeconds(expiresInSeconds - 60);
RefreshToken = refreshToken;
this.LogTokenExpiration();
}
Expand Down Expand Up @@ -52,7 +54,8 @@ private void DoTokenRefresh()
}

this.IDToken = res.Data["id_token"];
this.ExpiresAt = DateTime.Now.AddSeconds(int.Parse(res.Data["expires_in"]));
var seconds = int.Parse(res.Data["expires_in"]);
this.ExpiresAt = DateTime.Now.AddSeconds(seconds);
this.RefreshToken = res.Data["refresh_token"];
}

Expand All @@ -63,13 +66,18 @@ private void DoTokenRefreshLogged()
LogHelper.LogInfo("Token refresh finished");
this.LogTokenExpiration();
}
public string GetToken()

public void CheckToken()
{
if (DateTime.Now >= this.ExpiresAt)
{
this.DoTokenRefreshLogged();
}
if (DateTime.Now < this.ExpiresAt)
return;

this.DoTokenRefreshLogged();
}

public string GetToken()
{
CheckToken();
return this.IDToken;
}
}
Expand Down

0 comments on commit cfeba48

Please sign in to comment.