diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/config/CacheConfig.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/config/CacheConfig.java new file mode 100644 index 000000000..b313f6c48 --- /dev/null +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/config/CacheConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.cloud.ai.dataagent.config; + +import com.alibaba.cloud.ai.dataagent.service.cache.ResultCacheService; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class CacheConfig { + + @Bean + public ResultCacheService resultCacheService() { + return new ResultCacheService(); + } + +} diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/SqlExecutor.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/SqlExecutor.java index f47a970ab..6f51d8cb2 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/SqlExecutor.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/SqlExecutor.java @@ -32,7 +32,7 @@ */ public class SqlExecutor { - public static final Integer RESULT_SET_LIMIT = 1000; + public static final Integer RESULT_SET_LIMIT = 1000 * 10; public static final Integer STATEMENT_TIMEOUT = 30; diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/cache/ResultCacheService.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/cache/ResultCacheService.java new file mode 100644 index 000000000..6eb6a272b --- /dev/null +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/cache/ResultCacheService.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.cloud.ai.dataagent.service.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import org.springframework.stereotype.Service; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +// 新增 ResultCacheService 类 +@Service +public class ResultCacheService { + + // 设置最大缓存大小和过期策略(例如,写入后10分钟过期) + Cache cache = CacheBuilder.newBuilder() + .maximumSize(100) // 设置最大缓存大小(仅为示例) + .expireAfterWrite(30, TimeUnit.MINUTES) // 设置过期时间 + .build(); + + /** + * 存储执行结果并返回摘要码 + */ + public String storeResult(String key, String result) { + String cacheKey = generateCacheKey(key); + cache.put(cacheKey, result); + return cacheKey; + } + + /** + * 根据摘要码获取完整结果 + */ + public String getResult(String cacheKey) { + return cache.getIfPresent(cacheKey); + } + + public boolean exists(String cacheKey) { + return cache.getIfPresent(cacheKey) != null; + } + + private String generateCacheKey(String key) { + return "result_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 10); + } + +} diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/LlmService.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/LlmService.java index 4b74ec109..953f2d40e 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/LlmService.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/LlmService.java @@ -27,6 +27,8 @@ public interface LlmService { Flux callUser(String user); + Flux callUserWithTools(String user, Object... tools); + @Deprecated default String blockToString(Flux responseFlux) { return toStringFlux(responseFlux).collect(StringBuilder::new, StringBuilder::append) diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/impls/BlockLlmService.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/impls/BlockLlmService.java index d1f3fdea6..bc11d2589 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/impls/BlockLlmService.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/impls/BlockLlmService.java @@ -44,4 +44,10 @@ public Flux callUser(String user) { return Mono.fromCallable(() -> registry.getChatClient().prompt().user(user).call().chatResponse()).flux(); } + @Override + public Flux callUserWithTools(String user, Object... tools) { + return Mono.fromCallable(() -> registry.getChatClient().prompt().user(user).tools(tools).call().chatResponse()) + .flux(); + } + } diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/impls/StreamLlmService.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/impls/StreamLlmService.java index 2e7a14b3a..4ff1a9d59 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/impls/StreamLlmService.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/llm/impls/StreamLlmService.java @@ -41,4 +41,14 @@ public Flux callUser(String user) { return registry.getChatClient().prompt().user(user).stream().chatResponse(); } + @Override + public Flux callUserWithTools(String user, Object... tools) { + return registry.getChatClient() + .prompt() + .user(user) + .tools(tools) // 注册工具 + .stream() + .chatResponse(); + } + } diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/tool/CacheAccessTool.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/tool/CacheAccessTool.java new file mode 100644 index 000000000..c7e4fc27f --- /dev/null +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/tool/CacheAccessTool.java @@ -0,0 +1,65 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.cloud.ai.dataagent.service.tool; + +import com.alibaba.cloud.ai.dataagent.service.cache.ResultCacheService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.ai.tool.annotation.Tool; +import org.springframework.ai.tool.annotation.ToolParam; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +@Slf4j +public class CacheAccessTool { + + private final ResultCacheService resultCacheService; + + public CacheAccessTool(ResultCacheService resultCacheService) { + this.resultCacheService = resultCacheService; + } + + /** + * 工具方法:根据摘要码获取完整数据 + */ + @Tool(name = "getFullData", description = "根据摘要码获取完整数据") + public String getFullData(@ToolParam(description = "摘要码") String cacheKey) { + if (StringUtils.isNotBlank(cacheKey) && resultCacheService.exists(cacheKey)) { + String fullData = resultCacheService.getResult(cacheKey); + log.info("获取完整数据成功: {}", fullData); + return fullData; + } + else { + return "摘要码不存在或已过期"; + } + } + + /** + * 工具方法:批量获取多个摘要码的数据 + */ + public Map getMultipleData(List cacheKeys) { + Map results = new HashMap<>(); + for (String key : cacheKeys) { + results.put(key, getFullData(key)); + } + return results; + } + +} diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/util/ReportTemplateUtil.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/util/ReportTemplateUtil.java index a921f35bf..04bbde263 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/util/ReportTemplateUtil.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/util/ReportTemplateUtil.java @@ -145,6 +145,41 @@ public ReportTemplateUtil(DataAgentProperties dataAgentProperties) { border-radius: 8px; } + /* 8. 表格样式 */ + table { + width: 100%; + border-collapse: collapse; + margin: 1.5rem 0; + background-color: #ffffff; + border-radius: 0.5rem; + overflow: hidden; + box-shadow: 0 1px 3px rgba(0,0,0,0.1); + } + + th, td { + padding: 0.75rem 1rem; + text-align: left; + border: 1px solid #e5e7eb; + } + + th { + background-color: #f9fafb; + font-weight: 600; + color: #374151; + } + + td { + border-top: 1px solid #e5e7eb; + } + + tr:nth-child(even) { + background-color: #f9fafb; + } + + tr:hover { + background-color: #f3f4f6; + } + /* --- 样式结束 --- */ diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/ReportGeneratorNode.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/ReportGeneratorNode.java index 3b25e05d7..b9c3d2eeb 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/ReportGeneratorNode.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/ReportGeneratorNode.java @@ -18,17 +18,21 @@ import com.alibaba.cloud.ai.dataagent.dto.planner.ExecutionStep; import com.alibaba.cloud.ai.dataagent.dto.planner.Plan; import com.alibaba.cloud.ai.dataagent.entity.UserPromptConfig; +import com.alibaba.cloud.ai.dataagent.enums.TextType; import com.alibaba.cloud.ai.dataagent.prompt.PromptHelper; +import com.alibaba.cloud.ai.dataagent.service.cache.ResultCacheService; import com.alibaba.cloud.ai.dataagent.service.llm.LlmService; import com.alibaba.cloud.ai.dataagent.service.prompt.UserPromptService; -import com.alibaba.cloud.ai.dataagent.enums.TextType; +import com.alibaba.cloud.ai.dataagent.service.tool.CacheAccessTool; +import com.alibaba.cloud.ai.dataagent.util.ChatResponseUtil; +import com.alibaba.cloud.ai.dataagent.util.FluxUtil; +import com.alibaba.cloud.ai.dataagent.util.StateUtil; import com.alibaba.cloud.ai.graph.GraphResponse; import com.alibaba.cloud.ai.graph.OverAllState; import com.alibaba.cloud.ai.graph.action.NodeAction; import com.alibaba.cloud.ai.graph.streaming.StreamingOutput; -import com.alibaba.cloud.ai.dataagent.util.ChatResponseUtil; -import com.alibaba.cloud.ai.dataagent.util.FluxUtil; -import com.alibaba.cloud.ai.dataagent.util.StateUtil; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.ai.chat.model.ChatResponse; import org.springframework.ai.converter.BeanOutputConverter; @@ -37,6 +41,7 @@ import reactor.core.publisher.Flux; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -62,11 +67,23 @@ public class ReportGeneratorNode implements NodeAction { private final UserPromptService promptConfigService; - public ReportGeneratorNode(LlmService llmService, UserPromptService promptConfigService) { + private final ResultCacheService resultCacheService; // 注入缓存服务 + + private final CacheAccessTool cacheAccessTool; // 注入 CacheAccessTool + + // 定义最大结果大小常量 + private static final int MAX_RESULT_SIZE = 5000; // 每个结果最大5KB + + // private static final int MAX_TOTAL_SIZE = 20000; // 总体结果最大20KB + + public ReportGeneratorNode(LlmService llmService, UserPromptService promptConfigService, + ResultCacheService resultCacheService) { // 注入缓存服务 this.llmService = llmService; this.converter = new BeanOutputConverter<>(new ParameterizedTypeReference<>() { }); this.promptConfigService = promptConfigService; + this.resultCacheService = resultCacheService; + this.cacheAccessTool = new CacheAccessTool(resultCacheService); } @Override @@ -97,8 +114,11 @@ public Map apply(OverAllState state) throws Exception { // ignore parse error, treat as global config } + // 使用缓存机制处理执行结果 + HashMap processedExecutionResults = processExecutionResults(executionResults, plan); + // Generate report streaming flux - Flux reportGenerationFlux = generateReport(userInput, plan, executionResults, + Flux reportGenerationFlux = generateReport(userInput, plan, processedExecutionResults, summaryAndRecommendations, agentId); TextType reportTextType = TextType.MARK_DOWN; @@ -121,6 +141,242 @@ public Map apply(OverAllState state) throws Exception { return Map.of(RESULT, generator); } + /** + * 处理执行结果,仅在 SQL 执行且结果超限时进行摘要缓存 + */ + private HashMap processExecutionResults(HashMap executionResults, Plan plan) { + HashMap processedResults = new HashMap<>(); + + int totalSize = 0; + List executionPlan = plan.getExecutionPlan(); + + for (Map.Entry entry : executionResults.entrySet()) { + String stepKey = entry.getKey(); + String stepResult = entry.getValue(); + + // if (totalSize >= MAX_TOTAL_SIZE) { + // log.warn("Total execution results size exceeded limit, stopping + // processing"); + // break; + // } + + // 获取当前步骤信息 + ExecutionStep currentStep = getExecutionStepByKey(stepKey, executionPlan); + boolean isSqlExecution = currentStep != null && "SQL_GENERATE_NODE".equals(currentStep.getToolToUse()); + + // 判断是否需要缓存处理 + String processedResult; + if (isSqlExecution && stepResult.length() > MAX_RESULT_SIZE) { + // 仅当是 SQL 执行且结果超限时,进行摘要并缓存 + String cacheKey = resultCacheService.storeResult(stepKey, stepResult); + String summary = getSummaryFromResult(stepResult); + processedResult = createReferenceInfo(stepKey, summary, cacheKey); + } + else { + // 其他情况直接保留原始数据或完整缓存 + // if (stepResult.length() > MAX_RESULT_SIZE) { + // // 超限则完整缓存 + // String cacheKey = resultCacheService.storeResult(stepKey, stepResult); + // processedResult = createReferenceInfo(stepKey, stepResult, cacheKey); + // } else { + // 不超限则保留原始数据 + processedResult = stepResult; + // } + } + + // 更新总大小 + totalSize += processedResult.length(); + processedResults.put(stepKey, processedResult); + } + + return processedResults; + } + + /** + * 根据步骤键获取对应的 ExecutionStep + */ + private ExecutionStep getExecutionStepByKey(String stepKey, List executionPlan) { + try { + int stepIndex = Integer.parseInt(stepKey.replace("step_", "")) - 1; + if (stepIndex >= 0 && stepIndex < executionPlan.size()) { + return executionPlan.get(stepIndex); + } + } + catch (NumberFormatException e) { + log.warn("Failed to parse step key: {}", stepKey); + } + return null; + } + + /** + * 创建结果引用信息 + */ + private String createReferenceInfo(String stepKey, String originalResult, String cacheKey) { + // 提供结果摘要和缓存引用 + String summary = getSummaryFromResult(originalResult); + log.info("originalResult.length():{},summary.length():{}", originalResult.length(), summary.length()); + return String.format( + "**[执行结果过大已进行摘要]**\n" + "- 原始结果大小: %d 字符\n" + "- 摘要: %s\n" + "- 如需完整数据可使用提供的工具根据摘要码 '%s' 获取\n", + originalResult.length(), summary, cacheKey); + } + + /** + * 从结果中提取摘要 + */ + private String getSummaryFromResult(String result) { + if (result == null || result.isEmpty()) { + return "无数据"; + } + + // 如果是JSON格式,尝试提取关键字段 + if (result.startsWith("{") || result.startsWith("[")) { + return extractJsonSummary(result); + } + else { + // 截取前200个字符作为摘要 + return result.length() > 200 ? result.substring(0, 200) + "..." : result; + } + } + + /** + * 提取JSON结果的摘要 + */ + private String extractJsonSummary(String jsonResult) { + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(jsonResult); + + if (rootNode.isObject()) { + // 检查是否是包含 data 数组的对象结构 + JsonNode dataNode = rootNode.get("data"); + JsonNode columnNode = rootNode.get("column"); + + if (dataNode != null && dataNode.isArray() && columnNode != null && columnNode.isArray()) { + // 这是包含列定义和数据的特殊格式 + int totalRows = dataNode.size(); + int columnsCount = columnNode.size(); + + // 获取列名 + StringBuilder columnsBuilder = new StringBuilder(); + for (int i = 0; i < Math.min(columnNode.size(), 10); i++) { // 增加列名显示数量 + if (i > 0) + columnsBuilder.append(", "); + columnsBuilder.append(columnNode.get(i).asText()); + } + String columnsPreview = columnsBuilder.toString(); + + // 获取前100条记录的详细内容 + StringBuilder rowsPreview = new StringBuilder(); + int recordsToShow = Math.min(totalRows, 100); // 最多显示100条记录 + + for (int i = 0; i < recordsToShow; i++) { + JsonNode row = dataNode.get(i); + if (i > 0) + rowsPreview.append("; "); + + if (row.isArray()) { + // 如果每行是数组格式 + StringBuilder rowBuilder = new StringBuilder(); + rowBuilder.append("["); + for (int j = 0; j < row.size(); j++) { + if (j > 0) + rowBuilder.append(","); + String value = row.get(j) != null ? row.get(j).asText() : "null"; + // 限制单个值长度,避免过长 + rowBuilder.append(value.length() > 100 ? value.substring(0, 100) + "..." : value); + } + rowBuilder.append("]"); + rowsPreview.append(rowBuilder.toString()); + } + else if (row.isObject()) { + // 如果每行是对象格式 + Iterator> fields = row.fields(); + StringBuilder rowBuilder = new StringBuilder(); + rowBuilder.append("{"); + int fieldCount = 0; + while (fields.hasNext()) { + Map.Entry field = fields.next(); + if (fieldCount > 0) + rowBuilder.append(","); + + String value = field.getValue() != null ? field.getValue().asText() : "null"; + // 限制单个值长度,避免过长 + String truncatedValue = value.length() > 100 ? value.substring(0, 100) + "..." : value; + rowBuilder.append(field.getKey()).append("=").append(truncatedValue); + fieldCount++; + } + rowBuilder.append("}"); + rowsPreview.append(rowBuilder.toString()); + } + } + + String additionalInfo = recordsToShow < totalRows + ? String.format(", 还有%d条记录未显示", totalRows - recordsToShow) : ""; + + return String.format("表格式数据: %d列×%d行, 列名: [%s], 前%d条记录: %s%s", columnsCount, totalRows, + columnsPreview, recordsToShow, rowsPreview.toString(), additionalInfo); + } + else { + // 普通对象处理逻辑 + Iterator> fields = rootNode.fields(); + StringBuilder summary = new StringBuilder(); + int count = 0; + while (fields.hasNext() && count < 10) { // 显示更多字段 + Map.Entry field = fields.next(); + String fieldName = field.getKey(); + JsonNode fieldValue = field.getValue(); + + String valueSummary; + if (fieldValue.isArray()) { + valueSummary = String.format("[数组, %d项]", fieldValue.size()); + } + else if (fieldValue.isObject()) { + valueSummary = "{对象}"; + } + else { + String textValue = fieldValue.asText(); + valueSummary = textValue.length() > 100 ? textValue.substring(0, 100) + "..." : textValue; + } + + if (count > 0) + summary.append(", "); + summary.append(fieldName).append("=").append(valueSummary); + count++; + } + return summary.toString(); + } + } + else if (rootNode.isArray()) { + // 对于纯数组的情况也保留前100条记录 + int totalItems = rootNode.size(); + StringBuilder itemsPreview = new StringBuilder(); + int itemsToShow = Math.min(totalItems, 100); + + for (int i = 0; i < itemsToShow; i++) { + JsonNode item = rootNode.get(i); + if (i > 0) + itemsPreview.append(", "); + + String itemText = item != null ? item.asText() : "null"; + itemsPreview.append(itemText.length() > 100 ? itemText.substring(0, 100) + "..." : itemText); + } + + String additionalInfo = itemsToShow < totalItems ? String.format(", 还有%d项未显示", totalItems - itemsToShow) + : ""; + + return String.format("数组,共%d个项目,前%d项: [%s]%s", totalItems, itemsToShow, itemsPreview.toString(), + additionalInfo); + } + } + catch (Exception e) { + log.warn("JSON parsing failed for summary extraction: {}", e.getMessage()); + // JSON解析失败,返回前200个字符 + return jsonResult.length() > 200 ? jsonResult.substring(0, 200) + "..." : jsonResult; + } + + return jsonResult.length() > 200 ? jsonResult.substring(0, 200) + "..." : jsonResult; + } + /** * Gets the current execution step from the plan. */ @@ -156,7 +412,7 @@ private Flux generateReport(String userInput, Plan plan, HashMap`, ``, ``, `