Merged
Changes from 37 commits
Commits
41 commits
ca2d091
feat:Add metalake configuration option in seatunnel-env.sh and seatun…
wtybxqm Jul 20, 2025
8ceffc9
Feat:Fetch metalake config from environment and merge into task's env
wtybxqm Jul 22, 2025
b5b3cc0
feat:Fetch metalake config by sourceId and replace placeholders when …
wtybxqm Jul 28, 2025
d4ec880
feat:Add MetalakeClient interface & GravitinoClient implementation; I…
wtybxqm Aug 10, 2025
d2a84c7
feat:apply spotless code style
wtybxqm Aug 21, 2025
f32ac80
feat: replace wildcard imports with explicit imports for okhttp3
wtybxqm Aug 21, 2025
d545047
fix: delete useless EnvOption
wtybxqm Aug 22, 2025
84491da
feat: Integration Test for Metalake
wtybxqm Aug 25, 2025
768815b
feat: Integration Test
wtybxqm Aug 27, 2025
a6d2327
feat: apply spotless code style
wtybxqm Aug 27, 2025
a1148f7
fix: fix the error of config structure
wtybxqm Aug 27, 2025
c0b304e
feat: place okhttp3 with apache httpclient
wtybxqm Aug 27, 2025
d66778e
fix: fix the bug of matainfo replace and remove log info
wtybxqm Aug 28, 2025
b741052
feat: apply spotless code style and remove extra code
wtybxqm Aug 28, 2025
50d0158
feat: Add docs of Metalake in zh and en
wtybxqm Aug 29, 2025
bd027c7
Merge branch 'dev' into support_metalake_development
wtybxqm Sep 1, 2025
84a2b0b
push an empty commit to trigger the workflow
wtybxqm Sep 1, 2025
1acfb14
fix: add license header to conf file
wtybxqm Sep 1, 2025
cfeb84a
feat: download gravitino in test container
wtybxqm Sep 2, 2025
ec25bb0
feat: apply spotless codestyle and remove useless code
wtybxqm Sep 2, 2025
1b42050
feat: support metalake for spark and flink engine; use assert connect…
wtybxqm Sep 6, 2025
d67545f
fix: fix the error of the assert connector
wtybxqm Sep 7, 2025
4013ed8
fix: move metalakeIT to seatunnel-connector-v2-e2e
wtybxqm Sep 7, 2025
d0b1ff2
fix: apply spotless code style
wtybxqm Sep 8, 2025
618fa28
fix: remove metalakeIT in seatunnel-engine-e2e
wtybxqm Sep 8, 2025
dc45daa
feat: add metalake integration test for spark and flink engine
wtybxqm Sep 9, 2025
73a4d00
fix: download gravitino in flink container
wtybxqm Sep 9, 2025
b528a98
fix: move the docs to concept directory; remove extra test cases for …
wtybxqm Sep 10, 2025
b26243a
Merge branch 'dev' into support_metalake_development
wtybxqm Sep 11, 2025
4f3f006
fix: add httpcore dependency in known-dependencies.txt
wtybxqm Sep 11, 2025
bba310a
fix: reuse PlaceholderUtils and refactor the getMetalakeConfig method…
wtybxqm Sep 13, 2025
72a7538
fix: add license header
wtybxqm Sep 13, 2025
46d19f7
fix: add capture group in pattern matcher
wtybxqm Sep 13, 2025
8c96119
fix: refactor MetalakeConfigUtils and use PlaceholderUtils and JsonUtils
wtybxqm Sep 15, 2025
4df31f8
fix: unify the version of httpcore
wtybxqm Sep 17, 2025
0f3363a
fix: remove extra version of httpcore in kwown-dependencies.txt
wtybxqm Sep 17, 2025
dffb3b6
feat: get variables from env before from system
wtybxqm Sep 18, 2025
5bb154f
feat: refactor getMetalakeConfig method and support transform
wtybxqm Sep 19, 2025
dde368c
fix: modify the arg name of replacePlaceholders method and refactor g…
wtybxqm Sep 20, 2025
2362402
fix: create metalakeClient only once in getMetalakeConfig method
wtybxqm Sep 21, 2025
788bfda
fix: add metalake in sidebar.js of docs
wtybxqm Sep 22, 2025
11 changes: 10 additions & 1 deletion config/seatunnel-env.cmd
@@ -18,4 +18,13 @@ REM Home directory of spark distribution.
if "%SPARK_HOME%" == "" set "SPARK_HOME=C:\Program Files\spark"

REM Home directory of flink distribution.
if "%FLINK_HOME%" == "" set "FLINK_HOME=C:\Program Files\flink"

REM Whether to enable metalake (true/false).
if "%METALAKE_ENABLED%" == "" set "METALAKE_ENABLED=false"

REM Type of metalake implementation.
if "%METALAKE_TYPE%" == "" set "METALAKE_TYPE=gravitino"

REM Metalake service URL, format: http://host:port/api/metalakes/{metalake_name}/catalogs/
if "%METALAKE_URL%" == "" set "METALAKE_URL=http://localhost:8090/api/metalakes/default_metalake_name/catalogs/"
6 changes: 6 additions & 0 deletions config/seatunnel-env.sh
@@ -20,3 +20,9 @@
SPARK_HOME=${SPARK_HOME:-/opt/spark}
# Home directory of flink distribution.
FLINK_HOME=${FLINK_HOME:-/opt/flink}
# Whether to enable metalake (true/false).
METALAKE_ENABLED=${METALAKE_ENABLED:-false}
# Type of metalake implementation.
METALAKE_TYPE=${METALAKE_TYPE:-gravitino}
# Metalake service URL, format: http://host:port/api/metalakes/{metalake_name}/catalogs/.
METALAKE_URL=${METALAKE_URL:-http://localhost:8090/api/metalakes/default_metalake_name/catalogs/}
69 changes: 69 additions & 0 deletions docs/en/concept/metalake.md
@@ -0,0 +1,69 @@
# METALAKE

SeaTunnel job scripts normally contain database usernames, passwords, and other sensitive values in plaintext. This risks leaking credentials and makes jobs hard to maintain: whenever data source details change, every affected script has to be edited by hand.

Metalake support addresses this. Data source information is stored in a metalake system such as Apache Gravitino, and job scripts use a `sourceId` plus placeholders instead of the actual usernames and passwords. At runtime, SeaTunnel fetches the information from the metalake over HTTP and replaces the placeholders with the returned values.

To enable metalake support, first set the following environment variables in **seatunnel-env.sh** (or **seatunnel-env.cmd** on Windows):

* `METALAKE_ENABLED`
* `METALAKE_TYPE`
* `METALAKE_URL`

Set `METALAKE_ENABLED` to `true`. Currently, `METALAKE_TYPE` only supports `gravitino`.
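
As a rough illustration of how these three settings might be resolved, the sketch below looks a key up in the process environment first, then in JVM system properties, and finally falls back to the defaults shipped in `seatunnel-env.sh`. The class `MetalakeEnvSketch` and its `resolve` helper are hypothetical names, and the lookup order is an assumption based on this PR's commit "feat: get variables from env before from system"; it is not the actual SeaTunnel implementation.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: resolves a metalake setting with env-first precedence. */
final class MetalakeEnvSketch {
    private MetalakeEnvSketch() {}

    /** Returns the value from env, else from props, else the given default. */
    static String resolve(
            String key, Map<String, String> env, Properties props, String defaultValue) {
        String value = env.get(key);
        if (value == null || value.isEmpty()) {
            // Assumed fallback: JVM system properties (for example -DMETALAKE_TYPE=...).
            value = props.getProperty(key);
        }
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        Properties props = System.getProperties();
        // Defaults mirror the ones in seatunnel-env.sh.
        System.out.println("enabled = " + resolve("METALAKE_ENABLED", env, props, "false"));
        System.out.println("type    = " + resolve("METALAKE_TYPE", env, props, "gravitino"));
        System.out.println(
                "url     = "
                        + resolve(
                                "METALAKE_URL",
                                env,
                                props,
                                "http://localhost:8090/api/metalakes/default_metalake_name/catalogs/"));
    }
}
```

Passing the environment and properties in as arguments, rather than reading them inside `resolve`, keeps the precedence rule easy to test.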

For Apache Gravitino, set `METALAKE_URL` to:

```
http://host:port/api/metalakes/your_metalake_name/catalogs/
```
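
The trailing slash in this URL matters because the Gravitino client in this PR builds the request URL by appending the `sourceId` directly to `METALAKE_URL`. The sketch below shows that composition with a defensive slash check added; the class `CatalogUrlSketch` and the slash normalization are illustrative assumptions and are not part of the PR.

```java
import java.net.URI;

/** Hypothetical helper: joins METALAKE_URL and a sourceId into the catalog URI. */
final class CatalogUrlSketch {
    private CatalogUrlSketch() {}

    static URI catalogUri(String metalakeUrl, String sourceId) {
        // Assumption: tolerate a missing trailing slash instead of producing ".../catalogstest_catalog".
        String base = metalakeUrl.endsWith("/") ? metalakeUrl : metalakeUrl + "/";
        // URI.create throws IllegalArgumentException if the result is not a valid URI.
        return URI.create(base + sourceId);
    }

    public static void main(String[] args) {
        System.out.println(
                catalogUri(
                        "http://localhost:8090/api/metalakes/test_metalake/catalogs/",
                        "test_catalog"));
    }
}
```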

---

## Usage Example

First, create a catalog in Gravitino, for example:

```bash
curl -L 'http://localhost:8090/api/metalakes/test_metalake/catalogs' \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/vnd.gravitino.v1+json' \
  -d '{
    "name": "test_catalog",
    "type": "relational",
    "provider": "jdbc-mysql",
    "comment": "for metalake test",
    "properties": {
      "jdbc-driver": "com.mysql.cj.jdbc.Driver",
      "jdbc-url": "not used",
      "jdbc-user": "root",
      "jdbc-password": "Abc!@#135_seatunnel"
    }
  }'
```

This creates the catalog `test_catalog` under the metalake `test_metalake`. The metalake itself must already exist.

Thus, `METALAKE_URL` can be set to:

```
http://localhost:8090/api/metalakes/test_metalake/catalogs/
```

You can then define the source as:

```hocon
source {
  Jdbc {
    url = "jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"
    driver = "${jdbc-driver}"
    connection_check_timeout_sec = 100
    sourceId = "test_catalog"
    user = "${jdbc-user}"
    password = "${jdbc-password}"
    query = "select * from source"
  }
}
```

Here, `sourceId` is the catalog name. Once it is set, other options in the same block can use `${}` placeholders, which are replaced automatically at runtime with the matching catalog properties. Sinks use the same option name, `sourceId`. A placeholder must start with `${` and end with `}`. Each option value can contain at most one placeholder, and the value may include additional text outside the placeholder.
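
The placeholder rules above can be sketched as a small substitution routine. This is a minimal illustration only: `PlaceholderSketch` and `substitute` are hypothetical names, the properties map stands in for the catalog properties returned by the metalake, and throwing on an unknown key is an assumption. The PR itself reuses SeaTunnel's `PlaceholderUtils`, whose behavior may differ.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of replacing a single ${name} placeholder in an option value. */
final class PlaceholderSketch {
    // Matches ${name}; group 1 captures the name between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    private PlaceholderSketch() {}

    static String substitute(String value, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        if (!matcher.find()) {
            return value; // No placeholder: the value is used as written.
        }
        String key = matcher.group(1);
        String replacement = properties.get(key);
        if (replacement == null) {
            // Assumption: fail fast on an unknown key instead of leaving it in place.
            throw new IllegalArgumentException("No metalake property for placeholder: " + key);
        }
        // Only the first placeholder is replaced, since each value holds at most one.
        // Plain concatenation avoids regex-replacement escaping issues with '$' or '\'.
        return value.substring(0, matcher.start()) + replacement + value.substring(matcher.end());
    }
}
```

With the catalog from the example, `substitute("${jdbc-user}", properties)` would yield the stored user name, while a value without a placeholder, such as the `query` option, passes through unchanged.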
69 changes: 69 additions & 0 deletions docs/zh/concept/metalake.md
@@ -0,0 +1,69 @@
# METALAKE

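When SeaTunnel runs a job, private information such as database usernames and passwords has to be written in plaintext in the script, which can leak that information. It is also hard to maintain: when data source information changes, the scripts may need to be updated by hand.

Metalake is introduced for this reason. Data source information is stored in a metalake such as Apache Gravitino, and job scripts use `sourceId` and placeholders in place of the original usernames, passwords, and similar values. At runtime, seatunnel-engine fetches the information from the metalake via HTTP requests and performs the replacement according to the placeholders.

To use metalake, first modify the environment variables in **seatunnel-env.sh**: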

* `METALAKE_ENABLED`
* `METALAKE_TYPE`
* `METALAKE_URL`

Set `METALAKE_ENABLED` to `true`. `METALAKE_TYPE` currently supports only `gravitino`.

For Apache Gravitino, set `METALAKE_URL` to:

```
http://host:port/api/metalakes/your_metalake_name/catalogs/
```

---

## Usage Example

First, create a catalog in Gravitino, for example:

```bash
curl -L 'http://localhost:8090/api/metalakes/test_metalake/catalogs' \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/vnd.gravitino.v1+json' \
  -d '{
    "name": "test_catalog",
    "type": "relational",
    "provider": "jdbc-mysql",
    "comment": "for metalake test",
    "properties": {
      "jdbc-driver": "com.mysql.cj.jdbc.Driver",
      "jdbc-url": "not used",
      "jdbc-user": "root",
      "jdbc-password": "Abc!@#135_seatunnel"
    }
  }'
```

This creates a `test_catalog` in `test_metalake` (the `metalake` must be created in advance).

`METALAKE_URL` can then be set to:

```
http://localhost:8090/api/metalakes/test_metalake/catalogs/
```

The source can be written as:

```hocon
source {
  Jdbc {
    url = "jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"
    driver = "${jdbc-driver}"
    connection_check_timeout_sec = 100
    sourceId = "test_catalog"
    user = "${jdbc-user}"
    password = "${jdbc-password}"
    query = "select * from source"
  }
}
```

Review comment on `sourceId = "test_catalog"`:

Member: Why use `sourceId` rather than `source_name`?

Contributor Author: `source_name` is the ID of the catalog in Apache Gravitino, but other metalake types may not use `source_name`.

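Here `sourceId` refers to the name of the catalog, so other options can use `${}` placeholders, which are replaced automatically at runtime. Note that the option is also called `sourceId` when used in a sink. A placeholder must start with `${` and end with `}`. Each option can contain at most one placeholder, and content outside the placeholder is allowed.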
15 changes: 15 additions & 0 deletions seatunnel-api/pom.xml
@@ -29,6 +29,11 @@
    <artifactId>seatunnel-api</artifactId>
    <name>SeaTunnel : Api</name>

    <properties>
        <httpclient.version>4.5.13</httpclient.version>
        <httpcore.version>4.4.16</httpcore.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
@@ -41,5 +46,15 @@
            <version>${project.version}</version>
            <classifier>optional</classifier>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>${httpcore.version}</version>
        </dependency>
    </dependencies>
</project>
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.metalake;

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;

import org.apache.seatunnel.common.utils.JsonUtils;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;

/** {@link MetalakeClient} implementation that reads catalog properties from Apache Gravitino. */
public class GravitinoClient implements MetalakeClient {
    // Base URL of the catalogs endpoint. The source ID is appended directly,
    // so the URL is expected to end with "/".
    private final String metalakeUrl;

    public GravitinoClient(String metalakeUrl) {
        this.metalakeUrl = metalakeUrl;
    }

    @Override
    public String getType() {
        return "gravitino";
    }

    /** Fetches the catalog named {@code sourceId} and returns its {@code properties} node. */
    @Override
    public JsonNode getMetaInfo(String sourceId) throws IOException {
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpGet request = new HttpGet(this.metalakeUrl + sourceId);
            request.addHeader("Accept", "application/vnd.gravitino.v1+json");
            try (CloseableHttpResponse response = client.execute(request)) {
                HttpEntity entity = response.getEntity();
                if (entity == null) {
                    throw new RuntimeException("No response entity");
                }
                JsonNode rootNode = JsonUtils.readTree(entity.getContent());
                EntityUtils.consume(entity);
                JsonNode catalogNode = rootNode.get("catalog");
                if (catalogNode == null) {
                    throw new RuntimeException("Response JSON has no 'catalog' field");
                }
                // May be null if the catalog has no "properties" field.
                JsonNode propertiesNode = catalogNode.get("properties");
                return propertiesNode;
            }
        }
    }
}
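
For readers who want to inspect the request shape without Apache HttpClient on the classpath, the sketch below builds (but does not send) an equivalent GET request with the JDK's `java.net.http` API, which requires Java 11 or later. This is a swapped-in technique for illustration only: the PR uses Apache HttpClient 4.x, and the class `GravitinoRequestSketch` and the 10-second timeout are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical sketch: the catalog lookup request expressed with java.net.http. */
final class GravitinoRequestSketch {
    private GravitinoRequestSketch() {}

    /** Builds the GET request for one catalog; nothing is sent over the network. */
    static HttpRequest catalogRequest(String metalakeUrl, String sourceId) {
        return HttpRequest.newBuilder(URI.create(metalakeUrl + sourceId))
                // Same Accept header the PR's client sends.
                .header("Accept", "application/vnd.gravitino.v1+json")
                // Assumed timeout; the PR's client does not set one.
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
    }
}
```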
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.metalake;

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;

import java.io.IOException;

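/** Client for a metalake service that stores data source connection information. */
public interface MetalakeClient {
    /** Returns the metalake type identifier, for example {@code "gravitino"}. */
    String getType();

    /** Fetches the metadata stored for the given source ID. */
    JsonNode getMetaInfo(String sourceId) throws IOException;
}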
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.metalake;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Creates {@link MetalakeClient} instances by type name (case-insensitive). */
public class MetalakeClientFactory {
    // Maps a lower-cased type name to a constructor that takes the metalake URL.
    private static final Map<String, Function<String, MetalakeClient>> REGISTRY = new HashMap<>();

    static {
        register("gravitino", GravitinoClient::new);
    }

    private MetalakeClientFactory() {}

    /** Registers a client constructor under the given type name. */
    public static void register(String type, Function<String, MetalakeClient> constructor) {
        REGISTRY.put(type.toLowerCase(), constructor);
    }

    /** Creates a client for the given type, or throws if the type is not registered. */
    public static MetalakeClient create(String type, String metalakeUrl) {
        Function<String, MetalakeClient> constructor = REGISTRY.get(type.toLowerCase());
        if (constructor == null) {
            throw new IllegalArgumentException("Unknown MetalakeClient type: " + type);
        }
        return constructor.apply(metalakeUrl);
    }
}
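
The registry pattern used by the factory can be tried in isolation with the self-contained sketch below. It uses a simplified stand-in interface, `Client`, instead of `MetalakeClient`, so it compiles without SeaTunnel on the classpath. `RegistrySketch`, `Client`, and the `"inmemory"` type are hypothetical, and the use of `ConcurrentHashMap` and `Locale.ROOT` is a design variation, not what the PR does.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical, simplified version of a type-name-to-constructor registry. */
final class RegistrySketch {
    /** Stand-in for MetalakeClient; the real interface also declares getMetaInfo. */
    interface Client {
        String getType();
    }

    // ConcurrentHashMap so that register() is safe to call from several threads.
    private static final Map<String, Function<String, Client>> REGISTRY =
            new ConcurrentHashMap<>();

    private RegistrySketch() {}

    static void register(String type, Function<String, Client> constructor) {
        // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
        REGISTRY.put(type.toLowerCase(Locale.ROOT), constructor);
    }

    static Client create(String type, String url) {
        Function<String, Client> constructor = REGISTRY.get(type.toLowerCase(Locale.ROOT));
        if (constructor == null) {
            throw new IllegalArgumentException("Unknown MetalakeClient type: " + type);
        }
        return constructor.apply(url);
    }

    public static void main(String[] args) {
        // Register a trivial client type, then look it up with different casing.
        register("InMemory", url -> () -> "inmemory");
        System.out.println(create("INMEMORY", "http://localhost:8090/").getType());
    }
}
```

Because lookups are case-insensitive, a value such as `METALAKE_TYPE=Gravitino` would resolve to the same constructor as `gravitino` in a registry of this shape.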