Skip to content

Commit af4aff0

Browse files
authored
add data lineage page and data quality page (#287)
* feature: the backend realizes the basic capabilities of data lineage, and improves data lineage registration through collection, data management, and cleaning. * feature: add data lineage page and data quality page * fix: add data lineage front-end implementation.
1 parent d9c381a commit af4aff0

File tree

23 files changed

+2417
-728
lines changed

23 files changed

+2417
-728
lines changed

backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
1111
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
1212
import com.datamate.cleaning.interfaces.dto.*;
13+
import com.datamate.common.domain.enums.EdgeType;
14+
import com.datamate.common.domain.enums.NodeType;
15+
import com.datamate.common.domain.model.LineageEdge;
16+
import com.datamate.common.domain.model.LineageNode;
17+
import com.datamate.common.domain.service.LineageService;
1318
import com.datamate.common.infrastructure.exception.BusinessException;
1419
import com.datamate.common.infrastructure.exception.SystemErrorCode;
1520
import com.datamate.common.interfaces.PagedResponse;
@@ -73,6 +78,8 @@ public class CleaningTaskService {
7378

7479
private final CleanTaskValidator cleanTaskValidator;
7580

81+
private final LineageService lineageService;
82+
7683
private final String DATASET_PATH = "/dataset";
7784

7885
private final String FLOW_PATH = "/flow";
@@ -134,6 +141,8 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
134141
task.setBeforeSize(srcDataset.getSizeBytes());
135142
task.setFileCount(srcDataset.getFileCount().intValue());
136143
cleaningTaskRepo.insertTask(task);
144+
// 记录血缘关系
145+
addCleaningToGraph(srcDataset, task, destDataset);
137146

138147
operatorInstanceRepo.insertInstance(taskId, request.getInstance());
139148
operatorRepo.incrementUsageCount(request.getInstance().stream()
@@ -146,6 +155,30 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
146155
return task;
147156
}
148157

158+
private void addCleaningToGraph(Dataset srcDataset, CleaningTaskDto task, Dataset destDataset) {
159+
LineageNode fromNode = new LineageNode();
160+
fromNode.setId(srcDataset.getId());
161+
fromNode.setName(srcDataset.getName());
162+
fromNode.setDescription(srcDataset.getDescription());
163+
fromNode.setNodeType(NodeType.DATASET);
164+
165+
LineageNode toNode = new LineageNode();
166+
toNode.setId(destDataset.getId());
167+
toNode.setName(destDataset.getName());
168+
toNode.setDescription(destDataset.getDescription());
169+
toNode.setNodeType(NodeType.DATASET);
170+
171+
LineageEdge edge = new LineageEdge();
172+
edge.setProcessId(task.getId());
173+
edge.setName(task.getName());
174+
edge.setDescription(task.getDescription());
175+
edge.setEdgeType(EdgeType.DATA_CLEANING);
176+
edge.setFromNodeId(fromNode.getId());
177+
edge.setToNodeId(toNode.getId());
178+
179+
lineageService.generateGraph(fromNode, edge, toNode);
180+
}
181+
149182
public CleaningTaskDto getTask(String taskId) {
150183
CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
151184
setProcess(task);

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
import com.baomidou.mybatisplus.core.metadata.IPage;
44
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
5+
import com.datamate.common.domain.enums.EdgeType;
6+
import com.datamate.common.domain.enums.NodeType;
7+
import com.datamate.common.domain.model.LineageEdge;
8+
import com.datamate.common.domain.model.LineageNode;
9+
import com.datamate.common.domain.service.LineageService;
510
import com.datamate.common.domain.utils.ChunksSaver;
611
import com.datamate.common.setting.application.SysParamApplicationService;
712
import com.datamate.datamanagement.interfaces.dto.*;
@@ -17,7 +22,6 @@
1722
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
1823
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
1924
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
20-
import com.datamate.datamanagement.interfaces.dto.*;
2125
import com.fasterxml.jackson.core.JsonProcessingException;
2226
import com.fasterxml.jackson.databind.ObjectMapper;
2327
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -54,6 +58,7 @@ public class DatasetApplicationService {
5458
private final CollectionTaskClient collectionTaskClient;
5559
private final DatasetFileApplicationService datasetFileApplicationService;
5660
private final SysParamApplicationService sysParamService;
61+
private final LineageService lineageService;
5762

5863
@Value("${datamate.data-management.base-path:/dataset}")
5964
private String datasetBasePath;
@@ -72,6 +77,8 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
7277
dataset.setTags(processTagNames(createDatasetRequest.getTags()));
7378
}
7479
datasetRepository.save(dataset);
80+
// 记录血缘关系
81+
addDatasetToGraph(dataset, null);
7582

7683
//todo 需要解耦这块逻辑
7784
if (StringUtils.hasText(createDatasetRequest.getDataSource())) {
@@ -81,6 +88,43 @@ public Dataset createDataset(CreateDatasetRequest createDatasetRequest) {
8188
return dataset;
8289
}
8390

91+
private void addDatasetToGraph(Dataset dataset, CollectionTaskDetailResponse collection) {
92+
LineageNode datasetNode = new LineageNode();
93+
datasetNode.setId(dataset.getId());
94+
datasetNode.setNodeType(NodeType.DATASET);
95+
datasetNode.setName(dataset.getName());
96+
datasetNode.setDescription(dataset.getDescription());
97+
98+
LineageNode collectionNode = null;
99+
LineageEdge collectionEdge = null;
100+
if(Objects.nonNull(collection)) {
101+
collectionNode = new LineageNode();
102+
collectionNode.setId(collection.getId());
103+
collectionNode.setName(collection.getName());
104+
collectionNode.setDescription(collection.getDescription());
105+
collectionNode.setNodeType(NodeType.DATASOURCE);
106+
107+
collectionEdge = new LineageEdge();
108+
collectionEdge.setProcessId(collection.getId());
109+
collectionEdge.setName(collection.getName());
110+
collectionEdge.setEdgeType(EdgeType.DATA_COLLECTION);
111+
collectionEdge.setDescription(dataset.getDescription());
112+
collectionEdge.setFromNodeId(collectionNode.getId());
113+
collectionEdge.setToNodeId(datasetNode.getId());
114+
}
115+
lineageService.generateGraph(collectionNode, collectionEdge, datasetNode);
116+
}
117+
118+
public DatasetLineage getDatasetLineage(String datasetId) {
119+
Dataset dataset = datasetRepository.getById(datasetId);
120+
if (Objects.isNull(dataset)) {
121+
return new DatasetLineage();
122+
}
123+
LineageNode datasetNode = lineageService.getNodeById(datasetId);
124+
String graphId = datasetNode.getGraphId();
125+
return new DatasetLineage(lineageService.getNodesByGraphId(graphId), lineageService.getEdgesByGraphId(graphId));
126+
}
127+
84128
public String getDatasetPvcName() {
85129
return sysParamService.getParamByKey(DATASET_PVC_NAME);
86130
}
@@ -100,11 +144,11 @@ public Dataset updateDataset(String datasetId, UpdateDatasetRequest updateDatase
100144
if (Objects.nonNull(updateDatasetRequest.getStatus())) {
101145
dataset.setStatus(updateDatasetRequest.getStatus());
102146
}
147+
datasetRepository.updateById(dataset);
103148
if (StringUtils.hasText(updateDatasetRequest.getDataSource())) {
104149
// 数据源id不为空,使用异步线程进行文件扫盘落库
105150
processDataSourceAsync(dataset.getId(), updateDatasetRequest.getDataSource());
106151
}
107-
datasetRepository.updateById(dataset);
108152
return dataset;
109153
}
110154

@@ -261,6 +305,8 @@ private List<String> getFilePaths(String dataSourceId, Dataset dataset) {
261305
log.warn("Fail to get collection task detail, task ID: {}", dataSourceId);
262306
return Collections.emptyList();
263307
}
308+
// 记录血缘关系
309+
addDatasetToGraph(dataset, taskDetail);
264310
Path targetPath = Paths.get(taskDetail.getTargetPath());
265311
if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) {
266312
log.warn("Target path not exists or is not a directory: {}", taskDetail.getTargetPath());
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.datamate.datamanagement.interfaces.dto;
2+
3+
import com.datamate.common.domain.model.LineageEdge;
4+
import com.datamate.common.domain.model.LineageNode;
5+
import lombok.AllArgsConstructor;
6+
import lombok.Getter;
7+
import lombok.NoArgsConstructor;
8+
import lombok.Setter;
9+
10+
import java.util.List;
11+
12+
/**
13+
* 数据集血缘
14+
*
15+
* @since 2026/1/23
16+
*/
17+
@Getter
18+
@Setter
19+
@NoArgsConstructor
20+
@AllArgsConstructor
21+
public class DatasetLineage {
22+
/**
23+
* 节点列表
24+
*/
25+
private List<LineageNode> lineageNodes;
26+
/**
27+
* 边列表
28+
*/
29+
private List<LineageEdge> lineageEdges;
30+
}

backend/services/data-management-service/src/main/java/com/datamate/datamanagement/interfaces/rest/DatasetController.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ public DatasetResponse updateDataset(@PathVariable("datasetId") String datasetId
8080
return DatasetConverter.INSTANCE.convertToResponse(dataset);
8181
}
8282

83+
@GetMapping("/{datasetId}/lineage")
84+
public DatasetLineage getDatasetLineage(@PathVariable("datasetId") String datasetId) {
85+
return datasetApplicationService.getDatasetLineage(datasetId);
86+
}
87+
8388
/**
8489
* 根据ID删除数据集
8590
*
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.datamate.common.domain.enums;
2+
3+
/**
4+
* 边类型:DATA_COLLECTION/DATA_CLEANING/DATA_LABELING/DATA_SYNTHESIS/DATA_RATIO
5+
*
6+
* @since 2026/1/23
7+
*/
8+
public enum EdgeType {
9+
DATA_COLLECTION,
10+
DATA_CLEANING,
11+
DATA_LABELING,
12+
DATA_SYNTHESIS,
13+
DATA_RATIO
14+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.datamate.common.domain.enums;
2+
3+
/**
4+
* 节点类型:DATASOURCE/DATASET/KNOWLEDGE_BASE/MODEL等
5+
*
6+
* @since 2026/1/23
7+
*/
8+
public enum NodeType {
9+
DATASOURCE,
10+
DATASET,
11+
KNOWLEDGE_BASE,
12+
MODEL
13+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.datamate.common.domain.model;
2+
3+
import com.baomidou.mybatisplus.annotation.IdType;
4+
import com.baomidou.mybatisplus.annotation.TableId;
5+
import com.baomidou.mybatisplus.annotation.TableName;
6+
import com.datamate.common.domain.enums.EdgeType;
7+
import lombok.Getter;
8+
import lombok.Setter;
9+
10+
/**
11+
* 数据血缘:边表
12+
* 边表示处理流程(归集任务、数据清洗、数据标注、数据合成、数据配比等)
13+
*
14+
* @since 2026/1/23
15+
*/
16+
17+
@Getter
18+
@Setter
19+
@TableName("t_lineage_edge")
20+
public class LineageEdge {
21+
/**
22+
* 边ID
23+
*/
24+
@TableId(type = IdType.ASSIGN_ID)
25+
private String id;
26+
/**
27+
* 图ID
28+
*/
29+
private String graphId;
30+
31+
/**
32+
* 处理流程ID
33+
*/
34+
private String processId;
35+
36+
/**
37+
* 边类型:DATA_COLLECTION/DATA_CLEANING/DATA_LABELING/DATA_SYNTHESIS/DATA_RATIO等
38+
*/
39+
private EdgeType edgeType;
40+
41+
/**
42+
* 边名称
43+
*/
44+
private String name;
45+
46+
/**
47+
* 边描述
48+
*/
49+
private String description;
50+
51+
/**
52+
* 边扩展信息(JSON)
53+
*/
54+
private String edgeMetadata;
55+
56+
/**
57+
* 源节点ID
58+
*/
59+
private String fromNodeId;
60+
61+
/**
62+
* 目标节点ID
63+
*/
64+
private String toNodeId;
65+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.datamate.common.domain.model;
2+
3+
import com.baomidou.mybatisplus.annotation.IdType;
4+
import com.baomidou.mybatisplus.annotation.TableId;
5+
import com.baomidou.mybatisplus.annotation.TableName;
6+
import com.datamate.common.domain.enums.NodeType;
7+
import lombok.Getter;
8+
import lombok.Setter;
9+
10+
/**
11+
* 数据血缘:节点表
12+
* 节点表示实体对象(归集来源、数据集、知识库、模型等)
13+
*
14+
* @since 2026/1/23
15+
*/
16+
@Getter
17+
@Setter
18+
@TableName("t_lineage_node")
19+
public class LineageNode {
20+
/**
21+
* 节点ID
22+
*/
23+
@TableId(type = IdType.ASSIGN_ID)
24+
private String id;
25+
26+
/**
27+
* 图ID
28+
*/
29+
private String graphId;
30+
31+
/**
32+
* 节点类型:DATASOURCE/DATASET/KNOWLEDGE_BASE/MODEL等
33+
*/
34+
private NodeType nodeType;
35+
36+
/**
37+
* 节点名称
38+
*/
39+
private String name;
40+
41+
/**
42+
* 节点描述
43+
*/
44+
private String description;
45+
46+
/**
47+
* 节点扩展信息(JSON)
48+
*/
49+
private String nodeMetadata;
50+
}

0 commit comments

Comments
 (0)