Skip to content

Commit 89d9d56

Browse files
authored
Fixed Jobs SDK bugs (#1456)
fix: Point-in-time not getting scheduled, job payload not being property set on job invocation When setting a single point-in-time job, the SDK was incorrectly assigning it as a schedule which would promptly fail cron validation. Rather, this now properly sets it to `dueTime` instead. Further, when a Job is invoked, only the payload it was registered with is provided in the callback, not all the elements of a Get Job response, so this was modified to return the `ReadOnlyMemory<byte>` originally provided in the payload back to the caller. Reviewed by: @philliphoff Refs: #1455 #1457
1 parent ab3ef30 commit 89d9d56

File tree

7 files changed

+61
-173
lines changed

7 files changed

+61
-173
lines changed

daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobs-howto.md

+6-5
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ the dependency injection registration in `Program.cs`, add the following line:
6363
```cs
6464
var builder = WebApplication.CreateBuilder(args);
6565

66-
//Add anywhere between these two
67-
builder.Services.AddDaprJobsClient(); //That's it
66+
//Add anywhere between these two lines
67+
builder.Services.AddDaprJobsClient();
6868

6969
var app = builder.Build();
7070
```
@@ -203,7 +203,8 @@ public class MySampleClass
203203
It's easy to set up a jobs endpoint if you're at all familiar with [minimal APIs in ASP.NET Core](https://learn.microsoft.com/en-us/aspnet/core/fundamentals/minimal-apis/overview) as the syntax is the same between the two.
204204

205205
Once dependency injection registration has been completed, configure the application the same way you would to handle mapping an HTTP request via the minimal API functionality in ASP.NET Core. Implemented as an extension method,
206-
pass the name of the job it should be responsive to and a delegate. Services can be injected into the delegate's arguments as you wish and you can optionally pass a `JobDetails` to get information about the job that has been triggered (e.g. access its scheduling setup or payload).
206+
pass the name of the job it should be responsive to and a delegate. Services can be injected into the delegate's arguments as you wish and the job payload can be accessed from the `ReadOnlyMemory<byte>` originally provided to the
207+
job registration.
207208

208209
There are two delegates you can use here. One provides an `IServiceProvider` in case you need to inject other services into the handler:
209210

@@ -216,7 +217,7 @@ builder.Services.AddDaprJobsClient();
216217
var app = builder.Build();
217218

218219
//Add our endpoint registration
219-
app.MapDaprScheduledJob("myJob", (IServiceProvider serviceProvider, string? jobName, JobDetails? jobDetails) => {
220+
app.MapDaprScheduledJob("myJob", (IServiceProvider serviceProvider, string jobName, ReadOnlyMemory<byte> jobPayload) => {
220221
var logger = serviceProvider.GetService<ILogger>();
221222
logger?.LogInformation("Received trigger invocation for '{jobName}'", "myJob");
222223

@@ -237,7 +238,7 @@ builder.Services.AddDaprJobsClient();
237238
var app = builder.Build();
238239

239240
//Add our endpoint registration
240-
app.MapDaprScheduledJob("myJob", (string? jobName, JobDetails? jobDetails) => {
241+
app.MapDaprScheduledJob("myJob", (string jobName, ReadOnlyMemory<byte> jobPayload) => {
241242
//Do something...
242243
});
243244

daprdocs/content/en/dotnet-sdk-docs/dotnet-jobs/dotnet-jobsclient-usage.md

+6-19
Original file line numberDiff line numberDiff line change
@@ -165,31 +165,18 @@ var oneWeekFromNow = now.AddDays(7);
165165
await daprJobsClient.ScheduleOneTimeJobWithPayloadAsync("myOtherJob", oneWeekFromNow, "This is a test!");
166166
```
167167

168-
The `JobDetails` type returns the data as a `ReadOnlyMemory<byte>?` so the developer has the freedom to deserialize
168+
The delegate handling the job invocation expects at least two arguments to be present:
169+
- A `string` that is populated with the `jobName`, providing the name of the invoked job
170+
- A `ReadOnlyMemory<byte>` that is populated with the bytes originally provided during the job registration.
171+
172+
Because the payload is stored as a `ReadOnlyMemory<byte>`, the developer has the freedom to serialize and deserialize
169173
as they wish, but there are again two helper extensions included that can deserialize this to either a JSON-compatible
170174
type or a string. Both methods assume that the developer encoded the originally scheduled job (perhaps using the
171175
helper serialization methods) as these methods will not force the bytes to represent something they're not.
172176

173177
To deserialize the bytes to a string, the following helper method can be used:
174178
```cs
175-
if (jobDetails.Payload is not null)
176-
{
177-
string payloadAsString = jobDetails.Payload.DeserializeToString(); //If successful, returns a string value with the value
178-
}
179-
```
180-
181-
To deserialize JSON-encoded UTF-8 bytes to the corresponding type, the following helper method can be used. An
182-
overload argument is available that permits the developer to pass in their own `JsonSerializerOptions` to be applied
183-
during deserialization.
184-
185-
```cs
186-
public sealed record Doodad (string Name, int Value);
187-
188-
//...
189-
if (jobDetails.Payload is not null)
190-
{
191-
var deserializedDoodad = jobDetails.Payload.DeserializeFromJsonBytes<Doodad>();
192-
}
179+
var payloadAsString = Encoding.UTF8.GetString(jobPayload.Span); //If successful, returns a string with the value
193180
```
194181

195182
## Error handling

examples/Jobs/JobsSample/Program.cs

+13-20
Original file line numberDiff line numberDiff line change
@@ -16,43 +16,36 @@
1616
using Dapr.Jobs;
1717
using Dapr.Jobs.Extensions;
1818
using Dapr.Jobs.Models;
19-
using Dapr.Jobs.Models.Responses;
2019

2120
var builder = WebApplication.CreateBuilder(args);
22-
23-
builder.Services.AddDaprJobsClient();
21+
builder.Logging.ClearProviders();
22+
builder.Logging.AddConsole();
2423

2524
var app = builder.Build();
2625

2726
//Set a handler to deal with incoming jobs
28-
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
29-
app.MapDaprScheduledJobHandler((string? jobName, DaprJobDetails? jobDetails, ILogger? logger, CancellationToken cancellationToken) =>
27+
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(15));
28+
app.MapDaprScheduledJobHandler(async (string jobName, ReadOnlyMemory<byte> jobPayload, ILogger? logger, CancellationToken cancellationToken) =>
3029
{
3130
logger?.LogInformation("Received trigger invocation for job '{jobName}'", jobName);
32-
if (jobDetails?.Payload is not null)
33-
{
34-
var deserializedPayload = Encoding.UTF8.GetString(jobDetails.Payload);
35-
logger?.LogInformation("Received invocation for the job '{jobName}' with payload '{deserializedPayload}'",
36-
jobName, deserializedPayload);
37-
//Do something that needs the cancellation token
38-
}
39-
else
40-
{
41-
logger?.LogWarning("Failed to deserialize payload for job '{jobName}'", jobName);
42-
}
31+
32+
var deserializedPayload = Encoding.UTF8.GetString(jobPayload.Span);
33+
logger?.LogInformation("Received invocation for the job '{jobName}' with payload '{deserializedPayload}'",
34+
jobName, deserializedPayload);
35+
await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken);
36+
4337
return Task.CompletedTask;
4438
}, cancellationTokenSource.Token);
4539

46-
app.Run();
47-
48-
await using var scope = app.Services.CreateAsyncScope();
49-
var logger = scope.ServiceProvider.GetRequiredService<ILogger>();
40+
using var scope = app.Services.CreateScope();
41+
var logger = scope.ServiceProvider.GetRequiredService<ILogger<Program>>();
5042
var daprJobsClient = scope.ServiceProvider.GetRequiredService<DaprJobsClient>();
5143

5244
logger.LogInformation("Scheduling one-time job 'myJob' to execute 10 seconds from now");
5345
await daprJobsClient.ScheduleJobAsync("myJob", DaprJobSchedule.FromDateTime(DateTime.UtcNow.AddSeconds(10)),
5446
Encoding.UTF8.GetBytes("This is a test"));
5547
logger.LogInformation("Scheduled one-time job 'myJob'");
5648

49+
app.Run();
5750

5851
#pragma warning restore CS0618 // Type or member is obsolete

src/Dapr.Jobs/DaprJobsGrpcClient.cs

+12-3
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,22 @@ public override async Task ScheduleJobAsync(string jobName, DaprJobSchedule sche
7777
ArgumentNullException.ThrowIfNull(jobName, nameof(jobName));
7878
ArgumentNullException.ThrowIfNull(schedule, nameof(schedule));
7979

80-
var job = new Autogenerated.Job { Name = jobName, Schedule = schedule.ExpressionValue };
80+
var job = new Autogenerated.Job { Name = jobName };
8181

82-
if (startingFrom is not null)
82+
//Set up the schedule (recurring or point in time)
83+
if (schedule.IsPointInTimeExpression)
84+
{
85+
job.DueTime = schedule.ExpressionValue;
86+
}
87+
else if (schedule.IsCronExpression || schedule.IsDurationExpression || schedule.IsPrefixedPeriodExpression)
88+
{
89+
job.Schedule = schedule.ExpressionValue;
90+
}
91+
else if (startingFrom is not null)
8392
{
8493
job.DueTime = ((DateTimeOffset)startingFrom).ToString("O");
8594
}
86-
95+
8796
if (repeats is not null)
8897
{
8998
if (repeats < 0)

src/Dapr.Jobs/Extensions/EndpointRouteBuilderExtensions.cs

+14-20
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
// limitations under the License.
1212
// ------------------------------------------------------------------------
1313

14-
using System.Text.Json;
15-
using Dapr.Jobs.Models.Responses;
1614
using Microsoft.AspNetCore.Builder;
1715
using Microsoft.AspNetCore.Routing;
1816

@@ -29,8 +27,8 @@ public static class EndpointRouteBuilderExtensions
2927
/// </summary>
3028
/// <param name="endpoints">The <see cref="IEndpointRouteBuilder"/> to add the route to.</param>
3129
/// <param name="action">The asynchronous action provided by the developer that handles any inbound requests. The first two
32-
/// parameters must be a nullable <see cref="string"/> for the jobName and a nullable <see cref="DaprJobDetails"/> with the
33-
/// payload details, but otherwise can be populated with additional services to be injected into the delegate.</param>
30+
/// parameters must be a <see cref="string"/> for the jobName and the originally registered ReadOnlyMemory&lt;byte&gt; with the
31+
/// payload value, but otherwise can be populated with additional services to be injected into the delegate.</param>
3432
/// <param name="cancellationToken">Cancellation token that will be passed in as the last parameter to the delegate action.</param>
3533
public static IEndpointRouteBuilder MapDaprScheduledJobHandler(this IEndpointRouteBuilder endpoints,
3634
Delegate action, CancellationToken cancellationToken = default)
@@ -40,29 +38,25 @@ public static IEndpointRouteBuilder MapDaprScheduledJobHandler(this IEndpointRou
4038

4139
endpoints.MapPost("/job/{jobName}", async context =>
4240
{
43-
var jobName = (string?)context.Request.RouteValues["jobName"];
44-
DaprJobDetails? jobPayload = null;
41+
//Retrieve the name of the job from the request path
42+
var jobName = string.Empty;
43+
if (context.Request.RouteValues.TryGetValue("jobName", out var capturedJobName))
44+
{
45+
jobName = (string)capturedJobName!;
46+
}
4547

48+
//Retrieve the job payload from the request body
49+
ReadOnlyMemory<byte> payload = new();
4650
if (context.Request.ContentLength is > 0)
4751
{
48-
using var reader = new StreamReader(context.Request.Body);
49-
var body = await reader.ReadToEndAsync();
50-
51-
try
52-
{
53-
var deserializedJobPayload = JsonSerializer.Deserialize<DeserializableDaprJobDetails>(body);
54-
jobPayload = deserializedJobPayload?.ToType() ?? null;
55-
}
56-
catch (JsonException)
57-
{
58-
jobPayload = null;
59-
}
52+
using var streamContent = new StreamContent(context.Request.Body);
53+
payload = await streamContent.ReadAsByteArrayAsync(cancellationToken);
6054
}
6155

62-
var parameters = new Dictionary<Type, object?>
56+
var parameters = new Dictionary<Type, object>
6357
{
6458
{ typeof(string), jobName },
65-
{ typeof(DaprJobDetails), jobPayload },
59+
{ typeof(ReadOnlyMemory<byte>), payload },
6660
{ typeof(CancellationToken), CancellationToken.None }
6761
};
6862

src/Dapr.Jobs/Models/Responses/DaprJobDetails.cs

-52
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@
1111
// limitations under the License.
1212
// ------------------------------------------------------------------------
1313

14-
using System.Text.Json.Serialization;
15-
using Dapr.Jobs.JsonConverters;
16-
1714
namespace Dapr.Jobs.Models.Responses;
1815

1916
/// <summary>
@@ -46,52 +43,3 @@ public sealed record DaprJobDetails(DaprJobSchedule Schedule)
4643
/// </summary>
4744
public byte[]? Payload { get; init; } = null;
4845
}
49-
50-
/// <summary>
51-
/// A deserializable version of the <see cref="DaprJobDetails"/>.
52-
/// </summary>
53-
internal sealed record DeserializableDaprJobDetails
54-
{
55-
/// <summary>
56-
/// Represents the schedule that triggers the job.
57-
/// </summary>
58-
public string? Schedule { get; init; }
59-
60-
/// <summary>
61-
/// Allows for jobs with fixed repeat counts.
62-
/// </summary>
63-
public int? RepeatCount { get; init; } = null;
64-
65-
/// <summary>
66-
/// Identifies a point-in-time representing when the job schedule should start from,
67-
/// or as a "one-shot" time if other scheduling fields are not provided.
68-
/// </summary>
69-
[JsonConverter(typeof(Iso8601DateTimeJsonConverter))]
70-
public DateTimeOffset? DueTime { get; init; } = null;
71-
72-
/// <summary>
73-
/// A point-in-time value representing with the job should expire.
74-
/// </summary>
75-
/// <remarks>
76-
/// This must be greater than <see cref="DueTime"/> if both are set.
77-
/// </remarks>
78-
[JsonConverter(typeof(Iso8601DateTimeJsonConverter))]
79-
public DateTimeOffset? Ttl { get; init; } = null;
80-
81-
/// <summary>
82-
/// Stores the main payload of the job which is passed to the trigger function.
83-
/// </summary>
84-
public byte[]? Payload { get; init; } = null;
85-
86-
public DaprJobDetails ToType()
87-
{
88-
var schedule = DaprJobSchedule.FromExpression(Schedule ?? string.Empty);
89-
return new DaprJobDetails(schedule)
90-
{
91-
DueTime = DueTime,
92-
Payload = Payload,
93-
RepeatCount = RepeatCount,
94-
Ttl = Ttl
95-
};
96-
}
97-
}

test/Dapr.Jobs.Test/Extensions/EndpointRouteBuilderExtensionsTests.cs

+10-54
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,7 @@ public async Task MapDaprScheduledJobHandler_ValidRequest_ExecutesAction()
4040
var client = server.CreateClient();
4141

4242
var serializedPayload = JsonSerializer.Serialize(new SamplePayload("Dapr", 789));
43-
var serializedPayloadBytes = Encoding.UTF8.GetBytes(serializedPayload);
44-
var jobDetails = new DaprJobDetails(new DaprJobSchedule("0 0 * * *"))
45-
{
46-
RepeatCount = 5,
47-
DueTime = DateTimeOffset.UtcNow,
48-
Ttl = DateTimeOffset.UtcNow.AddHours(1),
49-
Payload = serializedPayloadBytes
50-
};
51-
var content = new StringContent(JsonSerializer.Serialize(jobDetails), Encoding.UTF8, "application/json");
43+
var content = new StringContent(serializedPayload, Encoding.UTF8, "application/json");
5244

5345
const string jobName = "testJob";
5446
var response = await client.PostAsync($"/job/{jobName}", content);
@@ -68,15 +60,7 @@ public async Task MapDaprScheduleJobHandler_HandleMissingCancellationToken()
6860
var client = server.CreateClient();
6961

7062
var serializedPayload = JsonSerializer.Serialize(new SamplePayload("Dapr", 789));
71-
var serializedPayloadBytes = Encoding.UTF8.GetBytes(serializedPayload);
72-
var jobDetails = new DaprJobDetails(new DaprJobSchedule("0 0 * * *"))
73-
{
74-
RepeatCount = 5,
75-
DueTime = DateTimeOffset.UtcNow,
76-
Ttl = DateTimeOffset.UtcNow.AddHours(1),
77-
Payload = serializedPayloadBytes
78-
};
79-
var content = new StringContent(JsonSerializer.Serialize(jobDetails), Encoding.UTF8, "application/json");
63+
var content = new StringContent(serializedPayload, Encoding.UTF8, "application/json");
8064

8165
const string jobName = "testJob";
8266
var response = await client.PostAsync($"/job/{jobName}", content);
@@ -89,31 +73,11 @@ public async Task MapDaprScheduleJobHandler_HandleMissingCancellationToken()
8973
Assert.Equal(serializedPayload, validator.SerializedPayload);
9074
}
9175

92-
93-
[Fact]
94-
public async Task MapDaprScheduledJobHandler_InvalidPayload()
95-
{
96-
// Arrange
97-
var server = CreateTestServer();
98-
var client = server.CreateClient();
99-
100-
var content = new StringContent("", Encoding.UTF8, "application/json");
101-
102-
// Act
103-
const string jobName = "testJob";
104-
var response = await client.PostAsync($"/job/{jobName}", content);
105-
106-
var validator = server.Services.GetRequiredService<Validator>();
107-
Assert.Equal(jobName, validator.JobName);
108-
Assert.Null(validator.SerializedPayload);
109-
}
110-
11176
private sealed record SamplePayload(string Name, int Count);
11277

11378
public sealed class Validator
11479
{
11580
public string? JobName { get; set; }
116-
11781
public string? SerializedPayload { get; set; }
11882
}
11983

@@ -130,15 +94,10 @@ private static TestServer CreateTestServer()
13094
app.UseRouting();
13195
app.UseEndpoints(endpoints =>
13296
{
133-
endpoints.MapDaprScheduledJobHandler(async (string? jobName, DaprJobDetails? jobDetails, Validator validator, CancellationToken cancellationToken) =>
97+
endpoints.MapDaprScheduledJobHandler(async (string jobName, ReadOnlyMemory<byte> jobPayload, Validator validator, CancellationToken cancellationToken) =>
13498
{
135-
if (jobName is not null)
136-
validator.JobName = jobName;
137-
if (jobDetails?.Payload is not null)
138-
{
139-
var payloadString = Encoding.UTF8.GetString(jobDetails.Payload);
140-
validator.SerializedPayload = payloadString;
141-
}
99+
validator.JobName = jobName;
100+
validator.SerializedPayload = Encoding.UTF8.GetString(jobPayload.Span);
142101
await Task.CompletedTask;
143102
});
144103
});
@@ -160,15 +119,12 @@ private static TestServer CreateTestServer2()
160119
app.UseRouting();
161120
app.UseEndpoints(endpoints =>
162121
{
163-
endpoints.MapDaprScheduledJobHandler(async (string? jobName, Validator validator, DaprJobDetails? jobDetails) =>
122+
endpoints.MapDaprScheduledJobHandler(async (string jobName, Validator validator, ReadOnlyMemory<byte> payload) =>
164123
{
165-
if (jobName is not null)
166-
validator.JobName = jobName;
167-
if (jobDetails?.Payload is not null)
168-
{
169-
var payloadString = Encoding.UTF8.GetString(jobDetails.Payload);
170-
validator.SerializedPayload = payloadString;
171-
}
124+
validator.JobName = jobName;
125+
126+
var payloadString = Encoding.UTF8.GetString(payload.Span);
127+
validator.SerializedPayload = payloadString;
172128
await Task.CompletedTask;
173129
});
174130
});

0 commit comments

Comments
 (0)