diff --git a/data-agent-frontend/src/components/agent/DataSourceConfig.vue b/data-agent-frontend/src/components/agent/DataSourceConfig.vue index da448cde8..7905e613b 100644 --- a/data-agent-frontend/src/components/agent/DataSourceConfig.vue +++ b/data-agent-frontend/src/components/agent/DataSourceConfig.vue @@ -699,7 +699,7 @@ clearable filterable > - + @@ -859,6 +859,7 @@ description: '', } as LogicalRelation); const tableList: Ref = ref([]); + const targetTableList: Ref = ref([]); const sourceColumnList: Ref = ref([]); const targetColumnList: Ref = ref([]); const savingForeignKeys: Ref = ref(false); @@ -1348,7 +1349,11 @@ // 加载表列表 try { + // 左侧主表:仅显示当前点击进入配置环境的当前数据源及子 Schema 列表 tableList.value = await datasourceService.getDatasourceTables(datasourceRow.id); + + // 右侧关联表:获取平台上所有 Active 状态数据源下的合法受管表(跨业务库) + targetTableList.value = await logicalRelationService.getAllDatasourceTables(datasourceRow.id); } catch (error) { ElMessage.error('加载表列表失败'); console.error('Failed to load table list:', error); @@ -1627,6 +1632,7 @@ foreignKeyList, newForeignKey, tableList, + targetTableList, sourceColumnList, targetColumnList, savingForeignKeys, diff --git a/data-agent-frontend/src/services/logicalRelation.ts b/data-agent-frontend/src/services/logicalRelation.ts index 0354a4619..0db50cec8 100644 --- a/data-agent-frontend/src/services/logicalRelation.ts +++ b/data-agent-frontend/src/services/logicalRelation.ts @@ -107,6 +107,19 @@ class LogicalRelationService { return response.data; } + // 获取数据源所属数据库的全量表(包含所有有效的 schema) + async getAllDatasourceTables(datasourceId: number): Promise { + try { + const response = await axios.get( + `${API_BASE_URL}/${datasourceId}/all-tables`, + ); + return response.data || []; + } catch (error) { + console.error('Failed to get all datasource tables:', error); + return []; + } + } + // 获取数据源表的字段列表 async getTableColumns(datasourceId: number, tableName: string): Promise { try { diff --git 
a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/bo/DbConfigBO.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/bo/DbConfigBO.java index 4a3adb759..2a915d012 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/bo/DbConfigBO.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/bo/DbConfigBO.java @@ -20,6 +20,8 @@ import lombok.Data; import lombok.NoArgsConstructor; +import java.util.List; + @Data @Builder @NoArgsConstructor @@ -38,4 +40,5 @@ public class DbConfigBO { private String dialectType; + private List schemas; } diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/mysql/MysqlJdbcDdl.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/mysql/MysqlJdbcDdl.java index 41feb8531..9cb32b7af 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/mysql/MysqlJdbcDdl.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/mysql/MysqlJdbcDdl.java @@ -81,8 +81,9 @@ public List showTables(Connection connection, String schema, String sql += "limit 2000;"; List tableInfoList = Lists.newArrayList(); try { + String actualSchema = StringUtils.isNotBlank(schema) ? schema : connection.getCatalog(); String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, - String.format(sql, connection.getCatalog(), tablePattern)); + String.format(sql,actualSchema, tablePattern)); if (resultArr.length <= 1) { return Lists.newArrayList(); } @@ -110,8 +111,9 @@ public List fetchTables(Connection connection, String schema, List< List tableInfoList = Lists.newArrayList(); String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList())); try { + String actualSchema = StringUtils.isNotBlank(schema) ? 
schema : connection.getCatalog(); String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, - String.format(sql, connection.getCatalog(), tableListStr)); + String.format(sql,actualSchema, tableListStr)); if (resultArr.length <= 1) { return Lists.newArrayList(); } @@ -139,8 +141,9 @@ public List showColumns(Connection connection, String schema, Stri + "FROM information_schema.COLUMNS " + "WHERE table_schema='%s' " + "and table_name='%s';"; List columnInfoList = Lists.newArrayList(); try { + String actualSchema = StringUtils.isNotBlank(schema) ? schema : connection.getCatalog(); String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, "INFORMATION_SCHEMA", - String.format(sql, connection.getCatalog(), table)); + String.format(sql,actualSchema, table)); if (resultArr.length <= 1) { return Lists.newArrayList(); } @@ -176,7 +179,8 @@ public List showForeignKeys(Connection connection, String sche String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList())); try { - sql = String.format(sql, connection.getCatalog(), tableListStr, tableListStr); + String actualSchema = StringUtils.isNotBlank(schema) ? schema : connection.getCatalog(); + sql = String.format(sql,actualSchema, tableListStr, tableListStr); String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, "INFORMATION_SCHEMA", sql); if (resultArr.length <= 1) { return Lists.newArrayList(); @@ -203,10 +207,11 @@ public List showForeignKeys(Connection connection, String sche @Override public List sampleColumn(Connection connection, String schema, String table, String column) { - String sql = "SELECT \n" + " `%s`\n" + "FROM \n" + " `%s`\n" + "LIMIT 99;"; + String qualifiedTable = StringUtils.isNotBlank(schema) ? 
String.format("`%s`.`%s`", schema, table) : + String.format("`%s`",table); + String sql = String.format("SELECT `%s` FROM %s LIMIT 99", column, qualifiedTable); List sampleInfo = Lists.newArrayList(); try { - sql = String.format(sql, column, table); String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, sql); if (resultArr.length <= 1) { return Lists.newArrayList(); @@ -231,9 +236,11 @@ public List sampleColumn(Connection connection, String schema, String ta @Override public ResultSetBO scanTable(Connection connection, String schema, String table) { String sql = "SELECT *\n" + "FROM \n" + " `%s`\n" + "LIMIT 20;"; + String qualifiedTable = StringUtils.isNotBlank(schema) ? String.format("`%s`.`%s`", schema, table) : + String.format("`%s`",table); ResultSetBO resultSet = ResultSetBO.builder().build(); try { - resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, String.format(sql, table)); + resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, String.format(sql, qualifiedTable)); } catch (SQLException e) { throw new RuntimeException(e); diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/postgre/PostgreJdbcDdl.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/postgre/PostgreJdbcDdl.java index 85d557182..08990d387 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/postgre/PostgreJdbcDdl.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/postgre/PostgreJdbcDdl.java @@ -112,8 +112,9 @@ public List showTables(Connection connection, String schema, String List tableInfoList = Lists.newArrayList(); try { + String actualSchema = StringUtils.isNotBlank(schema) ? 
schema : "public"; String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, - String.format(sql, schema, tablePattern)); + String.format(sql,actualSchema, tablePattern)); if (resultArr.length <= 1) { return Lists.newArrayList(); } @@ -216,13 +217,14 @@ public List showForeignKeys(Connection connection, String sche + " ON tc.constraint_name = kcu.constraint_name\n" + " AND tc.table_schema = kcu.table_schema\n" + "JOIN\n" + " information_schema.constraint_column_usage AS ccu\n" + " ON ccu.constraint_name = tc.constraint_name\n" + " AND ccu.table_schema = tc.table_schema\n" - + "WHERE\n" + " tc.constraint_type = 'FOREIGN KEY'\n" + " AND tc.table_schema='public'\n" + + "WHERE\n" + " tc.constraint_type = 'FOREIGN KEY'\n" + " AND tc.table_schema='%s'\n" + " AND tc.table_name in (%s)"; List foreignKeyInfoList = Lists.newArrayList(); String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList())); try { - sql = String.format(sql, tableListStr); + String actualSchema = StringUtils.isNotBlank(schema) ? schema : "public"; + sql = String.format(sql,actualSchema, tableListStr); String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, sql); if (resultArr.length <= 1) { return Lists.newArrayList(); @@ -249,10 +251,11 @@ public List showForeignKeys(Connection connection, String sche @Override public List sampleColumn(Connection connection, String schema, String table, String column) { - String sql = "SELECT \n" + " \"%s\"\n" + "FROM \n" + " \"%s\"\n" + "LIMIT 99;"; + String actualSchema = StringUtils.isNotBlank(schema) ? 
schema : "public"; + String qualifiedTable = String.format("\"%s\".\"%s\"", actualSchema, table); + String sql = String.format("SELECT \"%s\" FROM %s LIMIT 99", column, qualifiedTable); List sampleInfo = Lists.newArrayList(); try { - sql = String.format(sql, column, table); String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, schema, sql); if (resultArr.length <= 1) { return Lists.newArrayList(); @@ -278,10 +281,12 @@ public List sampleColumn(Connection connection, String schema, String ta @Override public ResultSetBO scanTable(Connection connection, String schema, String table) { - String sql = "SELECT *\n" + "FROM \n" + " %s\n" + "LIMIT 20;"; + String actualSchema = StringUtils.isNotBlank(schema) ? schema : "public"; + String qualifiedTable = String.format("\"%s\".\"%s\"", actualSchema, table); + String sql = String.format("SELECT * FROM %s LIMIT 20", qualifiedTable); ResultSetBO resultSet = ResultSetBO.builder().build(); try { - resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, String.format(sql, table)); + resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, sql); } catch (SQLException e) { throw new RuntimeException(e); diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/controller/DatasourceController.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/controller/DatasourceController.java index cc19ca59c..e12650cbe 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/controller/DatasourceController.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/controller/DatasourceController.java @@ -119,6 +119,17 @@ public List getDatasourceTables(@PathVariable Integer id) { } } + @GetMapping("/{id}/all-tables") + public List getAllDatasourceTables(@PathVariable Integer id) { + checkDatasourceExists(id); + try { + return datasourceService.getAllSchemasTables(id); + } + catch (Exception e) { + throw new 
ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + /** * Create data source */ diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/entity/Datasource.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/entity/Datasource.java index d2d482702..3b5ea1532 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/entity/Datasource.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/entity/Datasource.java @@ -52,6 +52,8 @@ public class Datasource { private String testStatus; + private String schemas; + private String description; private Long creatorId; diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/mapper/AgentDatasourceMapper.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/mapper/AgentDatasourceMapper.java index f6cdb0b41..44e2b6b20 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/mapper/AgentDatasourceMapper.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/mapper/AgentDatasourceMapper.java @@ -38,9 +38,9 @@ public interface AgentDatasourceMapper { @Select("SELECT * FROM agent_datasource WHERE agent_id = #{agentId} ORDER BY create_time DESC") List selectByAgentId(@Param("agentId") Long agentId); - /** Query active datasource ID by agent ID */ + /** Query active datasource IDs by agent ID */ @Select("SELECT datasource_id FROM agent_datasource WHERE agent_id = #{agentId} AND is_active = 1") - Integer selectActiveDatasourceIdByAgentId(@Param("agentId") Long agentId); + List selectActiveDatasourceIdsByAgentId(@Param("agentId") Long agentId); /** Query association by agent ID and data source ID */ @Select("SELECT * FROM agent_datasource WHERE agent_id = #{agentId} AND datasource_id = #{datasourceId}") diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/mapper/DatasourceMapper.java 
b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/mapper/DatasourceMapper.java index ccf0313ef..83eb50d39 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/mapper/DatasourceMapper.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/mapper/DatasourceMapper.java @@ -43,8 +43,8 @@ public interface DatasourceMapper { @Insert(""" INSERT INTO datasource - (name, type, host, port, database_name, username, password, connection_url, status, test_status, description, creator_id, create_time, update_time) - VALUES (#{name}, #{type}, #{host}, #{port}, #{databaseName}, #{username}, #{password}, #{connectionUrl}, #{status}, #{testStatus}, #{description}, #{creatorId}, NOW(), NOW()) + (name, type, host, port, database_name, username, password, connection_url, status, test_status,`schemas`,description, creator_id, create_time, update_time) + VALUES (#{name}, #{type}, #{host}, #{port}, #{databaseName}, #{username}, #{password}, #{connectionUrl}, #{status}, #{testStatus}, #{schemas}, #{description}, #{creatorId}, NOW(), NOW()) """) @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") int insert(Datasource datasource); @@ -65,6 +65,7 @@ public interface DatasourceMapper { password = #{password}, connection_url = #{connectionUrl}, status = #{status}, + `schemas` = #{schemas}, test_status = #{testStatus}, description = #{description}, creator_id = #{creatorId}, diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/DatasourceService.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/DatasourceService.java index 6af2dd570..2f6535fe5 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/DatasourceService.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/DatasourceService.java @@ -77,6 +77,8 @@ public interface 
DatasourceService { List getDatasourceTables(Integer datasourceId) throws Exception; + List getAllSchemasTables(Integer datasourceId) throws Exception; + /** * 获取数据源表的字段列表 */ diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/handler/DatasourceTypeHandler.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/handler/DatasourceTypeHandler.java index a134a7d92..90359d834 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/handler/DatasourceTypeHandler.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/handler/DatasourceTypeHandler.java @@ -20,6 +20,8 @@ import com.alibaba.cloud.ai.dataagent.entity.Datasource; import org.springframework.util.StringUtils; +import java.util.Arrays; + public interface DatasourceTypeHandler { String typeName(); @@ -64,6 +66,15 @@ default DbConfigBO toDbConfig(Datasource datasource) { config.setConnectionType(connectionType()); config.setDialectType(dialectType()); config.setSchema(extractSchemaName(datasource)); + if (StringUtils.hasText(datasource.getSchemas())) { + config.setSchemas(Arrays.stream(datasource.getSchemas().split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(java.util.stream.Collectors.toList())); + } + else { + config.setSchemas(java.util.List.of(datasource.getDatabaseName())); + } return config; } diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/impl/AgentDatasourceServiceImpl.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/impl/AgentDatasourceServiceImpl.java index 47a7b9678..1bf171f4f 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/impl/AgentDatasourceServiceImpl.java +++ 
b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/impl/AgentDatasourceServiceImpl.java @@ -19,6 +19,8 @@ import com.alibaba.cloud.ai.dataagent.dto.datasource.SchemaInitRequest; import com.alibaba.cloud.ai.dataagent.entity.AgentDatasource; import com.alibaba.cloud.ai.dataagent.entity.Datasource; +import com.alibaba.cloud.ai.dataagent.exception.InternalServerException; +import com.alibaba.cloud.ai.dataagent.exception.InvalidInputException; import com.alibaba.cloud.ai.dataagent.mapper.AgentDatasourceMapper; import com.alibaba.cloud.ai.dataagent.mapper.AgentDatasourceTablesMapper; import com.alibaba.cloud.ai.dataagent.service.datasource.AgentDatasourceService; @@ -141,11 +143,27 @@ public void removeDatasourceFromAgent(Long agentId, Integer datasourceId) { @Override public AgentDatasource toggleDatasourceForAgent(Long agentId, Integer datasourceId, Boolean isActive) { - // If enabling data source, first check if there are other enabled data sources + // 如果要激活数据源,需要检查该数据源是否与当前激活的数据源属于同一物理实例,防止多数据源情况下跨实例调用报错 if (isActive) { - int activeCount = agentDatasourceMapper.countActiveByAgentIdExcluding(agentId, datasourceId); - if (activeCount > 0) { - throw new RuntimeException("同一智能体下只能启用一个数据源,请先禁用其他数据源后再启用此数据源"); + Datasource targetDs = datasourceService.getDatasourceById(datasourceId); + if (targetDs != null) { + String targetHost = (targetDs.getHost() != null ? targetDs.getHost() : ""); + Integer targetPort = targetDs.getPort(); + String targetType = (targetDs.getType() != null ? targetDs.getType() : ""); + + List existings = getAgentDatasource(agentId); + for (AgentDatasource existing : existings) { + if (existing.getIsActive() != 0 && !existing.getDatasourceId().equals(datasourceId) && existing.getDatasource() != null) { + Datasource activeDs = existing.getDatasource(); + String activeHost = (activeDs.getHost() != null ? 
activeDs.getHost() : ""); + Integer activePort = activeDs.getPort(); + String activeType = (activeDs.getType() != null ? activeDs.getType() : ""); + + if (!targetHost.equals(activeHost) || !java.util.Objects.equals(targetPort, activePort) || !targetType.equals(activeType)) { + throw new InvalidInputException("同一智能体下仅允许激活属于同一数据库实例(主机、端口、引擎一致)的数据源。当前已激活异构源:" + activeHost + ":" + activePort); + } + } + } } } diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/impl/DatasourceServiceImpl.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/impl/DatasourceServiceImpl.java index e8100959a..b10ee579d 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/impl/DatasourceServiceImpl.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/impl/DatasourceServiceImpl.java @@ -34,6 +34,7 @@ import com.alibaba.cloud.ai.dataagent.service.datasource.handler.DatasourceTypeHandler; import com.alibaba.cloud.ai.dataagent.service.datasource.handler.registry.DatasourceTypeHandlerRegistry; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -228,27 +229,62 @@ public List getDatasourceTables(Integer datasourceId) throws Exception { // Create database configuration DbConfigBO dbConfig = getDbConfig(datasource); - // Create query parameters - DbQueryParameter queryParam = DbQueryParameter.from(dbConfig); - - // 提取schema名称 - DatasourceTypeHandler handler = datasourceTypeHandlerRegistry.getRequired(datasource.getType()); - String schemaName = handler.extractSchemaName(datasource); - queryParam.setSchema(schemaName); - // Query table list Accessor dbAccessor = accessorFactory.getAccessorByDbConfig(dbConfig); - List tableInfoList = dbAccessor.showTables(dbConfig, queryParam); + List allTableNames = new ArrayList<>(); + + // 遍历所有 schema,使用 
schema.table 格式与向量库保持一致 + List schemasToProcess = (dbConfig.getSchemas() != null && !dbConfig.getSchemas().isEmpty()) + ? dbConfig.getSchemas() + : java.util.List.of(dbConfig.getSchema()); + + for (String schema : schemasToProcess) { + DbQueryParameter queryParam = DbQueryParameter.from(dbConfig).setSchema(schema); + List tableInfoList = dbAccessor.showTables(dbConfig, queryParam); + List tableNames = tableInfoList.stream() + .map(TableInfoBO::getName) + .filter(name -> name != null && !name.trim().isEmpty()) + .map(name -> schema + "." + name) + .sorted() + .toList(); + allTableNames.addAll(tableNames); + } - // Extract table names - List tableNames = tableInfoList.stream() - .map(TableInfoBO::getName) - .filter(name -> name != null && !name.trim().isEmpty()) - .sorted() - .toList(); + log.info("Found {} tables for datasource: {}", allTableNames.size(), datasourceId); + return allTableNames; + } - log.info("Found {} tables for datasource: {}", tableNames.size(), datasourceId); - return tableNames; + @Override + public List getAllSchemasTables(Integer datasourceId) throws Exception { + log.info("Getting aggregated tables from all active datasources to act as cross-schema targets."); + + List allTableNames = new ArrayList<>(); + + // 1. 查找所有开启了活跃状态的业务配置数据源 + List activeDatasources = this.getDatasourceByStatus("active"); + if (activeDatasources == null || activeDatasources.isEmpty()) { + return this.getDatasourceTables(datasourceId); + } + + Set uniqueTables = new HashSet<>(); + // 2. 
借用原生针对单数据源的安全拉取功能,遍历拼装出平台级白名单可用的多库大集合 + for (Datasource ds : activeDatasources) { + try { + List dsTables = this.getDatasourceTables(ds.getId()); + if (dsTables != null) { + uniqueTables.addAll(dsTables); + } + } catch (Exception e) { + log.warn("Failed to get tables for datasource: {} - {} when aggregating active tables", + ds.getId(), ds.getName(), e); + } + } + + allTableNames.addAll(uniqueTables); + Collections.sort(allTableNames); + + log.info("Aggregated {} distinct tables across {} active datasources.", allTableNames.size(), activeDatasources.size()); + return allTableNames; } @Override @@ -274,11 +310,15 @@ public List getTableColumns(Integer datasourceId, String tableName) thro DbQueryParameter queryParam = DbQueryParameter.from(dbConfig); // 提取schema名称 - DatasourceTypeHandler handler = datasourceTypeHandlerRegistry.getRequired(datasource.getType()); - String schemaName = handler.extractSchemaName(datasource); - queryParam.setSchema(schemaName); - queryParam.setTable(tableName); - + String actualTableName = tableName; + String actualSchema = datasource.getDatabaseName(); + if (tableName.contains(".")){ + String[] split = tableName.split("\\.", 2); + actualSchema = split[0]; + actualTableName = split[1]; + } + queryParam.setSchema(actualSchema); + queryParam.setTable(actualTableName); // 查询字段列表 Accessor dbAccessor = accessorFactory.getAccessorByDbConfig(dbConfig); List columnInfoList = dbAccessor.showColumns(dbConfig, queryParam); // 提取字段名称 diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/SchemaService.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/SchemaService.java index d6a579fc9..3d985e24c 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/SchemaService.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/SchemaService.java @@ -31,7 +31,7 @@ public interface SchemaService { void 
extractDatabaseName(SchemaDTO schemaDTO, DbConfigBO dbConfig); void buildSchemaFromDocuments(String agentId, List columnDocumentList, List tableDocuments, - SchemaDTO schemaDTO); + SchemaDTO schemaDTO, List extraForeignKeys); List getTableDocuments(Integer datasourceId, List tableNames); diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/SchemaServiceImpl.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/SchemaServiceImpl.java index 2512845dd..5ff6576d2 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/SchemaServiceImpl.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/SchemaServiceImpl.java @@ -15,6 +15,7 @@ */ package com.alibaba.cloud.ai.dataagent.service.schema; +import com.alibaba.cloud.ai.dataagent.bo.schema.ColumnInfoBO; import com.alibaba.cloud.ai.dataagent.connector.DbQueryParameter; import com.alibaba.cloud.ai.dataagent.bo.schema.ForeignKeyInfoBO; import com.alibaba.cloud.ai.dataagent.bo.schema.TableInfoBO; @@ -42,6 +43,7 @@ import org.springframework.ai.vectorstore.SearchRequest; import org.springframework.ai.vectorstore.filter.Filter; import org.springframework.ai.vectorstore.filter.FilterExpressionBuilder; +import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import org.springframework.util.Assert; @@ -83,7 +85,7 @@ public class SchemaServiceImpl implements SchemaService { @Override public void buildSchemaFromDocuments(String agentId, List currentColumnDocuments, - List tableDocuments, SchemaDTO schemaDTO) { + List tableDocuments, SchemaDTO schemaDTO, List extraForeignKeys) { // 创建可变列表副本,避免不可变集合异常 List mutableColumnDocuments = new ArrayList<>(currentColumnDocuments); @@ -102,6 +104,21 @@ public void buildSchemaFromDocuments(String agentId, List currentColum // 将包含"订单表.订单ID"和"订单详情表.订单ID" Set relatedNamesFromForeignKeys = 
extractRelatedNamesFromForeignKeys(mutableTableDocuments); + // 将额外的外键信息(例如逻辑虚拟外键)一并纳入提取 + if (extraForeignKeys != null) { + for (String fk : extraForeignKeys) { + if (StringUtils.isNotBlank(fk)) { + Arrays.stream(fk.split("、")).forEach(pair -> { + String[] parts = pair.split("="); + if (parts.length == 2) { + relatedNamesFromForeignKeys.add(parts[0].trim()); + relatedNamesFromForeignKeys.add(parts[1].trim()); + } + }); + } + } + } + // 通过外键加载缺失的表和列 List missingTables = getMissingTableNamesWithForeignKeySet(mutableTableDocuments, relatedNamesFromForeignKeys); @@ -119,7 +136,10 @@ public void buildSchemaFromDocuments(String agentId, List currentColum schemaDTO.setTable(tableList); Set foreignKeys = tableDocuments.stream() - .map(doc -> (String) doc.getMetadata().getOrDefault("foreignKey", "")) + .map(doc -> { + Object fk = doc.getMetadata().getOrDefault("foreignKey", ""); + return fk != null ? fk.toString() : ""; + }) .flatMap(fk -> Arrays.stream(fk.split("、"))) .filter(StringUtils::isNotBlank) .collect(Collectors.toSet()); @@ -130,48 +150,82 @@ public void buildSchemaFromDocuments(String agentId, List currentColum public Boolean schema(Integer datasourceId, SchemaInitRequest schemaInitRequest) throws Exception { log.info("Starting schema initialization for datasource: {}", datasourceId); DbConfigBO config = schemaInitRequest.getDbConfig(); - DbQueryParameter dqp = DbQueryParameter.from(config) - .setSchema(config.getSchema()) - .setTables(schemaInitRequest.getTables()); + // 将前端传入的表按 schema 归类 + Map> schemaToTables = new HashMap<>(); + // 解析 "schema.table" 格式,按 schema 分组 + for (String rawTable : schemaInitRequest.getTables()) { + String schemaName = config.getSchema(); + String tableName = rawTable; + if (rawTable.contains(".")) { + String[] split = rawTable.split("\\.", 2); + schemaName = split[0]; + tableName = split[1]; + } + schemaToTables.computeIfAbsent(schemaName, k -> new ArrayList<>()).add(tableName); + } try { // 根据当前DbConfig获取Accessor Accessor dbAccessor 
= accessorFactory.getAccessorByDbConfig(config); - // 清理旧数据 log.info("Clearing existing schema data for datasource: {}", datasourceId); clearSchemaDataForDatasource(datasourceId); log.debug("Successfully cleared existing schema data for datasource: {}", datasourceId); - - // 处理外键 - log.debug("Fetching foreign keys for datasource: {}", datasourceId); - List foreignKeys = dbAccessor.showForeignKeys(config, dqp); - log.info("Found {} foreign keys for datasource: {}", foreignKeys.size(), datasourceId); - - Map> foreignKeyMap = buildForeignKeyMap(foreignKeys); - log.debug("Built foreign key map with {} entries for datasource: {}", foreignKeyMap.size(), datasourceId); - - // 处理表和列 - log.debug("Fetching tables for datasource: {}", datasourceId); - List tables = dbAccessor.fetchTables(config, dqp); - log.info("Found tables for datasource: {}", tables.size(), datasourceId); - - if (tables.size() > 5) { - // 对于大量表,使用并行处理 - log.info("Processing {} tables in parallel mode for datasource: {}", tables.size(), datasourceId); - processTablesInParallel(tables, config, foreignKeyMap); - } - else { - // 对于少量表,使用批量处理 - log.info("Processing {} tables in batch mode for datasource: {}", tables.size(), datasourceId); - tableMetadataService.batchEnrichTableMetadata(tables, config, foreignKeyMap); + List allTables = new ArrayList<>(); + for (Map.Entry> entry : schemaToTables.entrySet()) { + String schemaName = entry.getKey(); + List tableNamesForSchema = entry.getValue(); + + // 复制一个带有特定 schema 的配置对象 + DbConfigBO currentSchemaConfig = new DbConfigBO(); + BeanUtils.copyProperties(config, currentSchemaConfig); + currentSchemaConfig.setSchema(schemaName); + DbQueryParameter dqp = DbQueryParameter.from(currentSchemaConfig) + .setSchema(schemaName) + .setTables(tableNamesForSchema); + // 处理外键 + log.debug("Fetching foreign keys for datasource: {}", datasourceId); + List foreignKeys = dbAccessor.showForeignKeys(currentSchemaConfig, dqp); + log.info("Found {} foreign keys for datasource: {}", 
foreignKeys.size(), datasourceId); + // 外键 map 的 key 需要使用 schema.table 格式,与后续表名前缀保持一致 + Map> foreignKeyMap = buildForeignKeyMap(foreignKeys, schemaName); + log.debug("Built foreign key map with {} entries for datasource: {}", foreignKeyMap.size(), datasourceId); + + // 处理表和列 + log.debug("Fetching tables for datasource: {}", datasourceId); + List tables = dbAccessor.fetchTables(currentSchemaConfig, dqp); + log.info("Found {} tables for datasource: {}", tables.size(), datasourceId); + + if (tables.size() > 5) { + // 对于大量表,使用并行处理 + log.info("Processing {} tables in parallel mode for datasource: {}", tables.size(), datasourceId); + processTablesInParallel(tables, currentSchemaConfig, foreignKeyMap); + } + else { + // 对于少量表,使用批量处理 + log.info("Processing {} tables in batch mode for datasource: {}", tables.size(), datasourceId); + tableMetadataService.batchEnrichTableMetadata(tables, currentSchemaConfig, foreignKeyMap); + } + // 将带有 schema 的表名和相关列的归属表名更新,方便存储到向量库中 + if (StringUtils.isNotBlank(schemaName)) { + for (TableInfoBO table : tables) { + String fullName = schemaName + "." 
+ table.getName(); + table.setName(fullName); + if (table.getColumns() != null) { + for (ColumnInfoBO column : table.getColumns()) { + column.setTableName(fullName); + } + } + } + } + allTables.addAll(tables); } log.info("Successfully processed all tables for datasource: {}", datasourceId); // 转换为文档 - List columnDocs = convertColumnsToDocuments(datasourceId, tables); - List tableDocs = convertTablesToDocuments(datasourceId, tables); + List columnDocs = convertColumnsToDocuments(datasourceId,allTables); + List tableDocs = convertTablesToDocuments(datasourceId, allTables); // 存储文档 log.info("Storing columns and {} tables for datasource: {}", columnDocs.size(), tableDocs.size(), @@ -260,13 +314,24 @@ protected void storeSchemaDocuments(Integer datasourceId, List columns } protected Map> buildForeignKeyMap(List foreignKeys) { + return buildForeignKeyMap(foreignKeys, null); + } + + /** + * 构建外键映射,key 使用 schema.table 格式(与向量库表名保持一致) + * @param foreignKeys 外键列表 + * @param schemaName schema 名称(为非空时,table 名前加 schema. 前缀) + */ + protected Map> buildForeignKeyMap(List foreignKeys, String schemaName) { Map> map = new HashMap<>(); + boolean hasSchema = StringUtils.isNotBlank(schemaName); for (ForeignKeyInfoBO fk : foreignKeys) { - String key = fk.getTable() + "." + fk.getColumn() + "=" + fk.getReferencedTable() + "." - + fk.getReferencedColumn(); + String table = hasSchema ? schemaName + "." + fk.getTable() : fk.getTable(); + String refTable = hasSchema ? schemaName + "." + fk.getReferencedTable() : fk.getReferencedTable(); + String key = table + "." + fk.getColumn() + "=" + refTable + "." 
+ fk.getReferencedColumn(); - map.computeIfAbsent(fk.getTable(), k -> new ArrayList<>()).add(key); - map.computeIfAbsent(fk.getReferencedTable(), k -> new ArrayList<>()).add(key); + map.computeIfAbsent(table, k -> new ArrayList<>()).add(key); + map.computeIfAbsent(refTable, k -> new ArrayList<>()).add(key); } return map; } @@ -317,9 +382,11 @@ private List getMissingTableNamesWithForeignKeySet(List tableD Set missingTables = new HashSet<>(); for (String key : foreignKeySet) { - String[] parts = key.split("\\."); - if (parts.length == 2) { - String tableName = parts[0]; + // key 格式: schema.table.column 或 table.column + // 从最后一个点拆分,取左边得到表名(可能是 schema.table 或 table) + int lastDot = key.lastIndexOf('.'); + if (lastDot > 0) { + String tableName = key.substring(0, lastDot); if (!uniqueTableNames.contains(tableName)) { missingTables.add(tableName); } @@ -483,7 +550,11 @@ public void extractDatabaseName(SchemaDTO schemaDTO, DbConfigBO dbConfig) { } } else if (BizDataSourceTypeEnum.isPgDialect(dbConfig.getDialectType())) { - schemaDTO.setName(dbConfig.getSchema()); + if (dbConfig.getSchemas() != null && !dbConfig.getSchemas().isEmpty()) { + schemaDTO.setName(String.join(",", dbConfig.getSchemas())); + }else { + schemaDTO.setName(dbConfig.getSchema()); + } } } diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/TableMetadataService.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/TableMetadataService.java index a75c591d9..a38a1d2e2 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/TableMetadataService.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/schema/TableMetadataService.java @@ -28,6 +28,7 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.apache.commons.lang3.StringUtils; import java.util.*; import java.util.stream.Collectors; 
@@ -63,7 +64,7 @@ public void batchEnrichTableMetadata(List tables, DbConfigBO dbConf tableColumnsMap); // 3. 处理每个表的元数据 - enrichTablesWithMetadata(tables, tableColumnsMap, allTablesSampleData, foreignKeyMap); + enrichTablesWithMetadata(tables, tableColumnsMap, allTablesSampleData, foreignKeyMap, dbConfig.getSchema()); } /** @@ -97,7 +98,8 @@ private Map> fetchTableColumns(List tabl * @param foreignKeyMap 外键映射 */ private void enrichTablesWithMetadata(List tables, Map> tableColumnsMap, - Map>> allTablesSampleData, Map> foreignKeyMap) { + Map>> allTablesSampleData, Map> foreignKeyMap, + String schemaName) { for (TableInfoBO table : tables) { List columnInfoBOS = tableColumnsMap.get(table.getName()); @@ -111,7 +113,7 @@ private void enrichTablesWithMetadata(List tables, Map columnInf * @param table 表信息 * @param foreignKeyMap 外键映射 */ - private void setTableForeignKeys(TableInfoBO table, Map> foreignKeyMap) { - List foreignKeys = foreignKeyMap.getOrDefault(table.getName(), new ArrayList<>()); + private void setTableForeignKeys(TableInfoBO table, Map> foreignKeyMap, String schemaName) { + // foreignKeyMap 在 SchemaServiceImpl 中通常使用 schema.table 作为 key,这里需要对齐查询口径 + String rawTableName = table.getName(); + List foreignKeys = new ArrayList<>(); + if (foreignKeyMap != null) { + if (StringUtils.isNotBlank(schemaName)) { + foreignKeys = foreignKeyMap.getOrDefault(schemaName + "." 
+ rawTableName, + foreignKeyMap.getOrDefault(rawTableName, new ArrayList<>())); + } + else { + foreignKeys = foreignKeyMap.getOrDefault(rawTableName, new ArrayList<>()); + } + } table.setForeignKey(String.join("、", foreignKeys)); } diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SchemaRecallNode.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SchemaRecallNode.java index dccba83b5..253e6555d 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SchemaRecallNode.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/SchemaRecallNode.java @@ -67,10 +67,10 @@ public Map apply(OverAllState state) throws Exception { String input = queryEnhanceOutputDTO.getCanonicalQuery(); String agentId = StateUtil.getStringValue(state, AGENT_ID); - // 查询 Agent 的激活数据源 - Integer datasourceId = agentDatasourceMapper.selectActiveDatasourceIdByAgentId(Long.valueOf(agentId)); + // 查询 Agent 的所有激活数据源 + List datasourceIds = agentDatasourceMapper.selectActiveDatasourceIdsByAgentId(Long.valueOf(agentId)); - if (datasourceId == null) { + if (datasourceIds == null || datasourceIds.isEmpty()) { log.warn("Agent {} has no active datasource", agentId); // 返回空结果 String noDataSourceMessage = """ @@ -98,11 +98,22 @@ public Map apply(OverAllState state) throws Exception { } // Execute business logic first - recall schema information immediately - List tableDocuments = new ArrayList<>( - schemaService.getTableDocumentsByDatasource(datasourceId, input)); - // extract table names - List recalledTableNames = extractTableName(tableDocuments); - List columnDocuments = schemaService.getColumnDocumentsByTableName(datasourceId, recalledTableNames); + List tableDocuments = new ArrayList<>(); + List recalledTableNames = new ArrayList<>(); + List columnDocuments = new ArrayList<>(); + + for (Integer datasourceId : datasourceIds) { + List currentTableDocs = 
schemaService.getTableDocumentsByDatasource(datasourceId, input); + tableDocuments.addAll(currentTableDocs); + + List currentTableNames = extractTableName(currentTableDocs); + recalledTableNames.addAll(currentTableNames); + + if (!currentTableNames.isEmpty()) { + List currentColumnDocs = schemaService.getColumnDocumentsByTableName(datasourceId, currentTableNames); + columnDocuments.addAll(currentColumnDocs); + } + } String failMessage = """ \n 未检索到相关数据表 diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/TableRelationNode.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/TableRelationNode.java index 4d657d7de..12ab16abe 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/TableRelationNode.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/workflow/node/TableRelationNode.java @@ -164,7 +164,7 @@ private SchemaDTO buildInitialSchema(String agentId, List columnDocume SchemaDTO schemaDTO = new SchemaDTO(); schemaService.extractDatabaseName(schemaDTO, agentDbConfig); - schemaService.buildSchemaFromDocuments(agentId, columnDocuments, tableDocuments, schemaDTO); + schemaService.buildSchemaFromDocuments(agentId, columnDocuments, tableDocuments, schemaDTO, logicalForeignKeys); // 将逻辑外键信息合并到 schemaDTO 的 foreignKeys 字段 if (logicalForeignKeys != null && !logicalForeignKeys.isEmpty()) { diff --git a/data-agent-management/src/main/resources/sql/schema.sql b/data-agent-management/src/main/resources/sql/schema.sql index 160bcedf0..95bad0c1e 100644 --- a/data-agent-management/src/main/resources/sql/schema.sql +++ b/data-agent-management/src/main/resources/sql/schema.sql @@ -106,6 +106,7 @@ CREATE TABLE IF NOT EXISTS datasource ( connection_url VARCHAR(1000) COMMENT '完整连接URL', status VARCHAR(50) DEFAULT 'inactive' COMMENT '状态:active-启用,inactive-禁用', test_status VARCHAR(50) DEFAULT 'unknown' COMMENT 
'连接测试状态:success-成功,failed-失败,unknown-未知', + schemas VARCHAR(1000) COMMENT '配置的schemas(逗号隔开)', description TEXT COMMENT '描述', creator_id BIGINT COMMENT '创建者ID', create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',