Skip to content

Commit

Permalink
fix(PathArguments): add JobRunner.UploadJobAssetsAsync()
Browse files Browse the repository at this point in the history
  • Loading branch information
MingboPeng committed Oct 27, 2023
1 parent 7adb1fe commit 2642a00
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 28 deletions.
38 changes: 38 additions & 0 deletions src/PollinationSDK/ManuallyAdded/Model/JobPathArgument.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@

namespace PollinationSDK
{
public partial class JobPathArgument
{
public void IsAssetUploaded(bool uploaded)
{
if (this.Annotations == null)
this.Annotations = new System.Collections.Generic.Dictionary<string, string>();

var p = "Uploaded";
var v = uploaded ? "true" : "false";

if (this.Annotations.TryGetValue(p, out var path))
{
this.Annotations[p] = v;
}
else
{
this.Annotations.Add(p, v);
}
}

public bool IsAssetUploaded()
{
if (this.Annotations == null)
return false;

if (this.Annotations.TryGetValue("Uploaded", out var isUploaded))
{
return isUploaded == "true";
}

return false;

}
}
}
13 changes: 13 additions & 0 deletions src/PollinationSDK/Wrapper/JobInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,19 @@ private async Task<ScheduledJobInfo> RunJobOnCloudAsync(Action<string> progressR
return jobInfo;
}

public async Task<Job> UploadJobAssetsAsync(Action<string> progressReporting = default, System.Threading.CancellationToken token = default)
{
if (string.IsNullOrEmpty(this.ProjectSlug) || this.IsLocalJob)
throw new ArgumentException($"Please call SetCloudJob() before running a job");

var proj = GetWritableProject();
var runner = new JobRunner(this);
var newJob = await JobRunner.UploadJobAssetsAsync(proj, this.Job, this.SubFolderPath, progressReporting, token);

return newJob;
}


public void AddArgument(JobArgument arg) => this.Job.AddArgument(arg);
public void AddArgument(JobPathArgument arg) => this.Job.AddArgument(arg);

Expand Down
79 changes: 51 additions & 28 deletions src/PollinationSDK/Wrapper/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,44 @@ public async Task<CloudJob> RunOnCloudAsync(Project project, Action<string> prog

}

public static async Task<Job> UploadJobAssetsAsync(
Project project,
Job job,
string subfolderPath,
Action<string> progressLogAction = default,
CancellationToken cancellationToken = default,
Action actionWhenDone = default)
{
// check artifacts
var tempProjectDir = CheckArtifacts(job, subfolderPath);

// upload artifacts
if (!string.IsNullOrEmpty(tempProjectDir))
{
Action<int> updateMessageProgress = (int p) => {
progressLogAction?.Invoke($"Preparing: [{p}%]");
};
await Helper.UploadDirectoryAsync(project, tempProjectDir, updateMessageProgress, cancellationToken);
}

// suspended by user
var emptyID = Guid.Empty;
if (cancellationToken.IsCancellationRequested)
{
progressLogAction?.Invoke($"Canceled: {cancellationToken.IsCancellationRequested}");
Helper.Logger.Information($"ScheduleRunAsync: canceled by user");
return null;
}

// update Artifact to cloud's relative path after uploaded.
var newJob = UpdateArtifactPath(job, subfolderPath);

actionWhenDone?.Invoke();

return newJob;
}


/// <summary>
/// Run and monitor the simulation on Pollination
/// </summary>
Expand All @@ -68,38 +106,13 @@ public async Task<CloudJob> RunOnCloudAsync(Project project, Action<string> prog
{
// Get project
var proj = project;
//var job = this._Job;

// Check if recipe can be used in this project
CheckRecipeInProject(job.Source, proj);

// Upload artifacts

// check artifacts
var tempProjectDir = CheckArtifacts(job, this.JobInfo.SubFolderPath);

// upload artifacts
if (!string.IsNullOrEmpty(tempProjectDir))
{
Action<int> updateMessageProgress = (int p) => {
progressLogAction?.Invoke($"Preparing: [{p}%]");
};
await Helper.UploadDirectoryAsync(proj, tempProjectDir, updateMessageProgress, cancellationToken);
}

// suspended by user
var emptyID = Guid.Empty;
if (cancellationToken.IsCancellationRequested)
{
progressLogAction?.Invoke($"Canceled: {cancellationToken.IsCancellationRequested}");
Helper.Logger.Information($"ScheduleRunAsync: canceled by user");
return null;
}

// update Artifact to cloud's relative path after uploaded.
var newJob = UpdateArtifactPath(job, this.JobInfo.SubFolderPath);
//var json = newJob.ToJson();

var newJob = await UploadJobAssetsAsync(project, job, this.JobInfo.SubFolderPath, progressLogAction, cancellationToken);

// create a new Simulation
var api = new JobsApi();
progressLogAction?.Invoke($"Start running.");
Expand Down Expand Up @@ -306,7 +319,7 @@ private static string CheckArtifacts(Job job, string subFolderPath)
var temp = string.Empty;
var arg = job.Arguments;

var artis = arg.SelectMany(_=>_.OfType<JobPathArgument>());
var artis = arg.SelectMany(_ => _.OfType<JobPathArgument>()).Where(_ => !_.IsAssetUploaded());
if (artis == null || !artis.Any()) return temp;

// remove old temp files first
Expand Down Expand Up @@ -391,6 +404,13 @@ private static Job UpdateArtifactPath(Job job, string subFolderPath)
{
if (item.Obj is JobPathArgument path)
{
if (path.IsAssetUploaded())
{
// do nothing
newJob.AddArgument(path);
continue;
}

// only update the path for ProjectFolderSource for a relative path
var projFolderSource = path.Source.Obj as ProjectFolder;
if (projFolderSource == null) continue;
Expand All @@ -400,6 +420,7 @@ private static Job UpdateArtifactPath(Job job, string subFolderPath)
if (!string.IsNullOrEmpty(subFolderPath)) newFileOrDirname = $"{subFolderPath}/{newFileOrDirname}";
var pSource = new ProjectFolder(path: newFileOrDirname);
var newPath = new JobPathArgument(path.Name, pSource);
newPath.IsAssetUploaded(true);

// add it to the last available argument set.
newJob.AddArgument(newPath);
Expand All @@ -418,6 +439,8 @@ private static Job UpdateArtifactPath(Job job, string subFolderPath)
return newJob;
}





}
Expand Down

0 comments on commit 2642a00

Please sign in to comment.