Skip to content

Commit 6416a17

Browse files
authored
[Feature][Engine]Metalake support for data source information storage and management (#9688)
1 parent cfe50ff commit 6416a17

File tree

21 files changed

+999
-9
lines changed

21 files changed

+999
-9
lines changed

config/seatunnel-env.cmd

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,13 @@ REM Home directory of spark distribution.
1818
if "%SPARK_HOME%" == "" set "SPARK_HOME=C:\Program Files\spark"
1919

2020
REM Home directory of flink distribution.
21-
if "%FLINK_HOME%" == "" set "FLINK_HOME=C:\Program Files\flink"
21+
if "%FLINK_HOME%" == "" set "FLINK_HOME=C:\Program Files\flink"
22+
23+
REM Whether to enable metalake (true/false).
24+
if "%METALAKE_ENABLED%" == "" set "META_LAKE_ENABLED=false"
25+
26+
REM Type of metalake implementation.
27+
if "%METALAKE_TYPE%" == "" set "METALAKE_TYPE=gravitino"
28+
29+
REM Metalake service URL, format: http://host:port/api/metalakes/{metalake_name}/catalogs/
30+
if "%METALAKE_URL%" == "" set "METALAKE_URL=http://localhost:8090/api/metalakes/default_metalake_name/catalogs/"

config/seatunnel-env.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,9 @@
2020
SPARK_HOME=${SPARK_HOME:-/opt/spark}
2121
# Home directory of flink distribution.
2222
FLINK_HOME=${FLINK_HOME:-/opt/flink}
23+
# Whether to enable metalake (true/false).
24+
METALAKE_ENABLED=${METALAKE_ENABLED:-false}
25+
# Type of metalake implementation.
26+
METALAKE_TYPE=${METALAKE_TYPE:-gravitino}
27+
# Metalake service URL, format: http://host:port/api/metalakes/{metalake_name}/catalogs/.
28+
METALAKE_URL=${METALAKE_URL:-http://localhost:8090/api/metalakes/default_metalake_name/catalogs/}

docs/en/concept/metalake.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# METALAKE
2+
3+
Since Seatunnel requires database usernames, passwords, and other sensitive information to be written in plaintext within scripts when executing tasks, this may lead to information leakage and is also difficult to maintain. When data source information changes, manual modifications are often required.
4+
5+
To address this, Metalake is introduced. Data source information can be stored in Metalake systems such as Apache Gravitino. Task scripts then use `sourceId` and placeholders instead of actual usernames and passwords. At runtime, the Seatunnel engine retrieves the information from Metalake via HTTP requests and replaces the placeholders accordingly.
6+
7+
To enable Metalake, you first need to modify the environment variables in **seatunnel-env.sh**:
8+
9+
* `METALAKE_ENABLED`
10+
* `METALAKE_TYPE`
11+
* `METALAKE_URL`
12+
13+
Set `METALAKE_ENABLED` to `true`. Currently, `METALAKE_TYPE` only supports `gravitino`.
14+
15+
For Apache Gravitino, set `METALAKE_URL` to:
16+
17+
```
18+
http://host:port/api/metalakes/your_metalake_name/catalogs/
19+
```
20+
21+
---
22+
23+
## Usage Example
24+
25+
First, create a catalog in Gravitino, for example:
26+
27+
```bash
28+
curl -L 'http://localhost:8090/api/metalakes/test_metalake/catalogs' \
29+
-H 'Content-Type: application/json' \
30+
-H 'Accept: application/vnd.gravitino.v1+json' \
31+
-d '{
32+
"name": "test_catalog",
33+
"type": "relational",
34+
"provider": "jdbc-mysql",
35+
"comment": "for metalake test",
36+
"properties": {
37+
"jdbc-driver": "com.mysql.cj.jdbc.Driver",
38+
"jdbc-url": "not used",
39+
"jdbc-user": "root",
40+
"jdbc-password": "Abc!@#135_seatunnel"
41+
}
42+
}'
43+
```
44+
45+
This creates a `test_catalog` under `test_metalake` (note: `metalake` itself must be created in advance).
46+
47+
Thus, `METALAKE_URL` can be set to:
48+
49+
```
50+
http://localhost:8090/api/metalakes/test_metalake/catalogs/
51+
```
52+
53+
You can then define the source as:
54+
55+
```hocon
56+
source {
57+
Jdbc {
58+
url = "jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"
59+
driver = "${jdbc-driver}"
60+
connection_check_timeout_sec = 100
61+
sourceId = "test_catalog"
62+
user = "${jdbc-user}"
63+
password = "${jdbc-password}"
64+
query = "select * from source"
65+
}
66+
}
67+
```
68+
69+
Here, `sourceId` refers to the catalog name, allowing other fields to use `${}` placeholders. At runtime, they will be automatically replaced. Note that in sinks, the same `sourceId` name is used, and placeholders must always start with `${` and end with `}`. Each item can contain at most one placeholder, and there can be content outside the placeholder as well.

docs/sidebars.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ const sidebars = {
9494
'concept/sql-config',
9595
'concept/speed-limit',
9696
'concept/event-listener',
97-
'concept/schema-evolution'
97+
'concept/schema-evolution',
98+
'concept/metalake'
9899
]
99100
},
100101
{

docs/zh/concept/metalake.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# METALAKE
2+
3+
由于Seatunnel在执行任务时,需要将数据库用户名与密码等隐私信息明文写在脚本中,可能会导致信息泄露;并且维护较为困难,数据源信息发生变更时可能需要手动更改。
4+
5+
因此引入了metalake,将数据源的信息存储于Apache Gravitino等metalake中,任务脚本采用`sourceId`和占位符的方法来代替原本的用户名和密码等信息,运行时seatunnel-engine通过http请求从metalake获取信息,根据占位符进行替换。
6+
7+
若要使用metalake,首先要修改**seatunnel-env.sh**中的环境变量:
8+
9+
* `METALAKE_ENABLED`
10+
* `METALAKE_TYPE`
11+
* `METALAKE_URL`
12+
13+
`METALAKE_ENABLED`设为`true``METALAKE_TYPE`当前仅支持设为`gravitino`
14+
15+
对于Apache Gravitino,`METALAKE_URL`设为
16+
17+
```
18+
http://host:port/api/metalakes/your_metalake_name/catalogs/
19+
```
20+
21+
---
22+
23+
## 使用示例:
24+
25+
用户需要先在Gravitino中创建catalog,如
26+
27+
```bash
28+
curl -L 'http://localhost:8090/api/metalakes/test_metalake/catalogs'
29+
-H 'Content-Type: application/json'
30+
-H 'Accept: application/vnd.gravitino.v1+json'
31+
-d '{
32+
"name": "test_catalog",
33+
"type": "relational",
34+
"provider": "jdbc-mysql",
35+
"comment": "for metalake test",
36+
"properties": {
37+
"jdbc-driver": "com.mysql.cj.jdbc.Driver",
38+
"jdbc-url": "not used",
39+
"jdbc-user": "root",
40+
"jdbc-password": "Abc!@#135_seatunnel"
41+
}
42+
}'
43+
```
44+
45+
这样便在`test_metalake`中创建了一个`test_catalog``metalake`需要提前创建)
46+
47+
于是`METALAKE_URL`可以设为
48+
49+
```
50+
http://localhost:8090/api/metalakes/test_metalake/catalogs/
51+
```
52+
53+
source可以写为
54+
55+
```
56+
source {
57+
Jdbc {
58+
url = "jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"
59+
driver = "${jdbc-driver}"
60+
connection_check_timeout_sec = 100
61+
sourceId = "test_catalog"
62+
user = "${jdbc-user}"
63+
password = "${jdbc-password}"
64+
query = "select * from source"
65+
}
66+
}
67+
```
68+
69+
其中`sourceId`指代catalog的名称,从而其他项可以使用`${}`占位符,运行时会自动替换。注意,在sink中使用时,同样叫`sourceId`;使用占位符时必须以`${`开头,以`}`结尾,每一项最多只能包含一个占位符,占位符以外也可以有内容

seatunnel-api/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
<artifactId>seatunnel-api</artifactId>
3030
<name>SeaTunnel : Api</name>
3131

32+
<properties>
33+
<httpclient.version>4.5.13</httpclient.version>
34+
<httpcore.version>4.4.16</httpcore.version>
35+
</properties>
36+
3237
<dependencies>
3338
<dependency>
3439
<groupId>org.apache.seatunnel</groupId>
@@ -41,5 +46,15 @@
4146
<version>${project.version}</version>
4247
<classifier>optional</classifier>
4348
</dependency>
49+
<dependency>
50+
<groupId>org.apache.httpcomponents</groupId>
51+
<artifactId>httpclient</artifactId>
52+
<version>${httpclient.version}</version>
53+
</dependency>
54+
<dependency>
55+
<groupId>org.apache.httpcomponents</groupId>
56+
<artifactId>httpcore</artifactId>
57+
<version>${httpcore.version}</version>
58+
</dependency>
4459
</dependencies>
4560
</project>
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.metalake;
19+
20+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
21+
22+
import org.apache.seatunnel.common.utils.JsonUtils;
23+
24+
import org.apache.http.HttpEntity;
25+
import org.apache.http.client.methods.CloseableHttpResponse;
26+
import org.apache.http.client.methods.HttpGet;
27+
import org.apache.http.impl.client.CloseableHttpClient;
28+
import org.apache.http.impl.client.HttpClients;
29+
import org.apache.http.util.EntityUtils;
30+
31+
import java.io.IOException;
32+
33+
public class GravitinoClient implements MetalakeClient {
34+
private final String metalakeUrl;
35+
36+
public GravitinoClient(String metalakeUrl) {
37+
this.metalakeUrl = metalakeUrl;
38+
}
39+
40+
@Override
41+
public String getType() {
42+
return "gravitino";
43+
}
44+
45+
@Override
46+
public JsonNode getMetaInfo(String sourceId) throws IOException {
47+
try (CloseableHttpClient client = HttpClients.createDefault()) {
48+
HttpGet request = new HttpGet(this.metalakeUrl + sourceId);
49+
request.addHeader("Accept", "application/vnd.gravitino.v1+json");
50+
try (CloseableHttpResponse response = client.execute(request)) {
51+
HttpEntity entity = response.getEntity();
52+
if (entity == null) {
53+
throw new RuntimeException("No response entity");
54+
}
55+
JsonNode rootNode = JsonUtils.readTree(entity.getContent());
56+
EntityUtils.consume(entity);
57+
JsonNode catalogNode = rootNode.get("catalog");
58+
if (catalogNode == null) {
59+
throw new RuntimeException("Response JSON has no 'catalog' field");
60+
}
61+
JsonNode propertiesNode = catalogNode.get("properties");
62+
return propertiesNode;
63+
}
64+
}
65+
}
66+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.metalake;
19+
20+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
21+
22+
import java.io.IOException;
23+
24+
public interface MetalakeClient {
25+
String getType();
26+
27+
JsonNode getMetaInfo(String sourceId) throws IOException;
28+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.metalake;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import java.util.function.Function;
23+
24+
public class MetalakeClientFactory {
25+
private static final Map<String, Function<String, MetalakeClient>> REGISTRY = new HashMap<>();
26+
27+
static {
28+
register("gravitino", GravitinoClient::new);
29+
}
30+
31+
private MetalakeClientFactory() {}
32+
33+
public static void register(String type, Function<String, MetalakeClient> constructor) {
34+
REGISTRY.put(type.toLowerCase(), constructor);
35+
}
36+
37+
public static MetalakeClient create(String type, String metalakeUrl) {
38+
Function<String, MetalakeClient> constructor = REGISTRY.get(type.toLowerCase());
39+
if (constructor == null) {
40+
throw new IllegalArgumentException("Unknown MetalakeClient type: " + type);
41+
}
42+
return constructor.apply(metalakeUrl);
43+
}
44+
}

0 commit comments

Comments
 (0)