Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions data-agent-management/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,13 @@
<artifactId>docker-java-transport-zerodep</artifactId>
</dependency>

<!-- clickhouse -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.9.7</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024-2026 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.ai.dataagent.connector.impls.clickhouse;

import com.alibaba.cloud.ai.dataagent.connector.accessor.AbstractAccessor;
import com.alibaba.cloud.ai.dataagent.connector.ddl.DdlFactory;
import com.alibaba.cloud.ai.dataagent.connector.pool.DBConnectionPoolFactory;
import com.alibaba.cloud.ai.dataagent.enums.BizDataSourceTypeEnum;
import org.springframework.stereotype.Service;

/**
* @author yuluo
* @author <a href="mailto:yuluo08290126@gmail.com">yuluo</a>
*/

@Service("clickhouseAccessor")
public class ClickhouseDBAccessor extends AbstractAccessor {

private final static String ACCESSOR_TYPE = "Clickhouse_Accessor";

protected ClickhouseDBAccessor(DdlFactory ddlFactory, DBConnectionPoolFactory poolFactory) {

super(ddlFactory, poolFactory.getPoolByDbType(BizDataSourceTypeEnum.CLICKHOUSE.getTypeName()));
}

@Override
public String getAccessorType() {
return ACCESSOR_TYPE;
}

@Override
public boolean supportedDataSourceType(String type) {
return BizDataSourceTypeEnum.CLICKHOUSE.getTypeName().equalsIgnoreCase(type);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024-2026 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.ai.dataagent.connector.impls.clickhouse;

import com.alibaba.cloud.ai.dataagent.connector.pool.AbstractDBConnectionPool;
import com.alibaba.cloud.ai.dataagent.enums.BizDataSourceTypeEnum;
import com.alibaba.cloud.ai.dataagent.enums.ErrorCodeEnum;
import org.springframework.stereotype.Service;

import static com.alibaba.cloud.ai.dataagent.enums.ErrorCodeEnum.*;

@Service("clickhouseJdbcConnectionPool")
public class ClickhouseJdbcConnectionPool extends AbstractDBConnectionPool {

private final static String DRIVER = "com.clickhouse.jdbc.ClickHouseDriver";

@Override
public String getDriver() {
return DRIVER;
}

@Override
public ErrorCodeEnum errorMapping(String sqlState) {

ErrorCodeEnum ret = ErrorCodeEnum.fromCode(sqlState);
if (ret != null) {
return ret;
}

return switch (sqlState) {
case "08S01" -> DATASOURCE_CONNECTION_FAILURE_08S01;
case "28000" -> PASSWORD_ERROR_28000;
case "42000" -> DATABASE_NOT_EXIST_42000;
default -> OTHERS;
};
}

@Override
public boolean supportedDataSourceType(String type) {
return BizDataSourceTypeEnum.CLICKHOUSE.getTypeName().equals(type);
}

@Override
public String getConnectionPoolType() {
return BizDataSourceTypeEnum.CLICKHOUSE.getTypeName();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Copyright 2024-2026 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.ai.dataagent.connector.impls.clickhouse;

import com.alibaba.cloud.ai.dataagent.bo.schema.*;
import com.alibaba.cloud.ai.dataagent.connector.SqlExecutor;
import com.alibaba.cloud.ai.dataagent.connector.ddl.AbstractJdbcDdl;
import com.alibaba.cloud.ai.dataagent.enums.BizDataSourceTypeEnum;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.alibaba.cloud.ai.dataagent.util.ColumnTypeUtil.wrapType;

@Service
public class ClickhouseJdbcDdl extends AbstractJdbcDdl {

@Override
public List<DatabaseInfoBO> showDatabases(Connection connection) {
String sql = "show databases;";
List<DatabaseInfoBO> databaseInfoList = Lists.newArrayList();
try {
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, sql);
if (resultArr.length <= 1) {
return Lists.newArrayList();
}

for (int i = 1; i < resultArr.length; i++) {
if (resultArr[i].length == 0) {
continue;
}
String database = resultArr[i][0];
databaseInfoList.add(DatabaseInfoBO.builder().name(database).build());
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}

return databaseInfoList;
}

@Override
public List<SchemaInfoBO> showSchemas(Connection connection) {
return Collections.emptyList();
}

@Override
public List<TableInfoBO> showTables(Connection connection, String schema, String tablePattern) {
String sql = "SELECT name, comment \n" + "FROM system.tables \n"
+ "WHERE database = '%s' \n";
if (StringUtils.isNotBlank(tablePattern)) {
sql += "AND name LIKE CONCAT('%%','%s','%%') \n";
}
sql += "limit 2000;";
List<TableInfoBO> tableInfoList = Lists.newArrayList();
try {
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection,
String.format(sql, schema, tablePattern));
if (resultArr.length <= 1) {
return Lists.newArrayList();
}

for (int i = 1; i < resultArr.length; i++) {
if (resultArr[i].length == 0) {
continue;
}
String tableName = resultArr[i][0];
String tableDesc = resultArr[i][1];
tableInfoList.add(TableInfoBO.builder().schema(schema).name(tableName).description(tableDesc).build());
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}

return tableInfoList;
}

@Override
public List<TableInfoBO> fetchTables(Connection connection, String schema, List<String> tables) {
String sql = "SELECT name, comment \n" + "FROM system.tables \n"
+ "WHERE database = '%s' \n" + "AND name in(%s) \n" + "limit 200;";
List<TableInfoBO> tableInfoList = Lists.newArrayList();
String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList()));
try {
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection,
String.format(sql, schema, tableListStr));
if (resultArr.length <= 1) {
return Lists.newArrayList();
}

for (int i = 1; i < resultArr.length; i++) {
if (resultArr[i].length == 0) {
continue;
}
String tableName = resultArr[i][0];
String tableDesc = resultArr[i][1];
tableInfoList.add(TableInfoBO.builder().name(tableName).description(tableDesc).build());
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}

return tableInfoList;
}

@Override
public List<ColumnInfoBO> showColumns(Connection connection, String schema, String table) {
String sql = "SELECT a.name, a.comment, a.type, "
+ "IF(a.is_in_primary_key=1,'true','false') AS `主键唯一`,"
+ "IF(b.is_nullable ='0','true','false') AS `非空` "
+ "FROM system.columns a "
+ "inner join information_schema.COLUMNS b on a.database=b.TABLE_SCHEMA and a.table=b.TABLE_NAME and a.name=b.COLUMN_NAME "
+ "WHERE a.database='%s' "
+ "and a.table='%s';";
List<ColumnInfoBO> columnInfoList = Lists.newArrayList();
try {
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null,
String.format(sql, schema, table));
if (resultArr.length <= 1) {
return Lists.newArrayList();
}

for (int i = 1; i < resultArr.length; i++) {
if (resultArr[i].length == 0) {
continue;
}
columnInfoList.add(ColumnInfoBO.builder()
.name(resultArr[i][0])
.description(resultArr[i][1])
.type(wrapType(resultArr[i][2]))
.primary(BooleanUtils.toBoolean(resultArr[i][3]))
.notnull(BooleanUtils.toBoolean(resultArr[i][4]))
.build());
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}

return columnInfoList;
}

@Override
public List<ForeignKeyInfoBO> showForeignKeys(Connection connection, String schema, List<String> tables) {
String sql = "SELECT \n" + " TABLE_NAME AS `表名`,\n" + " COLUMN_NAME AS `列名`,\n"
+ " CONSTRAINT_NAME AS `约束名`,\n" + " REFERENCED_TABLE_NAME AS `引用表名`,\n"
+ " REFERENCED_COLUMN_NAME AS `引用列名`\n" + "FROM \n" + " INFORMATION_SCHEMA.KEY_COLUMN_USAGE\n"
+ "WHERE \n" + " CONSTRAINT_SCHEMA = '%s' " + " AND CONSTRAINT_NAME != 'PRIMARY'"
+ " AND TABLE_NAME in(%s)\n" + " AND REFERENCED_TABLE_NAME in (%s);";
List<ForeignKeyInfoBO> foreignKeyInfoList = Lists.newArrayList();
String tableListStr = String.join(", ", tables.stream().map(x -> "'" + x + "'").collect(Collectors.toList()));

try {
sql = String.format(sql, schema, tableListStr, tableListStr);
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, sql);
if (resultArr.length <= 1) {
return Lists.newArrayList();
}

for (int i = 1; i < resultArr.length; i++) {
if (resultArr[i].length == 0) {
continue;
}
foreignKeyInfoList.add(ForeignKeyInfoBO.builder()
.table(resultArr[i][0])
.column(resultArr[i][1])
.referencedTable(resultArr[i][3])
.referencedColumn(resultArr[i][4])
.build());
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}

return foreignKeyInfoList;
}

@Override
public List<String> sampleColumn(Connection connection, String schema, String table, String column) {
String sql = "SELECT \n" + " `%s` \n" + "FROM \n" + " `%s`.`%s`\n" + "LIMIT 99;";
List<String> sampleInfo = Lists.newArrayList();
try {
sql = String.format(sql, column, schema, table);
String[][] resultArr = SqlExecutor.executeSqlAndReturnArr(connection, null, sql);
if (resultArr.length <= 1) {
return Lists.newArrayList();
}

for (int i = 1; i < resultArr.length; i++) {
if (resultArr[i].length == 0 || column.equalsIgnoreCase(resultArr[i][0])) {
continue;
}
sampleInfo.add(resultArr[i][0]);
}
}
catch (SQLException e) {
// throw new RuntimeException(e);
}

Set<String> siSet = sampleInfo.stream().collect(Collectors.toSet());
sampleInfo = siSet.stream().collect(Collectors.toList());
return sampleInfo;
}

@Override
public ResultSetBO scanTable(Connection connection, String schema, String table) {
String sql = "SELECT *\n" + "FROM \n" + " `%s`.`%s`\n" + "LIMIT 20;";
ResultSetBO resultSet = ResultSetBO.builder().build();
try {
resultSet = SqlExecutor.executeSqlAndReturnObject(connection, schema, String.format(sql, schema, table));
}
catch (SQLException e) {
throw new RuntimeException(e);
}
return resultSet;
}

@Override
public BizDataSourceTypeEnum getDataSourceType() {
return BizDataSourceTypeEnum.CLICKHOUSE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public ApiResponse<List<DatasourceTypeDTO>> getDatasourceTypes() {
// 定义标准的 JDBC 数据源类型
List<BizDataSourceTypeEnum> standardTypes = Arrays.asList(BizDataSourceTypeEnum.MYSQL,
BizDataSourceTypeEnum.POSTGRESQL, BizDataSourceTypeEnum.DAMENG, BizDataSourceTypeEnum.SQL_SERVER,
BizDataSourceTypeEnum.ORACLE, BizDataSourceTypeEnum.HIVE);
BizDataSourceTypeEnum.ORACLE, BizDataSourceTypeEnum.HIVE, BizDataSourceTypeEnum.CLICKHOUSE);

List<DatasourceTypeDTO> types = standardTypes.stream()
.map(type -> DatasourceTypeDTO.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public enum BizDataSourceTypeEnum {
* Hive database
*/
HIVE(8, "hive", DatabaseDialectEnum.HIVE.getCode(), DbAccessTypeEnum.JDBC.getCode()),
CLICKHOUSE(9, "clickhouse", DatabaseDialectEnum.CLICKHOUSE.getCode(), DbAccessTypeEnum.JDBC.getCode()),

HOLOGRESS(10, "hologress", DatabaseDialectEnum.POSTGRESQL.getCode(), DbAccessTypeEnum.JDBC.getCode()),

Expand Down
Loading