diff --git a/data-agent-management/pom.xml b/data-agent-management/pom.xml index b2cdd4740..d664c632e 100644 --- a/data-agent-management/pom.xml +++ b/data-agent-management/pom.xml @@ -364,6 +364,13 @@ docker-java-transport-zerodep + + + com.clickhouse + clickhouse-jdbc + 0.9.7 + compile + diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseDBAccessor.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseDBAccessor.java new file mode 100644 index 000000000..87c1d6eef --- /dev/null +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseDBAccessor.java @@ -0,0 +1,49 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.cloud.ai.dataagent.connector.impls.clickhouse; + +import com.alibaba.cloud.ai.dataagent.connector.accessor.AbstractAccessor; +import com.alibaba.cloud.ai.dataagent.connector.ddl.DdlFactory; +import com.alibaba.cloud.ai.dataagent.connector.pool.DBConnectionPoolFactory; +import com.alibaba.cloud.ai.dataagent.enums.BizDataSourceTypeEnum; +import org.springframework.stereotype.Service; + +/** + * @author yuluo + * @author yuluo + */ + +@Service("clickhouseAccessor") +public class ClickhouseDBAccessor extends AbstractAccessor { + + private final static String ACCESSOR_TYPE = "Clickhouse_Accessor"; + + protected ClickhouseDBAccessor(DdlFactory ddlFactory, DBConnectionPoolFactory poolFactory) { + + super(ddlFactory, poolFactory.getPoolByDbType(BizDataSourceTypeEnum.CLICKHOUSE.getTypeName())); + } + + @Override + public String getAccessorType() { + return ACCESSOR_TYPE; + } + + @Override + public boolean supportedDataSourceType(String type) { + return BizDataSourceTypeEnum.CLICKHOUSE.getTypeName().equalsIgnoreCase(type); + } + +} diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseJdbcConnectionPool.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseJdbcConnectionPool.java new file mode 100644 index 000000000..a1a1b629d --- /dev/null +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseJdbcConnectionPool.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.cloud.ai.dataagent.connector.impls.clickhouse; + +import com.alibaba.cloud.ai.dataagent.connector.pool.AbstractDBConnectionPool; +import com.alibaba.cloud.ai.dataagent.enums.BizDataSourceTypeEnum; +import com.alibaba.cloud.ai.dataagent.enums.ErrorCodeEnum; +import org.springframework.stereotype.Service; + +import static com.alibaba.cloud.ai.dataagent.enums.ErrorCodeEnum.*; + +@Service("clickhouseJdbcConnectionPool") +public class ClickhouseJdbcConnectionPool extends AbstractDBConnectionPool { + + private final static String DRIVER = "com.clickhouse.jdbc.ClickHouseDriver"; + + @Override + public String getDriver() { + return DRIVER; + } + + @Override + public ErrorCodeEnum errorMapping(String sqlState) { + + ErrorCodeEnum ret = ErrorCodeEnum.fromCode(sqlState); + if (ret != null) { + return ret; + } + + return switch (sqlState) { + case "08S01" -> DATASOURCE_CONNECTION_FAILURE_08S01; + case "28000" -> PASSWORD_ERROR_28000; + case "42000" -> DATABASE_NOT_EXIST_42000; + default -> OTHERS; + }; + } + + @Override + public boolean supportedDataSourceType(String type) { + return BizDataSourceTypeEnum.CLICKHOUSE.getTypeName().equals(type); + } + + @Override + public String getConnectionPoolType() { + return BizDataSourceTypeEnum.CLICKHOUSE.getTypeName(); + } + +} diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseJdbcDdl.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseJdbcDdl.java new file mode 100644 index 000000000..8d4eefa78 --- /dev/null +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/connector/impls/clickhouse/ClickhouseJdbcDdl.java @@ -0,0 +1,248 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.cloud.ai.dataagent.connector.impls.clickhouse; + +import com.alibaba.cloud.ai.dataagent.bo.schema.*; +import com.alibaba.cloud.ai.dataagent.connector.SqlExecutor; +import com.alibaba.cloud.ai.dataagent.connector.ddl.AbstractJdbcDdl; +import com.alibaba.cloud.ai.dataagent.enums.BizDataSourceTypeEnum; +import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.alibaba.cloud.ai.dataagent.util.ColumnTypeUtil.wrapType; + +@Service +public class ClickhouseJdbcDdl extends AbstractJdbcDdl { + + @Override + public List showDatabases(Connection connection) { + String sql = "show databases;"; + List databaseInfoList = Lists.newArrayList(); + try { + String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, sql); + if (resultArr.length <= 1) { + return Lists.newArrayList(); + } + + for (int i = 1; i < resultArr.length; i++) { + if (resultArr[i].length == 0) { + continue; + } + String database = resultArr[i][0]; + databaseInfoList.add(DatabaseInfoBO.builder().name(database).build()); + } + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + return databaseInfoList; + } + + @Override + public List showSchemas(Connection connection) { + return Collections.emptyList(); + } + + @Override + public List showTables(Connection connection, String schema, String tablePattern) { + String sql = "SELECT name, comment \n" + "FROM system.tables \n" + + "WHERE database = '%s' \n"; + if (StringUtils.isNotBlank(tablePattern)) { + sql += "AND name LIKE CONCAT('%%','%s','%%') \n"; + } + sql += "limit 2000;"; + List tableInfoList = Lists.newArrayList(); + try { + String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, + String.format(sql, schema, tablePattern)); + if (resultArr.length <= 1) { + return Lists.newArrayList(); + } + + for (int i = 1; i < resultArr.length; i++) { + if (resultArr[i].length == 0) { + continue; + } + String tableName = resultArr[i][0]; + String tableDesc = resultArr[i][1]; + tableInfoList.add(TableInfoBO.builder().schema(schema).name(tableName).description(tableDesc).build()); + } + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + return tableInfoList; + } + + @Override + public List fetchTables(Connection connection, String schema, List tables) { + String sql = "SELECT name, comment \n" + "FROM system.tables \n" + + "WHERE database = '%s' \n" + "AND name in(%s) \n" + "limit 200;"; + List tableInfoList = Lists.newArrayList(); + String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList())); + try { + String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, + String.format(sql, schema, tableListStr)); + if (resultArr.length <= 1) { + return Lists.newArrayList(); + } + + for (int i = 1; i < resultArr.length; i++) { + if (resultArr[i].length == 0) { + continue; + } + String tableName = resultArr[i][0]; + String tableDesc = resultArr[i][1]; + tableInfoList.add(TableInfoBO.builder().name(tableName).description(tableDesc).build()); + } + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + return tableInfoList; + } + + @Override + public List showColumns(Connection connection, String schema, String table) { + String sql = "SELECT a.name, a.comment, a.type, " + + "IF(a.is_in_primary_key=1,'true','false') AS `主键唯一`," + + "IF(b.is_nullable ='0','true','false') AS `非空` " + + "FROM system.columns a " + + "inner join information_schema.COLUMNS b on a.database=b.TABLE_SCHEMA and a.table=b.TABLE_NAME and a.name=b.COLUMN_NAME " + + "WHERE a.database='%s' " + + "and a.table='%s';"; + List columnInfoList = Lists.newArrayList(); + try { + String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, + String.format(sql, schema, table)); + if (resultArr.length <= 1) { + return Lists.newArrayList(); + } + + for (int i = 1; i < resultArr.length; i++) { + if (resultArr[i].length == 0) { + continue; + } + columnInfoList.add(ColumnInfoBO.builder() + .name(resultArr[i][0]) + .description(resultArr[i][1]) + .type(wrapType(resultArr[i][2])) + .primary(BooleanUtils.toBoolean(resultArr[i][3])) + .notnull(BooleanUtils.toBoolean(resultArr[i][4])) + .build()); + } + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + return columnInfoList; + } + + @Override + public List showForeignKeys(Connection connection, String schema, List tables) { + String sql = "SELECT \n" + " TABLE_NAME AS `表名`,\n" + " COLUMN_NAME AS `列名`,\n" + + " CONSTRAINT_NAME AS `约束名`,\n" + " REFERENCED_TABLE_NAME AS `引用表名`,\n" + + " REFERENCED_COLUMN_NAME AS `引用列名`\n" + "FROM \n" + " INFORMATION_SCHEMA.KEY_COLUMN_USAGE\n" + + "WHERE \n" + " CONSTRAINT_SCHEMA = '%s' " + " AND CONSTRAINT_NAME != 'PRIMARY'" + + " AND TABLE_NAME in(%s)\n" + " AND REFERENCED_TABLE_NAME in (%s);"; + List foreignKeyInfoList = Lists.newArrayList(); + String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList())); + + try { + sql = String.format(sql, schema, tableListStr, tableListStr); + String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, sql); + if (resultArr.length <= 1) { + return Lists.newArrayList(); + } + + for (int i = 1; i < resultArr.length; i++) { + if (resultArr[i].length == 0) { + continue; + } + foreignKeyInfoList.add(ForeignKeyInfoBO.builder() + .table(resultArr[i][0]) + .column(resultArr[i][1]) + .referencedTable(resultArr[i][3]) + .referencedColumn(resultArr[i][4]) + .build()); + } + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + return foreignKeyInfoList; + } + + @Override + public List sampleColumn(Connection connection, String schema, String table, String column) { + String sql = "SELECT \n" + " `%s` \n" + "FROM \n" + " `%s`.`%s`\n" + "LIMIT 99;"; + List sampleInfo = Lists.newArrayList(); + try { + sql = String.format(sql, column, schema, table); + String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, sql); + if (resultArr.length <= 1) { + return Lists.newArrayList(); + } + + for (int i = 1; i < resultArr.length; i++) { + if (resultArr[i].length == 0 || column.equalsIgnoreCase(resultArr[i][0])) { + continue; + } + sampleInfo.add(resultArr[i][0]); + } + } + catch (SQLException e) { + // throw new RuntimeException(e); + } + + Set siSet = sampleInfo.stream().collect(Collectors.toSet()); + sampleInfo = siSet.stream().collect(Collectors.toList()); + return sampleInfo; + } + + @Override + public ResultSetBO scanTable(Connection connection, String schema, String table) { + String sql = "SELECT *\n" + "FROM \n" + " `%s`.`%s`\n" + "LIMIT 20;"; + ResultSetBO resultSet = ResultSetBO.builder().build(); + try { + resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, String.format(sql, schema, table)); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + return resultSet; + } + + @Override + public BizDataSourceTypeEnum getDataSourceType() { + return BizDataSourceTypeEnum.CLICKHOUSE; + } + +} diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/controller/DatasourceController.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/controller/DatasourceController.java index cc19ca59c..1a3dba6c5 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/controller/DatasourceController.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/controller/DatasourceController.java @@ -63,7 +63,7 @@ public ApiResponse> getDatasourceTypes() { // 定义标准的 JDBC 数据源类型 List standardTypes = Arrays.asList(BizDataSourceTypeEnum.MYSQL, BizDataSourceTypeEnum.POSTGRESQL, BizDataSourceTypeEnum.DAMENG, BizDataSourceTypeEnum.SQL_SERVER, - BizDataSourceTypeEnum.ORACLE, BizDataSourceTypeEnum.HIVE); + BizDataSourceTypeEnum.ORACLE, BizDataSourceTypeEnum.HIVE, BizDataSourceTypeEnum.CLICKHOUSE); List types = standardTypes.stream() .map(type -> DatasourceTypeDTO.builder() diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/BizDataSourceTypeEnum.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/BizDataSourceTypeEnum.java index 914b07746..daec8227b 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/BizDataSourceTypeEnum.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/BizDataSourceTypeEnum.java @@ -44,6 +44,7 @@ public enum BizDataSourceTypeEnum { * Hive database */ HIVE(8, "hive", DatabaseDialectEnum.HIVE.getCode(), DbAccessTypeEnum.JDBC.getCode()), + CLICKHOUSE(9, "clickhouse", DatabaseDialectEnum.CLICKHOUSE.getCode(), DbAccessTypeEnum.JDBC.getCode()), HOLOGRESS(10, "hologress", DatabaseDialectEnum.POSTGRESQL.getCode(), DbAccessTypeEnum.JDBC.getCode()), diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/DatabaseDialectEnum.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/DatabaseDialectEnum.java index b1e7545a3..c5e6274f0 100644 --- a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/DatabaseDialectEnum.java +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/enums/DatabaseDialectEnum.java @@ -33,7 +33,8 @@ public enum DatabaseDialectEnum { ORACLE("Oracle"), - HIVE("Hive"); + HIVE("Hive"), + CLICKHOUSE("Clickhouse"); public final String code; diff --git a/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/handler/impl/ClickhouseDatasourceTypeHandler.java b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/handler/impl/ClickhouseDatasourceTypeHandler.java new file mode 100644 index 000000000..3c764e15d --- /dev/null +++ b/data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/datasource/handler/impl/ClickhouseDatasourceTypeHandler.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.cloud.ai.dataagent.service.datasource.handler.impl; + +import com.alibaba.cloud.ai.dataagent.entity.Datasource; +import com.alibaba.cloud.ai.dataagent.enums.BizDataSourceTypeEnum; +import com.alibaba.cloud.ai.dataagent.service.datasource.handler.DatasourceTypeHandler; +import org.springframework.stereotype.Component; + +@Component +public class ClickhouseDatasourceTypeHandler implements DatasourceTypeHandler { + + @Override + public String typeName() { + return BizDataSourceTypeEnum.CLICKHOUSE.getTypeName(); + } + + @Override + public String buildConnectionUrl(Datasource datasource) { + if (!hasRequiredConnectionFields(datasource)) { + return datasource.getConnectionUrl(); + } + return String.format("jdbc:clickhouse://%s:%d", datasource.getHost(), datasource.getPort()); + } + +}