diff --git a/.idea/icon.svg b/.idea/icon.svg
deleted file mode 100644
index b4d5df8343..0000000000
--- a/.idea/icon.svg
+++ /dev/null
@@ -1,1400 +0,0 @@
-
-
-
-
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index a6b68d70dc..35eb1ddfbb 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -1,17 +1,6 @@
-
-
-
-
\ No newline at end of file
diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000000..07efc4682f
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,87 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Project Overview
+
+Dinky is a real-time data development platform based on Apache Flink, enabling agile data development, deployment and operation. It's a full-stack application with a Java/Spring Boot backend and React/TypeScript frontend.
+
+## Build and Development Commands
+
+### Backend (Java/Maven)
+- **Build**: `./mvnw clean package -Dmaven.test.skip=true -P aliyun,prod,web,flink-1.14`
+- **Build script**: `./build.sh` (uses Flink 1.14 profile by default)
+- **Test**: `./mvnw test`
+- **Format code**: `./mvnw spotless:apply`
+- **Check code style**: `./mvnw spotless:check`
+
+### Frontend (React/TypeScript)
+Navigate to `dinky-web/` directory:
+- **Development server**: `npm run dev` or `npm run start:dev`
+- **Build**: `npm run build`
+- **Lint**: `npm run lint`
+- **Format**: `npm run prettier`
+- **Type check**: `npm run tsc`
+
+### Profiles and Configurations
+- **Flink versions**: Supports Flink 1.14-1.19 via Maven profiles (`flink-1.14`, `flink-1.15`, etc.)
+- **Environment**: Use `-P dev` for development (compile scope) or `-P prod` for production (provided scope)
+- **Repositories**: Default uses Aliyun mirror (`-P aliyun`), can switch to Maven Central (`-P maven-central`)
+
+## Code Architecture
+
+### Backend Structure
+- **dinky-admin**: Main Spring Boot application entry point (`org.dinky.Dinky`)
+- **dinky-core**: Core execution engine and Flink integration
+- **dinky-flink**: Flink version-specific implementations (1.14-1.19)
+- **dinky-gateway**: Gateway and cluster management
+- **dinky-metadata**: Database metadata providers (MySQL, PostgreSQL, ClickHouse, etc.)
+- **dinky-cdc**: Change Data Capture functionality
+- **dinky-alert**: Alert system with multiple providers (DingTalk, WeChat, Email, etc.)
+- **dinky-web**: React frontend application
+
+### Frontend Structure
+- **DataStudio**: Main FlinkSQL development interface with editor, console, and results
+- **DevOps**: Job monitoring, metrics, and operations
+- **RegCenter**: Registration center for clusters, datasources, UDFs, etc.
+- **AuthCenter**: User management, roles, and permissions
+- **SettingCenter**: System configuration and settings
+
+### Key Components
+- **Executor Framework**: Abstracts different Flink execution modes (Local, Standalone, Yarn, Kubernetes)
+- **Multi-version Support**: Supports multiple Flink versions through modular architecture
+- **SQL Enhancement**: Extends FlinkSQL with custom statements (CDC, variables, etc.)
+- **Catalog Integration**: Supports various data catalogs and metadata discovery
+
+## Development Notes
+
+### Multi-module Maven Project
+- Root POM manages all module dependencies and versions
+- Each Flink version has its own module for compatibility
+- Uses dependency management for consistent versioning across modules
+
+### Frontend Technology Stack
+- **Framework**: React 18 + TypeScript + UMI 4
+- **UI Library**: Ant Design + Pro Components
+- **Editor**: Monaco Editor for SQL development
+- **Charts**: ECharts, G2, Ant Design Charts
+- **State Management**: Built-in UMI models
+
+### Code Quality
+- **Java**: Uses Spotless with Palantir Java format
+- **Frontend**: ESLint + Prettier configuration
+- **License**: All files must include Apache 2.0 license header
+
+## Testing
+
+- **Backend**: JUnit 5 + Mockito for unit tests
+- **Integration**: TestContainers for database testing
+- **Frontend**: Jest configuration available
+
+## Database Support
+
+Supports multiple databases with dedicated metadata modules:
+- MySQL, PostgreSQL (primary)
+- ClickHouse, Doris, StarRocks
+- Oracle, SQL Server
+- H2 (for testing/development)
\ No newline at end of file
diff --git a/dinky-admin/pom.xml b/dinky-admin/pom.xml
index e5868759ea..99e603df93 100644
--- a/dinky-admin/pom.xml
+++ b/dinky-admin/pom.xml
@@ -36,6 +36,10 @@
+
+
+
+
org.xerial
sqlite-jdbc
@@ -183,28 +187,28 @@
org.springframework.boot
spring-boot-starter-actuator
-
- org.dinky
- dinky-gateway
-
-
- org.dinky
- dinky-core
-
-
- cn.hutool
- hutool-crypto
-
-
- cn.hutool
- hutool-http
-
-
- cn.hutool
- hutool-json
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
org.dinky
dinky-daemon
@@ -225,10 +229,10 @@
org.dinky
dinky-alert-base
-
- org.dinky
- dinky-client-base
-
+
+
+
+
org.dinky
dinky-client-hadoop
diff --git a/dinky-admin/src/main/java/org/dinky/context/SqlGatewayWsContext.java b/dinky-admin/src/main/java/org/dinky/context/SqlGatewayWsContext.java
index fe1d142a53..c0639f754d 100644
--- a/dinky-admin/src/main/java/org/dinky/context/SqlGatewayWsContext.java
+++ b/dinky-admin/src/main/java/org/dinky/context/SqlGatewayWsContext.java
@@ -18,242 +18,242 @@
*/
package org.dinky.context;
-
-import org.dinky.crypto.CryptoComponent;
-import org.dinky.data.model.FragmentVariable;
-import org.dinky.executor.VariableManager;
-import org.dinky.gateway.SqlCliMode;
-import org.dinky.gateway.SqlClientOptions;
-import org.dinky.gateway.sqlgateway.cli.SqlClientAdapter;
-import org.dinky.utils.CloseUtil;
-import org.dinky.utils.FragmentVariableUtils;
-import org.dinky.utils.JsonUtils;
-import org.dinky.utils.LogUtil;
-import org.dinky.utils.SqlUtil;
-
-import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.nio.ByteBuffer;
-import java.sql.SQLException;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-import javax.websocket.OnClose;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
-import javax.websocket.server.ServerEndpoint;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import cn.hutool.core.thread.ThreadUtil;
-import cn.hutool.db.Db;
-import cn.hutool.db.Entity;
-import cn.hutool.db.ds.simple.SimpleDataSource;
-import lombok.extern.slf4j.Slf4j;
-
-@Component
-@Slf4j
-@ServerEndpoint("/ws/sql-gateway/")
-public class SqlGatewayWsContext {
-
- private Session session;
-
- private SqlClientAdapter client;
-
- private PipedInputStream in2web;
-
- private long lastHeartTime = System.currentTimeMillis();
- private volatile boolean isRunning = true;
-
- private static String url;
- private static String username;
- private static String password;
- private static Db db;
-
- private static CryptoComponent cryptoComponent;
-
- /**
- * 最大化减少线程占用,默认线程为0,无最大限制,不保持线程,任务直接提交给线程
- * */
- private static final ExecutorService executor = ThreadUtil.newExecutor();
-
- private void startClient(SqlClientOptions options) {
- try {
- PipedInputStream in2client = new PipedInputStream();
-
- in2web = new PipedInputStream();
- PipedOutputStream clientWrite2web = new PipedOutputStream(in2web);
- clientWrite2web.write("Dinky Sql Client\n".getBytes());
-
- client = new SqlClientAdapter(in2client, clientWrite2web, options);
-
- executor.execute(() -> {
- try {
- log.info("Sql Client Start : " + options.getConnectAddress());
- client.startClient();
- } catch (Exception e) {
- sendError(e);
- }
- });
- executor.execute(() -> {
- while (isRunning) {
- try {
- int data;
- byte[] bytes = new byte[1024];
- while ((data = in2web.read(bytes)) != -1) {
- session.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes, 0, data));
- }
- log.info("Sql Client Read Terminal Thread Closed :" + options.getConnectAddress());
- onClose();
- } catch (IOException e) {
- log.error("sql client receive error", e);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException interruptedException) {
- log.error("Sql Client Thread Interrupted Error: ", e);
- }
- }
- }
- });
- } catch (Exception e) {
- sendError(e);
- }
- log.info("Sql Client Start Success : " + options.getConnectAddress());
- }
-
- private void sendError(Throwable err) {
- try {
- log.error("send error to client", err);
- ByteBuffer byteBuffer = ByteBuffer.wrap(LogUtil.getError(err).getBytes());
- session.getBasicRemote().sendBinary(byteBuffer);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @OnOpen
- public void onOpen(Session session) throws UnsupportedEncodingException, SQLException {
- this.session = session;
-
- String cols = getParameter("cols", true);
- String rows = getParameter("rows", true);
- SqlClientOptions.TerminalSize size =
- new SqlClientOptions.TerminalSize(Integer.parseInt(cols), Integer.parseInt(rows));
-
- if (db == null) {
- db = Db.use(new SimpleDataSource(url, username, password));
- }
- Entity option = Entity.create("dinky_fragment").set("enabled", true);
- List entities = db.find(option, FragmentVariable.class);
- Map variableMap = new LinkedHashMap<>();
- if (entities != null) {
- for (FragmentVariable variable : entities) {
- if (FragmentVariableUtils.isSensitive(variable.getName()) && variable.getFragmentValue() != null) {
- variableMap.put(variable.getName(), cryptoComponent.decryptText(variable.getFragmentValue()));
- } else {
- variableMap.put(variable.getName(), variable.getFragmentValue());
- }
- }
- }
- VariableManager variableManager = new VariableManager();
- variableManager.registerVariable(variableMap);
- String initSql = URLDecoder.decode(getParameter("initSql"), "UTF-8");
- initSql = SqlUtil.removeNote(initSql);
- initSql = variableManager.parseVariable(initSql);
-
- SqlClientOptions options = SqlClientOptions.builder()
- .mode(SqlCliMode.fromString(getParameter("mode", true)))
- .sessionId(getParameter("sessionId"))
- .connectAddress(getParameter("connectAddress", true))
- .initSql(initSql)
- .historyFilePath("./tmp/flink-sql-history/history")
- .terminalSize(size)
- .build();
-
- startClient(options);
-
- executor.execute(() -> {
- while (isRunning) {
- try {
- Thread.sleep(1000);
- if (System.currentTimeMillis() - lastHeartTime > 1000 * 60) {
- onClose();
- }
- } catch (Exception e) {
- log.error("SQl Client Heart Thread Error: ", e);
- }
- }
- log.info("Sql Client Heart Thread Closed :");
- });
- }
-
- @OnClose
- public void onClose() {
- isRunning = false;
- CloseUtil.closeNoErrorPrint(client, in2web, session);
- }
-
- @OnMessage
- public void onMessage(String messages) {
- SqlClientAdapter.WsEvent wsEvent = JsonUtils.parseObject(messages, SqlClientAdapter.WsEvent.class);
- if (wsEvent == null) {
- throw new RuntimeException("parse wsEvent error");
- } else {
- SqlClientAdapter.WsEvent.EventType eventType =
- SqlClientAdapter.WsEvent.EventType.getEventType(wsEvent.getType());
- if (eventType == SqlClientAdapter.WsEvent.EventType.TERM_HEART_EVENT) {
- lastHeartTime = System.currentTimeMillis();
- } else {
- try {
- client.onMessage(wsEvent, eventType);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- private String getParameter(String key) {
- return getParameter(key, false);
- }
-
- private String getParameter(String key, boolean required) {
- List list = session.getRequestParameterMap().get(key);
- if (list == null || list.size() == 0) {
- if (required) {
- throw new RuntimeException("parameter " + key + " is required");
- } else {
- return "";
- }
- }
- return list.get(0);
- }
-
- @Autowired
- public void setCryptoComponent(CryptoComponent cryptoComponent) {
- SqlGatewayWsContext.cryptoComponent = cryptoComponent;
- }
-
- @Value("${spring.datasource.url}")
- public void setUrl(String url) {
- SqlGatewayWsContext.url = url;
- }
-
- @Value("${spring.datasource.username}")
- public void setUsername(String username) {
- SqlGatewayWsContext.username = username;
- }
-
- @Value("${spring.datasource.password}")
- public void setPassword(String password) {
- SqlGatewayWsContext.password = password;
- }
-}
+//
+//import org.dinky.crypto.CryptoComponent;
+//import org.dinky.data.model.FragmentVariable;
+//import org.dinky.executor.VariableManager;
+//import org.dinky.gateway.SqlCliMode;
+//import org.dinky.gateway.SqlClientOptions;
+//import org.dinky.gateway.sqlgateway.cli.SqlClientAdapter;
+//import org.dinky.utils.CloseUtil;
+//import org.dinky.utils.FragmentVariableUtils;
+//import org.dinky.utils.JsonUtils;
+//import org.dinky.utils.LogUtil;
+//import org.dinky.utils.SqlUtil;
+//
+//import java.io.IOException;
+//import java.io.PipedInputStream;
+//import java.io.PipedOutputStream;
+//import java.io.UnsupportedEncodingException;
+//import java.net.URLDecoder;
+//import java.nio.ByteBuffer;
+//import java.sql.SQLException;
+//import java.util.LinkedHashMap;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.concurrent.ExecutorService;
+//
+//import javax.websocket.OnClose;
+//import javax.websocket.OnMessage;
+//import javax.websocket.OnOpen;
+//import javax.websocket.Session;
+//import javax.websocket.server.ServerEndpoint;
+//
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.stereotype.Component;
+//
+//import cn.hutool.core.thread.ThreadUtil;
+//import cn.hutool.db.Db;
+//import cn.hutool.db.Entity;
+//import cn.hutool.db.ds.simple.SimpleDataSource;
+//import lombok.extern.slf4j.Slf4j;
+//
+//@Component
+//@Slf4j
+//@ServerEndpoint("/ws/sql-gateway/")
+//public class SqlGatewayWsContext {
+//
+// private Session session;
+//
+// private SqlClientAdapter client;
+//
+// private PipedInputStream in2web;
+//
+// private long lastHeartTime = System.currentTimeMillis();
+// private volatile boolean isRunning = true;
+//
+// private static String url;
+// private static String username;
+// private static String password;
+// private static Db db;
+//
+// private static CryptoComponent cryptoComponent;
+//
+// /**
+// * 最大化减少线程占用,默认线程为0,无最大限制,不保持线程,任务直接提交给线程
+// * */
+// private static final ExecutorService executor = ThreadUtil.newExecutor();
+//
+// private void startClient(SqlClientOptions options) {
+// try {
+// PipedInputStream in2client = new PipedInputStream();
+//
+// in2web = new PipedInputStream();
+// PipedOutputStream clientWrite2web = new PipedOutputStream(in2web);
+// clientWrite2web.write("Dinky Sql Client\n".getBytes());
+//
+// client = new SqlClientAdapter(in2client, clientWrite2web, options);
+//
+// executor.execute(() -> {
+// try {
+// log.info("Sql Client Start : " + options.getConnectAddress());
+// client.startClient();
+// } catch (Exception e) {
+// sendError(e);
+// }
+// });
+// executor.execute(() -> {
+// while (isRunning) {
+// try {
+// int data;
+// byte[] bytes = new byte[1024];
+// while ((data = in2web.read(bytes)) != -1) {
+// session.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes, 0, data));
+// }
+// log.info("Sql Client Read Terminal Thread Closed :" + options.getConnectAddress());
+// onClose();
+// } catch (IOException e) {
+// log.error("sql client receive error", e);
+// try {
+// Thread.sleep(1000);
+// } catch (InterruptedException interruptedException) {
+// log.error("Sql Client Thread Interrupted Error: ", e);
+// }
+// }
+// }
+// });
+// } catch (Exception e) {
+// sendError(e);
+// }
+// log.info("Sql Client Start Success : " + options.getConnectAddress());
+// }
+//
+// private void sendError(Throwable err) {
+// try {
+// log.error("send error to client", err);
+// ByteBuffer byteBuffer = ByteBuffer.wrap(LogUtil.getError(err).getBytes());
+// session.getBasicRemote().sendBinary(byteBuffer);
+// } catch (IOException e) {
+// throw new RuntimeException(e);
+// }
+// }
+//
+// @OnOpen
+// public void onOpen(Session session) throws UnsupportedEncodingException, SQLException {
+// this.session = session;
+//
+// String cols = getParameter("cols", true);
+// String rows = getParameter("rows", true);
+// SqlClientOptions.TerminalSize size =
+// new SqlClientOptions.TerminalSize(Integer.parseInt(cols), Integer.parseInt(rows));
+//
+// if (db == null) {
+// db = Db.use(new SimpleDataSource(url, username, password));
+// }
+// Entity option = Entity.create("dinky_fragment").set("enabled", true);
+// List entities = db.find(option, FragmentVariable.class);
+// Map variableMap = new LinkedHashMap<>();
+// if (entities != null) {
+// for (FragmentVariable variable : entities) {
+// if (FragmentVariableUtils.isSensitive(variable.getName()) && variable.getFragmentValue() != null) {
+// variableMap.put(variable.getName(), cryptoComponent.decryptText(variable.getFragmentValue()));
+// } else {
+// variableMap.put(variable.getName(), variable.getFragmentValue());
+// }
+// }
+// }
+// VariableManager variableManager = new VariableManager();
+// variableManager.registerVariable(variableMap);
+// String initSql = URLDecoder.decode(getParameter("initSql"), "UTF-8");
+// initSql = SqlUtil.removeNote(initSql);
+// initSql = variableManager.parseVariable(initSql);
+//
+// SqlClientOptions options = SqlClientOptions.builder()
+// .mode(SqlCliMode.fromString(getParameter("mode", true)))
+// .sessionId(getParameter("sessionId"))
+// .connectAddress(getParameter("connectAddress", true))
+// .initSql(initSql)
+// .historyFilePath("./tmp/flink-sql-history/history")
+// .terminalSize(size)
+// .build();
+//
+// startClient(options);
+//
+// executor.execute(() -> {
+// while (isRunning) {
+// try {
+// Thread.sleep(1000);
+// if (System.currentTimeMillis() - lastHeartTime > 1000 * 60) {
+// onClose();
+// }
+// } catch (Exception e) {
+// log.error("SQl Client Heart Thread Error: ", e);
+// }
+// }
+// log.info("Sql Client Heart Thread Closed :");
+// });
+// }
+//
+// @OnClose
+// public void onClose() {
+// isRunning = false;
+// CloseUtil.closeNoErrorPrint(client, in2web, session);
+// }
+//
+// @OnMessage
+// public void onMessage(String messages) {
+// SqlClientAdapter.WsEvent wsEvent = JsonUtils.parseObject(messages, SqlClientAdapter.WsEvent.class);
+// if (wsEvent == null) {
+// throw new RuntimeException("parse wsEvent error");
+// } else {
+// SqlClientAdapter.WsEvent.EventType eventType =
+// SqlClientAdapter.WsEvent.EventType.getEventType(wsEvent.getType());
+// if (eventType == SqlClientAdapter.WsEvent.EventType.TERM_HEART_EVENT) {
+// lastHeartTime = System.currentTimeMillis();
+// } else {
+// try {
+// client.onMessage(wsEvent, eventType);
+// } catch (IOException e) {
+// throw new RuntimeException(e);
+// }
+// }
+// }
+// }
+//
+// private String getParameter(String key) {
+// return getParameter(key, false);
+// }
+//
+// private String getParameter(String key, boolean required) {
+// List list = session.getRequestParameterMap().get(key);
+// if (list == null || list.size() == 0) {
+// if (required) {
+// throw new RuntimeException("parameter " + key + " is required");
+// } else {
+// return "";
+// }
+// }
+// return list.get(0);
+// }
+//
+// @Autowired
+// public void setCryptoComponent(CryptoComponent cryptoComponent) {
+// SqlGatewayWsContext.cryptoComponent = cryptoComponent;
+// }
+//
+// @Value("${spring.datasource.url}")
+// public void setUrl(String url) {
+// SqlGatewayWsContext.url = url;
+// }
+//
+// @Value("${spring.datasource.username}")
+// public void setUsername(String username) {
+// SqlGatewayWsContext.username = username;
+// }
+//
+// @Value("${spring.datasource.password}")
+// public void setPassword(String password) {
+// SqlGatewayWsContext.password = password;
+// }
+//}
diff --git a/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java b/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java
index 65794a7fb3..023bf04daa 100644
--- a/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java
+++ b/dinky-admin/src/main/java/org/dinky/controller/DownloadController.java
@@ -25,7 +25,6 @@
import org.dinky.data.model.FlinkUdfManifest;
import org.dinky.function.constant.PathConstant;
import org.dinky.function.util.ZipWriter;
-import org.dinky.resource.BaseResourceManager;
import java.io.File;
import java.io.InputStream;
@@ -118,7 +117,8 @@ public void downloadAppJar(@PathVariable String version, HttpServletResponse res
@GetMapping("downloadFromRs")
@ApiOperation("Download From Resource")
public void downloadJavaUDF(String path, HttpServletResponse resp) {
- InputStream inputStream = BaseResourceManager.getInstance().readFile(path);
- ServletUtil.write(resp, inputStream);
+ // TODO: 2024/3/31
+ // InputStream inputStream = BaseResourceManager.getInstance().readFile(path);
+ // ServletUtil.write(resp, inputStream);
}
}
diff --git a/dinky-admin/src/main/java/org/dinky/controller/FlinkController.java b/dinky-admin/src/main/java/org/dinky/controller/FlinkController.java
index f50e2178f2..cfcdd96d35 100644
--- a/dinky-admin/src/main/java/org/dinky/controller/FlinkController.java
+++ b/dinky-admin/src/main/java/org/dinky/controller/FlinkController.java
@@ -22,7 +22,8 @@
import org.dinky.data.model.CheckPointReadTable;
import org.dinky.data.result.Result;
import org.dinky.data.vo.CascaderVO;
-import org.dinky.flink.checkpoint.CheckpointRead;
+import org.dinky.job.JobConfig;
+import org.dinky.job.JobManager;
import org.dinky.service.FlinkService;
import java.util.List;
@@ -46,13 +47,12 @@
@RequiredArgsConstructor
public class FlinkController {
- protected static final CheckpointRead INSTANCE = new CheckpointRead();
private final FlinkService flinkService;
@GetMapping("/readCheckPoint")
@ApiOperation("Read Checkpoint")
public Result