Skip to content

Commit dc330b4

Browse files
authored
feat: 清洗任务增加重试次数记录和日志展示 (#280)
* feat: 清洗任务增加重试次数记录和日志展示 * feat: 数据清洗改为数据处理
1 parent 91b1e08 commit dc330b4

File tree

18 files changed

+76
-29
lines changed

18 files changed

+76
-29
lines changed

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
142142

143143
prepareTask(task, request.getInstance(), executorType);
144144
scanDataset(taskId, request.getSrcDatasetId());
145-
taskScheduler.executeTask(taskId);
145+
146146
return task;
147147
}
148148

@@ -157,9 +157,12 @@ public List<CleaningResultDto> getTaskResults(String taskId) {
157157
return cleaningResultRepo.findByInstanceId(taskId);
158158
}
159159

160-
public List<CleaningTaskLog> getTaskLog(String taskId) {
160+
public List<CleaningTaskLog> getTaskLog(String taskId, int retryCount) {
161161
cleanTaskValidator.checkTaskId(taskId);
162162
String logPath = FLOW_PATH + "/" + taskId + "/output.log";
163+
if (retryCount > 0) {
164+
logPath += "." + retryCount;
165+
}
163166
try (Stream<String> lines = Files.lines(Paths.get(logPath))) {
164167
List<CleaningTaskLog> logs = new ArrayList<>();
165168
AtomicReference<String> lastLevel = new AtomicReference<>("INFO");
@@ -215,7 +218,7 @@ public void executeTask(String taskId) {
215218
CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
216219
scanDataset(taskId, task.getSrcDatasetId(), succeedSet);
217220
cleaningResultRepo.deleteByInstanceId(taskId, "FAILED");
218-
taskScheduler.executeTask(taskId);
221+
taskScheduler.executeTask(taskId, task.getRetryCount() + 1);
219222
}
220223

221224
private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances, ExecutorType executorType) {

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ public class CleaningTaskScheduler {
2020

2121
private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
2222

23-
public void executeTask(String taskId) {
24-
taskExecutor.submit(() -> submitTask(taskId));
23+
public void executeTask(String taskId, int retryCount) {
24+
taskExecutor.submit(() -> submitTask(taskId, retryCount));
2525
}
2626

27-
private void submitTask(String taskId) {
27+
private void submitTask(String taskId, int retryCount) {
2828
CleaningTaskDto task = new CleaningTaskDto();
2929
task.setId(taskId);
3030
task.setStatus(CleaningTaskStatusEnum.RUNNING);
3131
task.setStartedAt(LocalDateTime.now());
32+
task.setRetryCount(retryCount);
3233
cleaningTaskRepo.updateTask(task);
3334
runtimeClient.submitTask(taskId);
3435
}

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class CleaningTask extends BaseEntity<String> {
3636

3737
private Integer fileCount;
3838

39+
private Integer retryCount;
40+
3941
private LocalDateTime startedAt;
4042

4143
private LocalDateTime finishedAt;

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public class CleaningTaskDto {
3838

3939
private Integer fileCount;
4040

41+
private Integer retryCount;
42+
4143
private CleaningTaskStatusEnum status;
4244

4345
private String templateId;

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.datamate.cleaning.interfaces.rest;
22

33
import com.datamate.cleaning.application.CleaningTaskService;
4+
import com.datamate.cleaning.application.scheduler.CleaningTaskScheduler;
45
import com.datamate.cleaning.interfaces.dto.*;
56
import com.datamate.common.interfaces.PagedResponse;
67
import lombok.RequiredArgsConstructor;
@@ -18,6 +19,8 @@
1819
public class CleaningTaskController {
1920
private final CleaningTaskService cleaningTaskService;
2021

22+
private final CleaningTaskScheduler taskScheduler;
23+
2124
@GetMapping
2225
public PagedResponse<CleaningTaskDto> cleaningTasksGet(
2326
@RequestParam("page") Integer page,
@@ -36,7 +39,9 @@ public CleaningTaskDto cleaningTasksPost(@McpToolParam(description = "创建任
3639
if (request.getInstance().isEmpty() && StringUtils.isNotBlank(request.getTemplateId())) {
3740
request.setInstance(cleaningTaskService.getInstanceByTemplateId(request.getTemplateId()));
3841
}
39-
return cleaningTaskService.createTask(request);
42+
CleaningTaskDto task = cleaningTaskService.createTask(request);
43+
taskScheduler.executeTask(task.getId(), 0);
44+
return task;
4045
}
4146

4247
@PostMapping("/{taskId}/stop")
@@ -74,8 +79,9 @@ public List<CleaningResultDto> cleaningTasksTaskIdGetResult(@PathVariable("taskI
7479
return cleaningTaskService.getTaskResults(taskId);
7580
}
7681

77-
@GetMapping("/{taskId}/log")
78-
public List<CleaningTaskLog> cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId) {
79-
return cleaningTaskService.getTaskLog(taskId);
82+
@GetMapping("/{taskId}/log/{retryCount}")
83+
public List<CleaningTaskLog> cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId,
84+
@PathVariable("retryCount") int retryCount) {
85+
return cleaningTaskService.getTaskLog(taskId, retryCount);
8086
}
8187
}

frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ export default function CleansingTaskDetail() {
7676

7777
const [taskLog, setTaskLog] = useState();
7878

79-
const fetchTaskLog = async () => {
79+
const fetchTaskLog = async (retryCount: number) => {
8080
if (!id) return;
8181
try {
82-
const { data } = await queryCleaningTaskLogByIdUsingGet(id);
82+
const { data } = await queryCleaningTaskLogByIdUsingGet(id, retryCount);
8383
setTaskLog(data);
8484
} catch (error) {
8585
message.error("获取清洗日志失败");
@@ -90,7 +90,7 @@ export default function CleansingTaskDetail() {
9090
const handleRefresh = async () => {
9191
fetchTaskDetail();
9292
{activeTab === "files" && await fetchTaskResult()}
93-
{activeTab === "logs" && await fetchTaskLog()}
93+
{activeTab === "logs" && await fetchTaskLog(task.retryCount)}
9494
};
9595

9696
useEffect(() => {
@@ -190,7 +190,7 @@ export default function CleansingTaskDetail() {
190190

191191
const breadItems = [
192192
{
193-
title: <Link to="/data/cleansing">数据清洗</Link>,
193+
title: <Link to="/data/cleansing">数据处理</Link>,
194194
},
195195
{
196196
title: "清洗任务详情",
@@ -215,7 +215,7 @@ export default function CleansingTaskDetail() {
215215
)}
216216
{activeTab === "operators" && <OperatorTable task={task} />}
217217
{activeTab === "files" && <FileTable result={result} fetchTaskResult={fetchTaskResult} />}
218-
{activeTab === "logs" && <LogsTable taskLog={taskLog} fetchTaskLog={fetchTaskLog} />}
218+
{activeTab === "logs" && <LogsTable taskLog={taskLog} fetchTaskLog={fetchTaskLog} retryCount={task.retryCount} />}
219219
</div>
220220
</div>
221221
</>

frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ export default function CleansingTemplateDetail() {
9494

9595
const breadItems = [
9696
{
97-
title: <Link to="/data/cleansing">数据清洗</Link>,
97+
title: <Link to="/data/cleansing">数据处理</Link>,
9898
},
9999
{
100100
title: "模板详情",

frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ export default function BasicInfo({ task }: { task: CleansingTask }) {
5656
),
5757
span: 2,
5858
},
59+
{ key: "finishedTime", label: "结束时间", children: task?.finishedAt },
60+
{ key: "name", label: "重试次数", children: task?.retryCount },
5961
];
6062

6163
return (

frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,35 @@
1-
import {useEffect} from "react";
1+
import {useEffect, useState} from "react";
22
import {useParams} from "react-router";
33
import {FileClock} from "lucide-react";
44

5-
export default function LogsTable({taskLog, fetchTaskLog} : {taskLog: any[], fetchTaskLog: () => Promise<any>}) {
5+
export default function LogsTable({taskLog, fetchTaskLog, retryCount} : {taskLog: any[], fetchTaskLog: () => Promise<any>, retryCount: number}) {
66
const { id = "" } = useParams();
7+
const [selectedLog, setSelectedLog] = useState(retryCount + 1);
78

89
useEffect(() => {
9-
fetchTaskLog();
10-
}, [id]);
10+
fetchTaskLog(selectedLog - 1);
11+
}, [id, selectedLog]);
1112

1213
return taskLog?.length > 0 ? (
1314
<>
15+
{/* --- 新增区域:左上角 Select 组件 --- */}
16+
<div className="flex items-center justify-between pb-3">
17+
<div className="flex items-center gap-3">
18+
<label className="text-sm font-medium text-gray-500">选择运行轮次:</label>
19+
<select
20+
value={selectedLog}
21+
onChange={(e) => setSelectedLog(Number(e.target.value))}
22+
className="bg-gray-700 border border-gray-600 !text-white text-sm rounded-md focus:ring-blue-500 focus:border-blue-500 block px-2.5 py-1.5 min-w-[120px]"
23+
>
24+
{Array.from({ length: retryCount + 1 }, (_, i) => retryCount + 1 - i).map((num) => (
25+
<option key={num} value={num}>
26+
{num}
27+
</option>
28+
))}
29+
</select>
30+
</div>
31+
<span className="text-s text-gray-500 px-2">当前展示: 第 {selectedLog}</span>
32+
</div>
1433
<div className="text-gray-300 p-4 border border-gray-700 bg-gray-800 rounded-lg">
1534
<div className="font-mono text-sm">
1635
{taskLog?.map?.((log, index) => (

frontend/src/pages/DataCleansing/Home/DataCleansing.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export default function DataProcessingPage() {
2222
<div className="h-full flex flex-col gap-4">
2323
{/* Header */}
2424
<div className="flex justify-between items-center">
25-
<h1 className="text-xl font-bold">数据清洗</h1>
25+
<h1 className="text-xl font-bold">数据处理</h1>
2626
<div className="flex gap-2">
2727
<Button
2828
icon={<PlusOutlined />}

0 commit comments

Comments
 (0)