Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@
clearable
filterable
>
<el-option v-for="table in tableList" :key="table" :label="table" :value="table" />
<el-option v-for="table in targetTableList" :key="table" :label="table" :value="table" />
</el-select>
</el-col>

Expand Down Expand Up @@ -859,6 +859,7 @@
description: '',
} as LogicalRelation);
const tableList: Ref<string[]> = ref([]);
const targetTableList: Ref<string[]> = ref([]);
const sourceColumnList: Ref<string[]> = ref([]);
const targetColumnList: Ref<string[]> = ref([]);
const savingForeignKeys: Ref<boolean> = ref(false);
Expand Down Expand Up @@ -1348,7 +1349,11 @@

// 加载表列表
try {
// 左侧主表:仅显示当前点击进入配置环境的当前数据源及子 Schema 列表
tableList.value = await datasourceService.getDatasourceTables(datasourceRow.id);

// 右侧关联表:获取平台上所有 Active 状态数据源下的合法受管表(跨业务库)
targetTableList.value = await logicalRelationService.getAllDatasourceTables(datasourceRow.id);
} catch (error) {
ElMessage.error('加载表列表失败');
console.error('Failed to load table list:', error);
Expand Down Expand Up @@ -1627,6 +1632,7 @@
foreignKeyList,
newForeignKey,
tableList,
targetTableList,
sourceColumnList,
targetColumnList,
savingForeignKeys,
Expand Down
13 changes: 13 additions & 0 deletions data-agent-frontend/src/services/logicalRelation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ class LogicalRelationService {
return response.data;
}

// 获取数据源所属数据库的全量表(包含所有有效的 schema)
async getAllDatasourceTables(datasourceId: number): Promise<string[]> {
try {
const response = await axios.get<string[]>(
`${API_BASE_URL}/${datasourceId}/all-tables`,
);
return response.data || [];
} catch (error) {
console.error('Failed to get all datasource tables:', error);
return [];
}
}

// 获取数据源表的字段列表
async getTableColumns(datasourceId: number, tableName: string): Promise<string[]> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@Builder
@NoArgsConstructor
Expand All @@ -38,4 +40,5 @@ public class DbConfigBO {

private String dialectType;

private List<String> schemas;
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ public List<TableInfoBO> showTables(Connection connection, String schema, String
sql += "limit 2000;";
List<TableInfoBO> tableInfoList = Lists.newArrayList();
try {
String actualSchema = StringUtils.isNotBlank(schema) ? schema : connection.getCatalog();
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection,
String.format(sql, connection.getCatalog(), tablePattern));
String.format(sql,actualSchema, tablePattern));
if (resultArr.length <= 1) {
return Lists.newArrayList();
}
Expand Down Expand Up @@ -110,8 +111,9 @@ public List<TableInfoBO> fetchTables(Connection connection, String schema, List<
List<TableInfoBO> tableInfoList = Lists.newArrayList();
String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList()));
try {
String actualSchema = StringUtils.isNotBlank(schema) ? schema : connection.getCatalog();
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection,
String.format(sql, connection.getCatalog(), tableListStr));
String.format(sql,actualSchema, tableListStr));
if (resultArr.length <= 1) {
return Lists.newArrayList();
}
Expand Down Expand Up @@ -139,8 +141,9 @@ public List<ColumnInfoBO> showColumns(Connection connection, String schema, Stri
+ "FROM information_schema.COLUMNS " + "WHERE table_schema='%s' " + "and table_name='%s';";
List<ColumnInfoBO> columnInfoList = Lists.newArrayList();
try {
String actualSchema = StringUtils.isNotBlank(schema) ? schema : connection.getCatalog();
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, "INFORMATION_SCHEMA",
String.format(sql, connection.getCatalog(), table));
String.format(sql,actualSchema, table));
if (resultArr.length <= 1) {
return Lists.newArrayList();
}
Expand Down Expand Up @@ -176,7 +179,8 @@ public List<ForeignKeyInfoBO> showForeignKeys(Connection connection, String sche
String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList()));

try {
sql = String.format(sql, connection.getCatalog(), tableListStr, tableListStr);
String actualSchema = StringUtils.isNotBlank(schema) ? schema : connection.getCatalog();
sql = String.format(sql,actualSchema, tableListStr, tableListStr);
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, "INFORMATION_SCHEMA", sql);
if (resultArr.length <= 1) {
return Lists.newArrayList();
Expand All @@ -203,10 +207,11 @@ public List<ForeignKeyInfoBO> showForeignKeys(Connection connection, String sche

@Override
public List<String> sampleColumn(Connection connection, String schema, String table, String column) {
String sql = "SELECT \n" + " `%s`\n" + "FROM \n" + " `%s`\n" + "LIMIT 99;";
String qualifiedTable = StringUtils.isNotBlank(schema) ? String.format("`%s`.`%s`", schema, table) :
String.format("`%s`",table);
String sql = String.format("SELECT `%s` FROM %s LIMIT 99", column, qualifiedTable);
List<String> sampleInfo = Lists.newArrayList();
try {
sql = String.format(sql, column, table);
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, sql);
if (resultArr.length <= 1) {
return Lists.newArrayList();
Expand All @@ -231,9 +236,11 @@ public List<String> sampleColumn(Connection connection, String schema, String ta
@Override
public ResultSetBO scanTable(Connection connection, String schema, String table) {
String sql = "SELECT *\n" + "FROM \n" + " `%s`\n" + "LIMIT 20;";
String qualifiedTable = StringUtils.isNotBlank(schema) ? String.format("`%s`.`%s`", schema, table) :
String.format("`%s`",table);
ResultSetBO resultSet = ResultSetBO.builder().build();
try {
resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, String.format(sql, table));
resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, String.format(sql, qualifiedTable));
}
catch (SQLException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ public List<TableInfoBO> showTables(Connection connection, String schema, String

List<TableInfoBO> tableInfoList = Lists.newArrayList();
try {
String actualSchema = StringUtils.isNotBlank(schema) ? schema : "public";
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection,
String.format(sql, schema, tablePattern));
String.format(sql,actualSchema, tablePattern));
if (resultArr.length <= 1) {
return Lists.newArrayList();
}
Expand Down Expand Up @@ -216,13 +217,14 @@ public List<ForeignKeyInfoBO> showForeignKeys(Connection connection, String sche
+ " ON tc.constraint_name = kcu.constraint_name\n" + " AND tc.table_schema = kcu.table_schema\n"
+ "JOIN\n" + " information_schema.constraint_column_usage AS ccu\n"
+ " ON ccu.constraint_name = tc.constraint_name\n" + " AND ccu.table_schema = tc.table_schema\n"
+ "WHERE\n" + " tc.constraint_type = 'FOREIGN KEY'\n" + " AND tc.table_schema='public'\n"
+ "WHERE\n" + " tc.constraint_type = 'FOREIGN KEY'\n" + " AND tc.table_schema='%s'\n"
+ " AND tc.table_name in (%s)";
List<ForeignKeyInfoBO> foreignKeyInfoList = Lists.newArrayList();
String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList()));

try {
sql = String.format(sql, tableListStr);
String actualSchema = StringUtils.isNotBlank(schema) ? schema : "public";
sql = String.format(sql,actualSchema, tableListStr);
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, sql);
if (resultArr.length <= 1) {
return Lists.newArrayList();
Expand All @@ -249,10 +251,11 @@ public List<ForeignKeyInfoBO> showForeignKeys(Connection connection, String sche

@Override
public List<String> sampleColumn(Connection connection, String schema, String table, String column) {
String sql = "SELECT \n" + " \"%s\"\n" + "FROM \n" + " \"%s\"\n" + "LIMIT 99;";
String actualSchema = StringUtils.isNotBlank(schema) ? schema : "public";
String qualifiedTable = String.format("\"%s\".\"%s\"", actualSchema, table);
String sql = String.format("SELECT \"%s\" FROM %s LIMIT 99", column, qualifiedTable);
List<String> sampleInfo = Lists.newArrayList();
try {
sql = String.format(sql, column, table);
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, schema, sql);
if (resultArr.length <= 1) {
return Lists.newArrayList();
Expand All @@ -278,10 +281,12 @@ public List<String> sampleColumn(Connection connection, String schema, String ta

@Override
public ResultSetBO scanTable(Connection connection, String schema, String table) {
String sql = "SELECT *\n" + "FROM \n" + " %s\n" + "LIMIT 20;";
String actualSchema = StringUtils.isNotBlank(schema) ? schema : "public";
String qualifiedTable = String.format("\"%s\".\"%s\"", actualSchema, table);
String sql = String.format("SELECT * FROM %s LIMIT 20", qualifiedTable);
ResultSetBO resultSet = ResultSetBO.builder().build();
try {
resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, String.format(sql, table));
resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, sql);
}
catch (SQLException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ public List<String> getDatasourceTables(@PathVariable Integer id) {
}
}

@GetMapping("/{id}/all-tables")
public List<String> getAllDatasourceTables(@PathVariable Integer id) {
checkDatasourceExists(id);
try {
return datasourceService.getAllSchemasTables(id);
}
catch (Exception e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
}

/**
* Create data source
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class Datasource {

private String testStatus;

private String schemas;

private String description;

private Long creatorId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public interface AgentDatasourceMapper {
@Select("SELECT * FROM agent_datasource WHERE agent_id = #{agentId} ORDER BY create_time DESC")
List<AgentDatasource> selectByAgentId(@Param("agentId") Long agentId);

/** Query active datasource ID by agent ID */
/** Query active datasource IDs by agent ID */
@Select("SELECT datasource_id FROM agent_datasource WHERE agent_id = #{agentId} AND is_active = 1")
Integer selectActiveDatasourceIdByAgentId(@Param("agentId") Long agentId);
List<Integer> selectActiveDatasourceIdsByAgentId(@Param("agentId") Long agentId);

/** Query association by agent ID and data source ID */
@Select("SELECT * FROM agent_datasource WHERE agent_id = #{agentId} AND datasource_id = #{datasourceId}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public interface DatasourceMapper {

@Insert("""
INSERT INTO datasource
(name, type, host, port, database_name, username, password, connection_url, status, test_status, description, creator_id, create_time, update_time)
VALUES (#{name}, #{type}, #{host}, #{port}, #{databaseName}, #{username}, #{password}, #{connectionUrl}, #{status}, #{testStatus}, #{description}, #{creatorId}, NOW(), NOW())
(name, type, host, port, database_name, username, password, connection_url, status, test_status,`schemas`,description, creator_id, create_time, update_time)
VALUES (#{name}, #{type}, #{host}, #{port}, #{databaseName}, #{username}, #{password}, #{connectionUrl}, #{status}, #{testStatus}, #{schemas}, #{description}, #{creatorId}, NOW(), NOW())
""")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
int insert(Datasource datasource);
Expand All @@ -65,6 +65,7 @@ public interface DatasourceMapper {
<if test="password != null">password = #{password},</if>
<if test="connectionUrl != null">connection_url = #{connectionUrl},</if>
<if test="status != null">status = #{status},</if>
<if test="schemas != null">`schemas` = #{schemas},</if>
<if test="testStatus != null">test_status = #{testStatus},</if>
<if test="description != null">description = #{description},</if>
<if test="creatorId != null">creator_id = #{creatorId},</if>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public interface DatasourceService {

List<String> getDatasourceTables(Integer datasourceId) throws Exception;

List<String> getAllSchemasTables(Integer datasourceId) throws Exception;

/**
* 获取数据源表的字段列表
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.alibaba.cloud.ai.dataagent.entity.Datasource;
import org.springframework.util.StringUtils;

import java.util.Arrays;

public interface DatasourceTypeHandler {

String typeName();
Expand Down Expand Up @@ -64,6 +66,15 @@ default DbConfigBO toDbConfig(Datasource datasource) {
config.setConnectionType(connectionType());
config.setDialectType(dialectType());
config.setSchema(extractSchemaName(datasource));
if (StringUtils.hasText(datasource.getSchemas())) {
config.setSchemas(Arrays.stream(datasource.getSchemas().split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(java.util.stream.Collectors.toList()));
}
else {
config.setSchemas(java.util.List.of(datasource.getDatabaseName()));
}
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.alibaba.cloud.ai.dataagent.dto.datasource.SchemaInitRequest;
import com.alibaba.cloud.ai.dataagent.entity.AgentDatasource;
import com.alibaba.cloud.ai.dataagent.entity.Datasource;
import com.alibaba.cloud.ai.dataagent.exception.InternalServerException;
import com.alibaba.cloud.ai.dataagent.exception.InvalidInputException;
import com.alibaba.cloud.ai.dataagent.mapper.AgentDatasourceMapper;
import com.alibaba.cloud.ai.dataagent.mapper.AgentDatasourceTablesMapper;
import com.alibaba.cloud.ai.dataagent.service.datasource.AgentDatasourceService;
Expand Down Expand Up @@ -141,11 +143,27 @@ public void removeDatasourceFromAgent(Long agentId, Integer datasourceId) {

@Override
public AgentDatasource toggleDatasourceForAgent(Long agentId, Integer datasourceId, Boolean isActive) {
// If enabling data source, first check if there are other enabled data sources
// 如果要激活数据源,需要检查该数据源是否与当前激活的数据源属于同一物理实例,防止多数据源情况下跨实例调用报错
if (isActive) {
int activeCount = agentDatasourceMapper.countActiveByAgentIdExcluding(agentId, datasourceId);
if (activeCount > 0) {
throw new RuntimeException("同一智能体下只能启用一个数据源,请先禁用其他数据源后再启用此数据源");
Datasource targetDs = datasourceService.getDatasourceById(datasourceId);
if (targetDs != null) {
String targetHost = (targetDs.getHost() != null ? targetDs.getHost() : "");
Integer targetPort = targetDs.getPort();
String targetType = (targetDs.getType() != null ? targetDs.getType() : "");

List<AgentDatasource> existings = getAgentDatasource(agentId);
for (AgentDatasource existing : existings) {
if (existing.getIsActive() != 0 && !existing.getDatasourceId().equals(datasourceId) && existing.getDatasource() != null) {
Datasource activeDs = existing.getDatasource();
String activeHost = (activeDs.getHost() != null ? activeDs.getHost() : "");
Integer activePort = activeDs.getPort();
String activeType = (activeDs.getType() != null ? activeDs.getType() : "");

if (!targetHost.equals(activeHost) || !java.util.Objects.equals(targetPort, activePort) || !targetType.equals(activeType)) {
throw new InvalidInputException("同一智能体下仅允许激活属于同一数据库实例(主机、端口、引擎一致)的数据源。当前已激活异构源:" + activeHost + ":" + activePort);
}
}
}
}
}

Expand Down
Loading
Loading