Skip to content

Commit 716ccaa

Browse files
committed
增加数据库插入
1 parent dab469f commit 716ccaa

File tree

6 files changed

+168
-6
lines changed

6 files changed

+168
-6
lines changed

frontend/src/pages/DataCleansing/Create/components/ParamConfig.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ const ParamConfig: React.FC<ParamConfigProps> = ({
2828
if (!param) return null;
2929
let defaultVal: any = param.defaultVal;
3030
if (param.type === "range") {
31-
31+
3232
defaultVal = Array.isArray(param.defaultVal)
3333
? param.defaultVal
3434
: [
@@ -219,7 +219,7 @@ const ParamConfig: React.FC<ParamConfigProps> = ({
219219
<Switch
220220
checkedChildren={param.checkedLabel}
221221
unCheckedChildren={param.unCheckedLabel}
222-
defaultChecked={param.defaultVal === 'true'}
222+
defaultChecked={String(param.defaultVal).toLowerCase() === 'true'}
223223
onChange={(checked) => updateValue(checked)}
224224
/>
225225
</Form.Item>
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import os
2+
import uuid
3+
from loguru import logger
4+
import mimetypes
5+
from datetime import datetime
6+
7+
from datamate.sql_manager.persistence_atction import TaskInfoPersistence
8+
9+
class FileScanner:
    """Scan a directory tree and bulk-insert newly discovered files for a dataset.

    Workflow: collect file metadata from disk, diff the scanned paths against
    the paths already persisted for the dataset, then insert only the new
    records in fixed-size batches.
    """

    def __init__(self, dataset_id):
        # dataset_id scopes both the existing-path lookup and the inserted rows.
        self.dataset_id = dataset_id
        self.persistence = TaskInfoPersistence()

    def get_existing_paths(self):
        """Return the set of file paths already stored in the DB for this dataset.

        Optimization 1: fetch everything once and keep it in a set so each
        scanned file costs an O(1) membership test instead of a DB round-trip.
        """
        logger.info("Fetching existing files from DB...")
        existing_files = self.persistence.query_existing_files(self.dataset_id)
        # fetchall() yields one-element row tuples [('path1',), ('path2',)];
        # flatten them into {'path1', 'path2'} for O(1) lookup.
        existing_set = {row[0] for row in existing_files}
        logger.info(f"Found {len(existing_set)} existing files in DB.")
        return existing_set

    def prepare_file_data(self, sample, file_id):
        """Assemble one DB-ready record dict from scanned file metadata.

        Optimization 2: split out of the old update_file_result — this method
        only shapes data for an executemany-style batch insert and never
        touches the database itself.

        :param sample: dict with fileSize/fileType/fileName/dataset_id/filePath keys
        :param file_id: pre-generated primary key for the new row
        :return: dict keyed by the dataset-file table's column names
        """
        file_size = str(sample.get("fileSize"))
        file_type = str(sample.get("fileType"))
        file_name = str(sample.get("fileName"))
        dataset_id = str(sample.get("dataset_id"))
        file_path = str(sample.get("filePath"))
        create_time = datetime.now()

        # Use the file's modification time as "last access"; fall back to now
        # if the file vanished between the scan and this call.
        # NOTE(review): getmtime is *modification* time, not access time —
        # confirm that is the intended semantics for last_access_time.
        try:
            last_access_time = datetime.fromtimestamp(os.path.getmtime(file_path))
        except (FileNotFoundError, OSError):
            last_access_time = create_time

        return {
            "id": file_id,
            "dataset_id": dataset_id,
            "file_name": file_name,
            "file_path": file_path,
            "file_type": file_type,
            "file_size": file_size,
            "status": "COMPLETED",
            "upload_time": create_time,
            "last_access_time": last_access_time,
            "created_at": create_time,
            "updated_at": create_time
        }

    def scan_and_process(self, root_dir, batch_size=5000):
        """Walk *root_dir*, diff against the DB, and batch-insert new files.

        :param root_dir: directory tree to scan
        :param batch_size: max rows per INSERT batch (guards against oversized
                           SQL packets)
        """
        logger.info(f"Scanning directory: {root_dir}")

        # 1. Collect every scanned file in memory as {path: metadata_dict}.
        scanned_files_map = {}

        for root, dirs, files in os.walk(root_dir):
            # Prune hidden directories in place so os.walk never descends into
            # them (e.g. .git/, .cache/) — previously only hidden *files* were
            # skipped, so non-dot files inside hidden dirs were still inserted.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for file in files:
                if file.startswith('.'):
                    continue

                full_path = os.path.join(root, file)

                # Gather metadata up front; skip files that disappear mid-scan.
                try:
                    stats = os.stat(full_path)
                    f_type, _ = mimetypes.guess_type(full_path)
                    if not f_type:
                        # Fall back to the raw extension when the MIME type
                        # cannot be guessed.
                        f_type = os.path.splitext(file)[1]

                    # Same "sample" shape that prepare_file_data consumes.
                    scanned_files_map[full_path] = {
                        "fileSize": stats.st_size,
                        "fileType": f_type,
                        "fileName": file,
                        "dataset_id": self.dataset_id,
                        "filePath": full_path
                    }
                except OSError:
                    continue

        logger.info(f"Scanned {len(scanned_files_map)} files on disk.")

        # 2. Paths already present in the database.
        existing_paths = self.get_existing_paths()

        # 3. Set difference in memory -> only the paths that need inserting.
        scanned_paths_set = set(scanned_files_map.keys())
        new_paths = list(scanned_paths_set - existing_paths)

        logger.info(f"Need to insert {len(new_paths)} new files.")

        if not new_paths:
            logger.info("No new files to insert.")
            return

        # 4. Build and flush insert batches.
        insert_batch = []
        total_inserted = 0

        for path in new_paths:
            sample_data = scanned_files_map[path]
            new_file_id = str(uuid.uuid4())

            record = self.prepare_file_data(sample_data, new_file_id)
            insert_batch.append(record)

            # Optimization 3: flush in batches so a single oversized statement
            # cannot blow up the SQL layer.
            if len(insert_batch) >= batch_size:
                self.persistence.batch_insert_files(insert_batch)
                total_inserted += len(insert_batch)
                logger.info(f"Progress: {total_inserted}/{len(new_paths)} inserted...")
                insert_batch = []  # reset for the next batch

        # Flush the final partial batch.
        if insert_batch:
            self.persistence.batch_insert_files(insert_batch)
            total_inserted += len(insert_batch)

        logger.info(f"Done. Total inserted: {total_inserted}")

runtime/python-executor/datamate/sql_manager/persistence_atction.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# -*- coding: utf-8 -*-
22

33
import json
4-
import time
54
import os
5+
import time
66
import uuid
77
from datetime import datetime
88
from pathlib import Path
@@ -78,6 +78,18 @@ def update_file_result(self, sample, file_id):
7878
}
7979
self.insert_result(file_data, str(self.sql_dict.get("insert_dataset_file_sql")))
8080

81+
def query_existing_files(self, dataset_id: str):
    """Fetch all file-path rows stored for *dataset_id*.

    Returns the raw fetchall() result: a list of one-element row tuples,
    e.g. [('path1',), ('path2',)].
    """
    # Dropped the dead ``result = None`` pre-assignment — the value is always
    # produced inside the connection context.
    query_sql = str(self.sql_dict.get("query_dataset_files_sql"))
    with SQLManager.create_connect() as conn:
        # Parameterized query: dataset_id is bound, never interpolated.
        execute_result = conn.execute(text(query_sql), {"dataset_id": dataset_id})
        return execute_result.fetchall()
88+
89+
def batch_insert_files(self, samples):
    """Bulk-insert the given dataset-file records via the configured SQL."""
    sql = str(self.sql_dict.get("insert_dataset_file_sql"))
    self.batch_execute(sql, samples)
92+
8193
def persistence_task_info(self, sample: Dict[str, Any]):
8294
file_id = str(uuid.uuid4())
8395
self.update_task_result(sample, file_id)
@@ -102,6 +114,22 @@ def insert_result(data, sql):
102114
raise RuntimeError(82000, str(e)) from None
103115
raise Exception("Max retries exceeded")
104116

117+
@staticmethod
def batch_execute(sql, args_list):
    """Execute *sql* once per element of *args_list* (executemany-style).

    :param sql: SQL statement with named bind parameters,
                e.g. "INSERT INTO t (a, b) VALUES (:a, :b)"
                (the previous docstring showed ``%s``/tuple placeholders,
                which does not match the ``text()`` named-bind usage below)
    :param args_list: list of parameter dicts, one dict per row
    :raises Exception: re-raises whatever the driver raised, after rollback
    """
    with SQLManager.create_connect() as conn:
        try:
            # Passing a list of parameter dicts makes SQLAlchemy run the
            # statement in executemany mode.
            conn.execute(text(sql), args_list)
            # NOTE(review): no explicit conn.commit() here — assumes
            # create_connect() commits on successful context exit; verify.
        except Exception as e:
            conn.rollback()
            logger.error(f"批量插入失败: {e}")
            # Bare raise preserves the original traceback without adding
            # this frame again (``raise e`` would).
            raise
132+
105133
def update_result(self, dataset_id, instance_id, status):
106134
dataset_data = {
107135
"dataset_id": dataset_id

runtime/python-executor/datamate/sql_manager/sql/sql_config.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@
1313
"create_similar_img_tables_sql": "CREATE TABLE IF NOT EXISTS operator_similar_img_features (id SERIAL PRIMARY KEY, task_uuid VARCHAR(255), p_hash TEXT, des_matrix BYTEA, matrix_shape TEXT, file_name TEXT, timestamp TIMESTAMP);",
1414
"delete_similar_img_tables_sql": "DELETE FROM operator_similar_img_features WHERE flow_id = :flow_id",
1515
"create_similar_text_tables_sql": "CREATE TABLE IF NOT EXISTS operators_similar_text_features (id SERIAL PRIMARY KEY, task_uuid VARCHAR(255), file_feature TEXT, file_name TEXT, timestamp TIMESTAMP);",
16-
"delete_similar_text_tables_sql": "DELETE FROM operators_similar_text_features WHERE flow_id = :flow_id"
17-
}
16+
"delete_similar_text_tables_sql": "DELETE FROM operators_similar_text_features WHERE flow_id = :flow_id",
17+
"query_dataset_files_sql": "SELECT file_path FROM t_dm_dataset_files WHERE dataset_id = :dataset_id"
18+
}

runtime/python-executor/datamate/wrappers/datamate_executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def run(self):
4747
for _ in dataset.data.iter_batches():
4848
pass
4949

50+
self.scan_files()
5051

5152
if __name__ == '__main__':
5253

runtime/python-executor/datamate/wrappers/executor.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import time
33
from typing import Dict
44

5+
from datamate.common.utils.file_scanner import FileScanner
56
import ray
67
from jsonargparse import dict_to_namespace
78
from loguru import logger
@@ -77,4 +78,8 @@ def load_dataset(self, jsonl_file_path = None):
7778

7879
def update_db(self, status):
    """Persist *status* for this executor's dataset/instance pair."""
    persistence = TaskInfoPersistence()
    persistence.update_result(self.cfg.dataset_id, self.cfg.instance_id, status)
82+
83+
def scan_files(self):
    """Scan the configured export path and register new files for the dataset."""
    file_scanner = FileScanner(self.cfg.dataset_id)
    file_scanner.scan_and_process(self.cfg.export_path)

0 commit comments

Comments
 (0)