1
+ using System . Collections . Specialized ;
1
2
using System . Threading . Channels ;
2
3
using Microsoft . Extensions . DependencyInjection ;
3
4
using Microsoft . Extensions . Hosting ;
@@ -14,7 +15,7 @@ public async Task JobShouldRetryOnFailure()
14
15
{
15
16
var fakeTimer = new FakeTimeProvider ( ) ;
16
17
ServiceCollection . AddSingleton < TimeProvider > ( fakeTimer ) ;
17
- ServiceCollection . AddSingleton < MaxFailuresWrapper > ( new MaxFailuresWrapper ( 3 ) ) ;
18
+ ServiceCollection . AddSingleton < MaxFailuresWrapper > ( new MaxFailuresWrapper ( 2 ) ) ;
18
19
ServiceCollection . AddNCronJob ( n => n . AddJob < FailingJob > ( p => p . WithCronExpression ( "* * * * *" ) ) ) ;
19
20
var provider = CreateServiceProvider ( ) ;
20
21
@@ -23,9 +24,9 @@ public async Task JobShouldRetryOnFailure()
23
24
fakeTimer . Advance ( TimeSpan . FromMinutes ( 1 ) ) ;
24
25
25
26
// Validate that the job was retried the correct number of times
26
- // Fail 3 times = 3 retries + 1 success
27
+ // Total = 2 retries + 1 success
27
28
var attempts = await CommunicationChannel . Reader . ReadAsync ( CancellationToken ) ;
28
- attempts . ShouldBe ( 4 ) ;
29
+ attempts . ShouldBe ( 3 ) ;
29
30
}
30
31
31
32
[ Fact ]
@@ -101,18 +102,44 @@ public async Task CancelledJobIsStillAValidExecution()
101
102
ServiceCollection . AddNCronJob ( n => n . AddJob < CancelRetryingJob2 > ( p => p . WithCronExpression ( "* * * * *" ) ) ) ;
102
103
var provider = CreateServiceProvider ( ) ;
103
104
var jobExecutor = provider . GetRequiredService < JobExecutor > ( ) ;
104
- var cronRegistryEntries = provider . GetServices < JobDefinition > ( ) ;
105
- var cancelRetryingJobEntry = cronRegistryEntries . First ( entry => entry . Type == typeof ( CancelRetryingJob2 ) ) ;
105
+ var jobQueueManager = provider . GetRequiredService < JobQueueManager > ( ) ;
106
+ var jobQueue = jobQueueManager . GetOrAddQueue ( typeof ( CancelRetryingJob2 ) . FullName ! ) ;
107
+
108
+ JobRun ? nextJob = null ;
109
+ var tcs = new TaskCompletionSource < JobStateType > ( ) ;
110
+
111
+ jobQueue . CollectionChanged += ( sender , args ) =>
112
+ {
113
+ if ( args . Action == NotifyCollectionChangedAction . Add && nextJob == null )
114
+ {
115
+ nextJob = args . NewItems ? . OfType < JobRun > ( ) . FirstOrDefault ( ) ;
116
+ nextJob ! . CurrentState . Type . ShouldBe ( JobStateType . NotStarted ) ;
117
+ nextJob ! . JobExecutionCount . ShouldBe ( 0 ) ;
118
+ nextJob ! . OnStateChanged += ( s , e ) =>
119
+ {
120
+ if ( e == JobStateType . Cancelled )
121
+ {
122
+ tcs . SetResult ( e ) ;
123
+ }
124
+ } ;
125
+ }
126
+ } ;
127
+
106
128
await provider . GetRequiredService < IHostedService > ( ) . StartAsync ( CancellationToken ) ;
107
129
fakeTimer . Advance ( TimeSpan . FromMinutes ( 1 ) ) ;
108
-
130
+
131
+ while ( nextJob ! . CurrentState != JobStateType . Retrying )
132
+ {
133
+ await Task . Delay ( 1 ) ;
134
+ }
135
+ // wait until we're in retrying state before cancelling
109
136
jobExecutor . CancelJobs ( ) ;
110
137
111
- var cancellationHandled = await Task . WhenAny ( CancellationSignaled , Task . Delay ( 100 ) ) ;
112
- cancellationHandled . ShouldBe ( CancellationSignaled ) ;
113
-
114
- var jobRun = provider . GetRequiredService < IJobHistory > ( ) . GetAll ( ) . Single ( s => s . JobDefinition == cancelRetryingJobEntry ) ;
115
- jobRun . JobExecutionCount . ShouldBe ( 1 ) ;
138
+ var cancellationHandled = await Task . WhenAny ( tcs . Task , Task . Delay ( 1000 ) ) ;
139
+ cancellationHandled . ShouldBe ( tcs . Task ) ;
140
+ await Task . Delay ( 10 ) ;
141
+ nextJob ! . CurrentState . Type . ShouldBe ( JobStateType . Cancelled ) ;
142
+ nextJob ! . JobExecutionCount . ShouldBe ( 1 ) ;
116
143
}
117
144
118
145
[ Fact ]
@@ -143,7 +170,7 @@ public async Task JobShouldHonorApplicationCancellationDuringRetry()
143
170
144
171
private sealed record MaxFailuresWrapper ( int MaxFailuresBeforeSuccess = 3 ) ;
145
172
146
- [ RetryPolicy ( retryCount : 4 , PolicyType . FixedInterval ) ]
173
+ [ RetryPolicy ( retryCount : 3 , PolicyType . FixedInterval ) ]
147
174
private sealed class FailingJob ( ChannelWriter < object > writer , MaxFailuresWrapper maxFailuresWrapper )
148
175
: IJob
149
176
{
@@ -229,15 +256,13 @@ public async Task RunAsync(JobExecutionContext context, CancellationToken token)
229
256
}
230
257
}
231
258
232
- [ RetryPolicy ( retryCount : 4 , PolicyType . FixedInterval ) ]
259
+ [ RetryPolicy ( retryCount : 2 , PolicyType . FixedInterval ) ]
233
260
private sealed class CancelRetryingJob2 : IJob
234
261
{
235
262
public Task RunAsync ( JobExecutionContext context , CancellationToken token )
236
263
{
237
264
token . ThrowIfCancellationRequested ( ) ;
238
- return ! token . IsCancellationRequested
239
- ? throw new InvalidOperationException ( "Job Failed" )
240
- : Task . CompletedTask ;
265
+ throw new InvalidOperationException ( "Job Failed" ) ;
241
266
}
242
267
}
243
268
0 commit comments