Skip to content

Commit 14df206

Browse files
committed
feat: Add Minimal API for dependent jobs
1 parent 7582f75 commit 14df206

File tree

10 files changed

+128
-27
lines changed

10 files changed

+128
-27
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,19 @@ All notable changes to **NCronJob** will be documented in this file. The project
99
### Added
1010
- Ability to add a timezone for a "minimal job".
1111
- Run jobs automatically when a job either succeeded or failed allowing to model a job pipeline. By [@linkdotnet](https://github.com/linkdotnet).
12+
```csharp
13+
builder.Services.AddNCronJob(options =>
14+
{
15+
options.AddJob<ImportData>(p => p.WithCronExpression("0 0 * * *")
16+
.ExecuteWhen(
17+
success: s => s.RunJob<TransformData>("Optional Parameter"),
18+
faulted: s => s.RunJob<Notify>("Another Optional Parameter"));
19+
});
20+
```
21+
- Minimal API for instant jobs and job dependencies. By [@linkdotnet](https://github.com/linkdotnet).
22+
```csharp
23+
public void MyOtherMethod() => jobRegistry.RunInstantJob((MyOtherService service) => service.Do());
24+
```
1225

1326
### Changed
1427
- Replace `Microsoft.Extensions.Hosting` with `Microsoft.Extensions.Hosting.Abstractions` for better compatibility. Reported by [@chrisls121](https://github.com/chrisls121) in [#74](https://github.com/NCronJob-Dev/NCronJob/issues/74). Implemented by [@linkdotnet](https://github.com/linkdotnet).

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,18 @@ builder.Services.AddNCronJob(options =>
176176
});
177177
```
178178

179+
You just want to trigger a service and don't want to define a whole new job? No problem! The Minimal API is available here as well:
180+
181+
```csharp
182+
builder.Services.AddNCronJob(options =>
183+
{
184+
options.AddJob<ImportData>(p => p.WithCronExpression("0 0 * * *")
185+
.ExecuteWhen(
186+
success: s => s.RunJob(async (ITransformer transformer) => await transformer.TransformDataAsync()),
187+
faulted: s => s.RunJob(async (INotificationService notifier) => await notifier.NotifyAsync())
188+
});
189+
```
190+
179191
## Support & Contributing
180192

181193
Thanks to all [contributors](https://github.com/NCronJob-Dev/NCronJob/graphs/contributors) and people that are creating

docs/features/minimal-api.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,6 @@ The minimal API has some restrictions over the "full approach":
9191

9292
## Minimal API for instant Jobs
9393
The minimal API also supports instant jobs, for this check out the [Instant Jobs](instant-jobs.md) documentation.
94+
95+
## Minimal API for dependent jobs
96+
The minimal API also supports dependent jobs, for this check out the [Dependent Jobs](model-dependencies.md#minimal-api) documentation.

docs/features/model-dependencies.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Services.AddNCronJob(options =>
1616
Both `success` and `faulted` are optional so that you can define only one of them if needed. `RunJob` can be chained to allow multiple jobs to run after the completion of the main job.
1717

1818
This allows very complex job dependencies to be defined in a simple and readable way.
19-
![dependencies](../assets/flow.png)
19+
![dependencies](../assets/flow.webp)
2020

2121
### Passing parameters to dependent jobs
2222
The `RunJob` method allows optional parameters to be passed to the dependent job.
@@ -91,3 +91,29 @@ builder.Services.AddNCronJob(options =>
9191
options.AddJob<JobA>().ExecuteWhen(success: s => s.RunJob<JobB>());
9292
});
9393
```
94+
95+
## Minimal API
96+
The `ExecuteWhen` method can also be used in a [Minimal API](minimal-api.md) to define job dependencies:
97+
```csharp
98+
builder.Services.AddNCronJob(options =>
99+
{
100+
options.AddJob<ImportDataJob>().ExecuteWhen(
101+
success: s => s.RunJob(async (ITransfomerService transformerService) => await transformerService.TransformDataAsync()),
102+
faulted: f => f.RunJob(async (INotificationService notificationService) => await notificationService.SendNotificationAsync()));
103+
});
104+
```
105+
106+
### Getting the parent job's output in a Minimal API
107+
If you pass in a `JobExecutionContext` to the dependent job, you can access the parent job's output:
108+
109+
```csharp
110+
builder.Services.AddNCronJob(options =>
111+
{
112+
options.AddJob<ImportDataJob>().ExecuteWhen(
113+
success: s => s.RunJob(async (JobExecutionContext context, ITransfomerService transformerService) =>
114+
{
115+
var parentOutput = (MyDataModel)context.ParentOutput;
116+
await transformerService.TransformDataAsync(parentOutput);
117+
}));
118+
});
119+
```

src/NCronJob/Configuration/DependencyBuilder.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
13
namespace NCronJob;
24

35
/// <summary>
@@ -7,6 +9,9 @@ public sealed class DependencyBuilder<TPrincipalJob>
79
where TPrincipalJob : IJob
810
{
911
private readonly List<JobDefinition> dependentJobOptions = [];
12+
private readonly IServiceCollection services;
13+
14+
internal DependencyBuilder(IServiceCollection services) => this.services = services;
1015

1116
/// <summary>
1217
/// Adds a job that runs after the principal job has finished with a given <paramref name="parameter"/>.
@@ -21,5 +26,23 @@ public DependencyBuilder<TPrincipalJob> RunJob<TJob>(object? parameter = null)
2126
return this;
2227
}
2328

29+
/// <summary>
30+
/// Adds an anonymous delegate job that runs after the principal job has finished.
31+
/// </summary>
32+
/// <param name="jobDelegate">The delegate that represents the job to be executed. This delegate must return either void or Task.</param>
33+
public DependencyBuilder<TPrincipalJob> RunJob(Delegate jobDelegate)
34+
{
35+
ArgumentNullException.ThrowIfNull(jobDelegate);
36+
37+
var jobPolicyMetadata = new JobExecutionAttributes(jobDelegate);
38+
var entry = new JobDefinition(typeof(DynamicJobFactory), null, null, null,
39+
JobName: DynamicJobNameGenerator.GenerateJobName(jobDelegate),
40+
JobPolicyMetadata: jobPolicyMetadata);
41+
dependentJobOptions.Add(entry);
42+
services.AddSingleton(entry);
43+
services.AddSingleton(new DynamicJobRegistration(entry, sp => new DynamicJobFactory(sp, jobDelegate)));
44+
return this;
45+
}
46+
2447
internal List<JobDefinition> GetDependentJobOption() => dependentJobOptions;
2548
}

src/NCronJob/Configuration/DynamicJobFactory.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ public DynamicJobFactory(IServiceProvider serviceProvider, Delegate jobAction)
2929
).ToArray();
3030
}
3131

32-
private static Func<object[], Task> BuildInvoker(Delegate jobAction)
32+
private static Func<object[], Task> BuildInvoker(Delegate jobDelegate)
3333
{
34-
var method = jobAction.Method;
34+
var method = jobDelegate.Method;
3535
var returnType = method.ReturnType;
3636
var param = Expression.Parameter(typeof(object[]), "args");
3737
var args = method.GetParameters().Select((p, index) =>
3838
Expression.Convert(Expression.ArrayIndex(param, Expression.Constant(index)), p.ParameterType)).ToArray();
39-
var call = Expression.Call(Expression.Constant(jobAction.Target), method, args);
39+
var call = Expression.Call(Expression.Constant(jobDelegate.Target), method, args);
4040

4141
if (returnType == typeof(Task))
4242
{

src/NCronJob/Configuration/NCronJobOptionBuilder.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
using Microsoft.Extensions.DependencyInjection;
33
using Microsoft.Extensions.DependencyInjection.Extensions;
44
using System.Reflection;
5-
using System.Text;
65

76
namespace NCronJob;
87

@@ -202,7 +201,7 @@ public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? suc
202201
{
203202
if (success is not null)
204203
{
205-
var dependencyBuilder = new DependencyBuilder<TJob>();
204+
var dependencyBuilder = new DependencyBuilder<TJob>(services);
206205
success(dependencyBuilder);
207206
var runWhenSuccess = dependencyBuilder.GetDependentJobOption();
208207
runWhenSuccess.ForEach(s =>
@@ -215,7 +214,7 @@ public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? suc
215214

216215
if (faulted is not null)
217216
{
218-
var dependencyBuilder = new DependencyBuilder<TJob>();
217+
var dependencyBuilder = new DependencyBuilder<TJob>(services);
219218
faulted(dependencyBuilder);
220219
var runWhenFaulted = dependencyBuilder.GetDependentJobOption();
221220
runWhenFaulted.ForEach(s =>
@@ -279,7 +278,7 @@ public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? suc
279278
{
280279
if (success is not null)
281280
{
282-
var dependencyBuilder = new DependencyBuilder<TJob>();
281+
var dependencyBuilder = new DependencyBuilder<TJob>(services);
283282
success(dependencyBuilder);
284283
var runWhenSuccess = dependencyBuilder.GetDependentJobOption();
285284
runWhenSuccess.ForEach(s =>
@@ -292,7 +291,7 @@ public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? suc
292291

293292
if (faulted is not null)
294293
{
295-
var dependencyBuilder = new DependencyBuilder<TJob>();
294+
var dependencyBuilder = new DependencyBuilder<TJob>(services);
296295
faulted(dependencyBuilder);
297296
var runWhenFaulted = dependencyBuilder.GetDependentJobOption();
298297
runWhenFaulted.ForEach(s =>

src/NCronJob/Registry/DynamicJobFactoryRegistry.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,25 @@ public DynamicJobFactoryRegistry(
1313
map = entries.ToDictionary(e => e.JobDefinition.JobFullName, v => v.DynamicJobFactoryResolver);
1414
}
1515

16-
public JobDefinition Add(Delegate jobAction)
16+
public JobDefinition Add(Delegate jobDelegate)
1717
{
18-
var jobPolicyMetadata = new JobExecutionAttributes(jobAction);
18+
var jobPolicyMetadata = new JobExecutionAttributes(jobDelegate);
1919
var entry = new JobDefinition(typeof(DynamicJobFactory), null, null, null,
20-
JobName: DynamicJobNameGenerator.GenerateJobName(jobAction),
20+
JobName: DynamicJobNameGenerator.GenerateJobName(jobDelegate),
2121
JobPolicyMetadata: jobPolicyMetadata);
2222
jobRegistry.Add(entry);
23-
map[entry.JobFullName] = serviceProvider => new DynamicJobFactory(serviceProvider, jobAction);
23+
map[entry.JobFullName] = serviceProvider => new DynamicJobFactory(serviceProvider, jobDelegate);
2424

2525
return entry;
2626
}
2727

28+
/// <summary>
29+
/// Gets the job instance and removes it from the registry. The instance will be drained so that the Garbage Collector can collect it.
30+
/// </summary>
31+
/// <remarks>
32+
/// This function is called for triggering instant jobs. As the time interval between executions can be long (to indefinite),
33+
/// the job instance should be removed from the registry to prevent memory leaks.
34+
/// </remarks>
2835
public IJob GetAndDrainJobInstance(IServiceProvider serviceProvider, JobDefinition jobDefinition)
2936
{
3037
var element = map[jobDefinition.JobFullName](serviceProvider);

src/NCronJob/Registry/IInstantJobRegistry.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ void RunInstantJob<TJob>(object? parameter = null, CancellationToken token = def
2929
/// Runs an instant job, which gets directly executed.
3030
/// </summary>
3131
/// <remarks>
32-
/// The <paramref name="jobAction"/> delegate supports, like <see cref="ServiceCollectionExtensions.AddNCronJob"/>, that services can be retrieved dynamically.
32+
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="ServiceCollectionExtensions.AddNCronJob"/>, that services can be retrieved dynamically.
3333
/// Also the <see cref="CancellationToken"/> can be retrieved in this way.
3434
/// </remarks>
35-
/// <param name="jobAction">The delegate to execute.</param>
36-
void RunInstantJob(Delegate jobAction);
35+
/// <param name="jobDelegate">The delegate to execute.</param>
36+
void RunInstantJob(Delegate jobDelegate);
3737

3838
/// <summary>
3939
/// Runs a job that will be executed after the given <paramref name="delay"/>.
@@ -47,13 +47,13 @@ void RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, Cancellatio
4747
/// <summary>
4848
/// Runs a job that will be executed after the given <paramref name="delay"/>.
4949
/// </summary>
50-
/// <param name="jobAction">The delegate to execute.</param>
50+
/// <param name="jobDelegate">The delegate to execute.</param>
5151
/// <param name="delay">The delay until the job will be executed.</param>
5252
/// <remarks>
53-
/// The <paramref name="jobAction"/> delegate supports, like <see cref="ServiceCollectionExtensions.AddNCronJob"/>, that services can be retrieved dynamically.
53+
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="ServiceCollectionExtensions.AddNCronJob"/>, that services can be retrieved dynamically.
5454
/// Also the <see cref="CancellationToken"/> can be retrieved in this way.
5555
/// </remarks>
56-
void RunScheduledJob(Delegate jobAction, TimeSpan delay);
56+
void RunScheduledJob(Delegate jobDelegate, TimeSpan delay);
5757

5858
/// <summary>
5959
/// Runs a job that will be executed at <paramref name="startDate"/>.
@@ -67,13 +67,13 @@ void RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, C
6767
/// <summary>
6868
/// Runs a job that will be executed at the given <paramref name="startDate"/>.
6969
/// </summary>
70-
/// <param name="jobAction">The delegate to execute.</param>
70+
/// <param name="jobDelegate">The delegate to execute.</param>
7171
/// <param name="startDate">The starting point when the job will be executed.</param>
7272
/// <remarks>
73-
/// The <paramref name="jobAction"/> delegate supports, like <see cref="ServiceCollectionExtensions.AddNCronJob"/>, that services can be retrieved dynamically.
73+
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="ServiceCollectionExtensions.AddNCronJob"/>, that services can be retrieved dynamically.
7474
/// Also the <see cref="CancellationToken"/> can be retrieved in this way.
7575
/// </remarks>
76-
void RunScheduledJob(Delegate jobAction, DateTimeOffset startDate);
76+
void RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate);
7777
}
7878

7979
internal sealed partial class InstantJobRegistry : IInstantJobRegistry
@@ -102,7 +102,7 @@ public InstantJobRegistry(
102102
public void RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
103103
where TJob : IJob => RunScheduledJob<TJob>(TimeSpan.Zero, parameter, token);
104104

105-
public void RunInstantJob(Delegate jobAction) => RunScheduledJob(jobAction, TimeSpan.Zero);
105+
public void RunInstantJob(Delegate jobDelegate) => RunScheduledJob(jobDelegate, TimeSpan.Zero);
106106

107107
/// <inheritdoc />
108108
public void RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
@@ -112,10 +112,10 @@ public void RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, Canc
112112
RunScheduledJob<TJob>(utcNow + delay, parameter, token);
113113
}
114114

115-
public void RunScheduledJob(Delegate jobAction, TimeSpan delay)
115+
public void RunScheduledJob(Delegate jobDelegate, TimeSpan delay)
116116
{
117117
var utcNow = timeProvider.GetUtcNow();
118-
RunScheduledJob(jobAction, utcNow + delay);
118+
RunScheduledJob(jobDelegate, utcNow + delay);
119119
}
120120

121121
/// <inheritdoc />
@@ -135,9 +135,9 @@ public void RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter =
135135
jobQueue.EnqueueForDirectExecution(run, startDate);
136136
}
137137

138-
public void RunScheduledJob(Delegate jobAction, DateTimeOffset startDate)
138+
public void RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate)
139139
{
140-
var definition = dynamicJobFactoryRegistry.Add(jobAction);
140+
var definition = dynamicJobFactoryRegistry.Add(jobDelegate);
141141
var run = JobRun.Create(definition);
142142
run.Priority = JobPriority.High;
143143

tests/NCronJob.Tests/RunDependentJobTests.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,24 @@ public async Task SkipChildrenShouldPreventDependentJobsFromRunning()
8383
storage.Guids.Count.ShouldBe(1);
8484
}
8585

86+
[Fact]
87+
public async Task WhenJobWasSuccessful_DependentAnonymousJobShouldRun()
88+
{
89+
var fakeTimer = new FakeTimeProvider();
90+
ServiceCollection.AddSingleton<TimeProvider>(fakeTimer);
91+
Func<ChannelWriter<object>, JobExecutionContext, Task> execution = async (writer, context) => await writer.WriteAsync($"Parent: {context.ParentOutput}");
92+
ServiceCollection.AddNCronJob(n => n.AddJob<PrincipalJob>()
93+
.ExecuteWhen(success: s => s.RunJob(execution)));
94+
var provider = CreateServiceProvider();
95+
await provider.GetRequiredService<IHostedService>().StartAsync(CancellationToken);
96+
97+
provider.GetRequiredService<IInstantJobRegistry>().RunInstantJob<PrincipalJob>(true);
98+
99+
using var tcs = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
100+
var result = await CommunicationChannel.Reader.ReadAsync(tcs.Token) as string;
101+
result.ShouldBe("Parent: Success");
102+
}
103+
86104
private sealed class PrincipalJob : IJob
87105
{
88106
public Task RunAsync(JobExecutionContext context, CancellationToken token)

0 commit comments

Comments
 (0)