Skip to content

Commit 989e391

Browse files
authored
fix: cancel tasks from tasks service mongodb request was not valid (#439)
2 parents 014edee + 2c3d2a2 commit 989e391

14 files changed

+417
-142
lines changed

Adaptors/Memory/src/TaskTable.cs

+10-8
Original file line numberDiff line numberDiff line change
@@ -240,12 +240,13 @@ public IAsyncEnumerable<string> ListTasksAsync(TaskFilter filter,
240240
}
241241

242242
/// <inheritdoc />
243-
public Task<(IEnumerable<TaskData> tasks, long totalCount)> ListTasksAsync(Expression<Func<TaskData, bool>> filter,
244-
Expression<Func<TaskData, object?>> orderField,
245-
bool ascOrder,
246-
int page,
247-
int pageSize,
248-
CancellationToken cancellationToken = default)
243+
public Task<(IEnumerable<T> tasks, long totalCount)> ListTasksAsync<T>(Expression<Func<TaskData, bool>> filter,
244+
Expression<Func<TaskData, object?>> orderField,
245+
Expression<Func<TaskData, T>> selector,
246+
bool ascOrder,
247+
int page,
248+
int pageSize,
249+
CancellationToken cancellationToken = default)
249250
{
250251
var queryable = taskId2TaskData_.AsQueryable()
251252
.Select(pair => pair.Value)
@@ -255,8 +256,9 @@ public IAsyncEnumerable<string> ListTasksAsync(TaskFilter filter,
255256
? queryable.OrderBy(orderField)
256257
: queryable.OrderByDescending(orderField);
257258

258-
return Task.FromResult<(IEnumerable<TaskData> tasks, long totalCount)>((ordered.Skip(page * pageSize)
259-
.Take(pageSize), ordered.Count()));
259+
return Task.FromResult<(IEnumerable<T> tasks, long totalCount)>((ordered.Skip(page * pageSize)
260+
.Take(pageSize)
261+
.Select(selector), ordered.Count()));
260262
}
261263

262264
/// <inheritdoc />

Adaptors/MongoDB/src/TaskTable.cs

+8-6
Original file line numberDiff line numberDiff line change
@@ -296,12 +296,13 @@ public async IAsyncEnumerable<string> ListTasksAsync(TaskFilter
296296
}
297297

298298
/// <inheritdoc />
299-
public async Task<(IEnumerable<TaskData> tasks, long totalCount)> ListTasksAsync(Expression<Func<TaskData, bool>> filter,
300-
Expression<Func<TaskData, object?>> orderField,
301-
bool ascOrder,
302-
int page,
303-
int pageSize,
304-
CancellationToken cancellationToken = default)
299+
public async Task<(IEnumerable<T> tasks, long totalCount)> ListTasksAsync<T>(Expression<Func<TaskData, bool>> filter,
300+
Expression<Func<TaskData, object?>> orderField,
301+
Expression<Func<TaskData, T>> selector,
302+
bool ascOrder,
303+
int page,
304+
int pageSize,
305+
CancellationToken cancellationToken = default)
305306
{
306307
using var activity = activitySource_.StartActivity($"{nameof(ListTasksAsync)}");
307308
var sessionHandle = sessionProvider_.Get();
@@ -319,6 +320,7 @@ public async IAsyncEnumerable<string> ListTasksAsync(TaskFilter
319320

320321
var taskList = ordered.Skip(page * pageSize)
321322
.Limit(pageSize)
323+
.Project(selector)
322324
.ToListAsync(cancellationToken);
323325

324326
var taskCount = findFluent2.CountDocumentsAsync(cancellationToken);

Common/src/Storage/ITaskTable.cs

+8-6
Original file line numberDiff line numberDiff line change
@@ -176,19 +176,21 @@ IAsyncEnumerable<string> ListTasksAsync(TaskFilter filter,
176176
/// </summary>
177177
/// <param name="filter">Filter to select tasks</param>
178178
/// <param name="orderField">Select the field that will be used to order the tasks</param>
179+
/// <param name="selector">Expression to select part of the returned task data</param>
179180
/// <param name="ascOrder">Is the order ascending</param>
180181
/// <param name="page">The page of results to retrieve</param>
181182
/// <param name="pageSize">The number of results pages</param>
182183
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
183184
/// <returns>
184185
/// Collection of task metadata matching the request and total number of results without paging
185186
/// </returns>
186-
Task<(IEnumerable<TaskData> tasks, long totalCount)> ListTasksAsync(Expression<Func<TaskData, bool>> filter,
187-
Expression<Func<TaskData, object?>> orderField,
188-
bool ascOrder,
189-
int page,
190-
int pageSize,
191-
CancellationToken cancellationToken = default);
187+
Task<(IEnumerable<T> tasks, long totalCount)> ListTasksAsync<T>(Expression<Func<TaskData, bool>> filter,
188+
Expression<Func<TaskData, object?>> orderField,
189+
Expression<Func<TaskData, T>> selector,
190+
bool ascOrder,
191+
int page,
192+
int pageSize,
193+
CancellationToken cancellationToken = default);
192194

193195
/// <summary>
194196
/// Find all tasks matching the given filter and ordering

Common/src/Storage/TaskData.cs

-51
Original file line numberDiff line numberDiff line change
@@ -254,57 +254,6 @@ public static implicit operator TaskRaw(TaskData taskData)
254254
InitialTaskId = taskData.InitialTaskId,
255255
};
256256

257-
/// <summary>
258-
/// Conversion operator from <see cref="TaskData" /> to <see cref="TaskSummary" />
259-
/// </summary>
260-
/// <param name="taskData">The input task data</param>
261-
/// <returns>
262-
/// The converted task data
263-
/// </returns>
264-
public static implicit operator TaskSummary(TaskData taskData)
265-
=> new()
266-
{
267-
SessionId = taskData.SessionId,
268-
Status = taskData.Status,
269-
OwnerPodId = taskData.OwnerPodId,
270-
Options = taskData.Options.ToGrpcTaskOptions(),
271-
CreatedAt = FromDateTime(taskData.CreationDate),
272-
EndedAt = taskData.EndDate is not null
273-
? FromDateTime(taskData.EndDate.Value)
274-
: null,
275-
Id = taskData.TaskId,
276-
PodTtl = taskData.PodTtl is not null
277-
? FromDateTime(taskData.PodTtl.Value)
278-
: null,
279-
StartedAt = taskData.StartDate is not null
280-
? FromDateTime(taskData.StartDate.Value)
281-
: null,
282-
Error = taskData.Status == TaskStatus.Error
283-
? taskData.Output.Error
284-
: "",
285-
StatusMessage = taskData.StatusMessage,
286-
SubmittedAt = taskData.SubmittedDate is not null
287-
? FromDateTime(taskData.SubmittedDate.Value)
288-
: null,
289-
AcquiredAt = taskData.AcquisitionDate is not null
290-
? FromDateTime(taskData.AcquisitionDate.Value)
291-
: null,
292-
ReceivedAt = taskData.ReceptionDate is not null
293-
? FromDateTime(taskData.ReceptionDate.Value)
294-
: null,
295-
PodHostname = taskData.OwnerPodName,
296-
CreationToEndDuration = taskData.CreationToEndDuration is not null
297-
? Duration.FromTimeSpan(taskData.CreationToEndDuration.Value)
298-
: null,
299-
ProcessingToEndDuration = taskData.ProcessingToEndDuration is not null
300-
? Duration.FromTimeSpan(taskData.ProcessingToEndDuration.Value)
301-
: null,
302-
InitialTaskId = taskData.InitialTaskId,
303-
CountDataDependencies = taskData.DataDependencies.Count,
304-
CountExpectedOutputIds = taskData.ExpectedOutputIds.Count,
305-
CountParentTaskIds = taskData.ParentTaskIds.Count,
306-
CountRetryOfIds = taskData.RetryOfIds.Count,
307-
};
308257

309258
/// <summary>
310259
/// Conversion operator from <see cref="TaskData" /> to <see cref="Application" />

Common/src/Storage/TaskDataSummary.cs

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// This file is part of the ArmoniK project
2+
//
3+
// Copyright (C) ANEO, 2021-2023. All rights reserved.
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU Affero General Public License as published
7+
// by the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY, without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU Affero General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU Affero General Public License
16+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
18+
using System;
19+
20+
using ArmoniK.Api.gRPC.V1;
21+
22+
using TaskOptions = ArmoniK.Core.Base.DataStructures.TaskOptions;
23+
24+
namespace ArmoniK.Core.Common.Storage;
25+
26+
/// <summary>
27+
/// Task metadata summary
28+
/// </summary>
29+
/// <param name="SessionId">Unique identifier of the session in which the task belongs</param>
30+
/// <param name="TaskId">Unique identifier of the task</param>
31+
/// <param name="OwnerPodId">Identifier of the polling agent running the task</param>
32+
/// <param name="ParentTaskIdsCount">
33+
/// Count of the tasks that submitted the current task up to the session id which
34+
/// represents a submission from the client
35+
/// </param>
36+
/// <param name="DataDependenciesCount">Count of identifiers of the results the task depends on</param>
37+
/// <param name="ExpectedOutputIdsCount">
38+
/// Count of the outputs the task should produce or should transmit the
39+
/// responsibility to produce
40+
/// </param>
41+
/// <param name="InitialTaskId">Task id before retry</param>
42+
/// <param name="RetryOfIdsCount">Count of the previous tasks ids before the current retry</param>
43+
/// <param name="Status">Current status of the task</param>
44+
/// <param name="StatusMessage">Message associated to the status</param>
45+
/// <param name="Options">Task options</param>
46+
/// <param name="CreationDate">Date when the task is created</param>
47+
/// <param name="SubmittedDate">Date when the task is submitted</param>
48+
/// <param name="StartDate">Date when the task execution begins</param>
49+
/// <param name="EndDate">Date when the task ends</param>
50+
/// <param name="ReceptionDate">Date when the task is received by the polling agent</param>
51+
/// <param name="AcquisitionDate">Date when the task is acquired by the pollster</param>
52+
/// <param name="ProcessingToEndDuration">Duration between the start of processing and the end of the task</param>
53+
/// <param name="CreationToEndDuration">Duration between the creation and the end of the task</param>
54+
/// <param name="PodTtl">Task Time To Live on the current pod</param>
55+
/// <param name="Output">Output of the task after its successful completion</param>
56+
public record TaskDataSummary(string SessionId,
57+
string TaskId,
58+
string OwnerPodId,
59+
string OwnerPodName,
60+
int ParentTaskIdsCount,
61+
int DataDependenciesCount,
62+
int ExpectedOutputIdsCount,
63+
string InitialTaskId,
64+
int RetryOfIdsCount,
65+
TaskStatus Status,
66+
string StatusMessage,
67+
TaskOptions Options,
68+
DateTime CreationDate,
69+
DateTime? SubmittedDate,
70+
DateTime? StartDate,
71+
DateTime? EndDate,
72+
DateTime? ReceptionDate,
73+
DateTime? AcquisitionDate,
74+
DateTime? PodTtl,
75+
TimeSpan? ProcessingToEndDuration,
76+
TimeSpan? CreationToEndDuration,
77+
Output Output);
+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// This file is part of the ArmoniK project
2+
//
3+
// Copyright (C) ANEO, 2021-2023. All rights reserved.
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU Affero General Public License as published
7+
// by the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY, without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU Affero General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU Affero General Public License
16+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
18+
using ArmoniK.Api.gRPC.V1;
19+
using ArmoniK.Api.gRPC.V1.Tasks;
20+
21+
using Google.Protobuf.WellKnownTypes;
22+
23+
using static Google.Protobuf.WellKnownTypes.Timestamp;
24+
25+
namespace ArmoniK.Core.Common.Storage;
26+
27+
public static class TaskDataSummaryExt
28+
{
29+
/// <summary>
30+
/// Conversion operator from <see cref="TaskDataSummary" /> to gRPC <see cref="TaskSummary" />
31+
/// </summary>
32+
/// <param name="taskDataSummary">The input task data</param>
33+
/// <returns>
34+
/// The converted task data
35+
/// </returns>
36+
public static TaskSummary ToTaskSummary(this TaskDataSummary taskDataSummary)
37+
=> new()
38+
{
39+
SessionId = taskDataSummary.SessionId,
40+
Status = taskDataSummary.Status,
41+
OwnerPodId = taskDataSummary.OwnerPodId,
42+
Options = taskDataSummary.Options.ToGrpcTaskOptions(),
43+
CreatedAt = FromDateTime(taskDataSummary.CreationDate),
44+
EndedAt = taskDataSummary.EndDate is not null
45+
? FromDateTime(taskDataSummary.EndDate.Value)
46+
: null,
47+
Id = taskDataSummary.TaskId,
48+
PodTtl = taskDataSummary.PodTtl is not null
49+
? FromDateTime(taskDataSummary.PodTtl.Value)
50+
: null,
51+
StartedAt = taskDataSummary.StartDate is not null
52+
? FromDateTime(taskDataSummary.StartDate.Value)
53+
: null,
54+
Error = taskDataSummary.Status == TaskStatus.Error
55+
? taskDataSummary.Output.Error
56+
: "",
57+
StatusMessage = taskDataSummary.StatusMessage,
58+
SubmittedAt = taskDataSummary.SubmittedDate is not null
59+
? FromDateTime(taskDataSummary.SubmittedDate.Value)
60+
: null,
61+
AcquiredAt = taskDataSummary.AcquisitionDate is not null
62+
? FromDateTime(taskDataSummary.AcquisitionDate.Value)
63+
: null,
64+
ReceivedAt = taskDataSummary.ReceptionDate is not null
65+
? FromDateTime(taskDataSummary.ReceptionDate.Value)
66+
: null,
67+
PodHostname = taskDataSummary.OwnerPodName,
68+
CreationToEndDuration = taskDataSummary.CreationToEndDuration is not null
69+
? Duration.FromTimeSpan(taskDataSummary.CreationToEndDuration.Value)
70+
: null,
71+
ProcessingToEndDuration = taskDataSummary.ProcessingToEndDuration is not null
72+
? Duration.FromTimeSpan(taskDataSummary.ProcessingToEndDuration.Value)
73+
: null,
74+
InitialTaskId = taskDataSummary.InitialTaskId,
75+
CountDataDependencies = taskDataSummary.DataDependenciesCount,
76+
CountExpectedOutputIds = taskDataSummary.ExpectedOutputIdsCount,
77+
CountParentTaskIds = taskDataSummary.ParentTaskIdsCount,
78+
CountRetryOfIds = taskDataSummary.RetryOfIdsCount,
79+
};
80+
}

0 commit comments

Comments
 (0)