49 changes: 48 additions & 1 deletion docs/en/connector-v2/source/StarRocks.md
@@ -39,7 +39,7 @@ delivers the query plan as a parameter to BE nodes, and then obtains data result
| scan_mem_limit | long | no | 2147483648 |
| max_retries | int | no | 3 |
| scan.params.* | string | no | - |
| be_host_port_mapping | array | no | - |

### nodeUrls [list]

`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`
@@ -148,6 +148,12 @@ number of retry requests sent to StarRocks

The parameter of the scan data from be

### be_host_port_mapping [array]

Maps each `host:be_port` address of a `StarRocks` cluster `BE` node to an `ip:be_port` address that the computing engine can reach.
This option is optional. It is mainly intended for scenarios where the computing cluster cannot directly access the `host` and `be_port` of `BE`, for example when `StarRocks` is deployed in k8s and `Flink` cannot reach the `BE` `host` and `be_port` directly. With this option set, `Flink` connects to each `BE` through the mapped address.
For example, `[{host_port = "pingt-7f5cf4cfdc-cn-0.headless.olap:9060", ip_port = "xx.xx.xx.xx:31088"}]`.
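
The lookup described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `BeAddressResolver`, `buildTable`, and `resolve` are hypothetical names, not connector classes, and the sketch assumes (as the client code in this change appears to do) that only the host part of `host_port` is used as the lookup key, with a fallback to the reported address when no entry matches.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a hypothetical helper, not part of the StarRocks connector. */
class BeAddressResolver {

    /** Builds a host -> "ip:port" table from {host_port, ip_port} pairs. */
    static Map<String, String> buildTable(String[][] mappings) {
        Map<String, String> table = new HashMap<>();
        for (String[] mapping : mappings) {
            // Assumption: only the host part of host_port serves as the key.
            String host = mapping[0].split(":")[0].trim();
            table.put(host, mapping[1].trim());
        }
        return table;
    }

    /** Returns the address to connect to for a BE node reported as "host:be_port". */
    static String resolve(String beNodeInfo, Map<String, String> table) {
        String host = beNodeInfo.split(":")[0].trim();
        // Fall back to the reported address when the host has no mapping.
        return table.getOrDefault(host, beNodeInfo.trim());
    }
}
```

With a table built from one pair such as `{"be-0.headless.olap:9060", "10.0.0.5:31088"}` (both values are made up for illustration), `resolve("be-0.headless.olap:9060", table)` yields the mapped address, while a host without an entry is returned unchanged.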

## Example

```
@@ -244,6 +250,47 @@ source {
}
```

## Example 3: Using `be_host_port_mapping` to read data

```
source {
StarRocks {
nodeUrls = ["starrocks_e2e:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_source"
scan_batch_rows = 10
max_retries = 3
schema {
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
}
scan.params.scanner_thread_pool_thread_num = "3"
be_host_port_mapping = [
{
host_port = "pingt-7f5cf4cfdc-cn-0.headless.olap:9060"
ip_port = "xx.xx.xx.xx:31088"
}
]
}
}
```

## Changelog

<ChangeLog />
48 changes: 48 additions & 0 deletions docs/zh/connector-v2/source/StarRocks.md
@@ -39,6 +39,7 @@ import ChangeLog from '../changelog/connector-starrocks.md';
| scan_mem_limit | long | no | 2147483648 |
| max_retries | int | no | 3 |
| scan.params.* | string | no | - |
| be_host_port_mapping | array | no | - |

### nodeUrls [list]

@@ -146,6 +147,12 @@ partition[5] reads tablet data from be_node_3: tablet[14,15]

Parameters for scanning data from the `BE` nodes.

### be_host_port_mapping [array]

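Maps each `host:be_port` address of a `StarRocks` cluster `BE` node to an `ip:be_port` address that the computing engine can reach.
This option is optional. It is mainly intended for scenarios where the computing cluster cannot directly access the `host` and `be_port` of `BE`, for example when `StarRocks` is deployed in k8s and `Flink` cannot reach the `BE` `host` and `be_port` directly. With this option set, `Flink` connects to each `BE` through the mapped address.
For example, `[{host_port = "pingt-7f5cf4cfdc-cn-0.headless.olap:9060", ip_port = "xx.xx.xx.xx:31088"}]`.

## Example 1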

```
@@ -242,6 +249,47 @@ source {
}
```

## Example 3: Using `be_host_port_mapping` to read data

```
source {
StarRocks {
nodeUrls = ["starrocks_e2e:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_source"
scan_batch_rows = 10
max_retries = 3
schema {
fields {
BIGINT_COL = BIGINT
LARGEINT_COL = STRING
SMALLINT_COL = SMALLINT
TINYINT_COL = TINYINT
BOOLEAN_COL = BOOLEAN
DECIMAL_COL = "DECIMAL(20, 1)"
DOUBLE_COL = DOUBLE
FLOAT_COL = FLOAT
INT_COL = INT
CHAR_COL = STRING
VARCHAR_11_COL = STRING
STRING_COL = STRING
DATETIME_COL = TIMESTAMP
DATE_COL = DATE
}
}
scan.params.scanner_thread_pool_thread_num = "3"
be_host_port_mapping = [
{
host_port = "pingt-7f5cf4cfdc-cn-0.headless.olap:9060"
ip_port = "xx.xx.xx.xx:31088"
}
]
}
}
```

## Changelog

<ChangeLog />
@@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.starrocks.client.source;

import org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.Pair;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader.ArrowToSeatunnelRowReader;
@@ -41,8 +43,10 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode.CLOSE_BE_READER_FAILED;

@@ -69,8 +73,19 @@ public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) {
StarRocksConnectorErrorCode.CREATE_BE_READER_FAILED,
String.format("Format of StarRocks BE address[%s] is illegal", beNodeInfo));
}

// If the user has configured beHostPortMapping, we need to parse it
Map<String, Pair<String, String>> beHostPortMapping = formatBeHostPortMapping(sourceConfig);

if (beHostPortMapping.containsKey(hostPort[0].trim())) {
Pair<String, String> accessIpPort = beHostPortMapping.get(hostPort[0].trim());
this.ip = accessIpPort.getKey();
this.port = Integer.parseInt(accessIpPort.getValue());
} else {
this.ip = hostPort[0].trim();
this.port = Integer.parseInt(hostPort[1].trim());
}

TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
TSocket socket =
new TSocket(
@@ -91,6 +106,29 @@ public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) {
client = new TStarrocksExternalService.Client(protocol);
}

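/**
* Builds a lookup table from the user-configured beHostPortMapping. The table is empty when no
* mapping is configured.
*
* @param sourceConfig source configuration that carries the mapping entries
* @return map from BE host to the pair (accessible ip, accessible port)
*/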
private static Map<String, Pair<String, String>> formatBeHostPortMapping(
SourceConfig sourceConfig) {
return sourceConfig.getBeHostPortMapping().stream()
.collect(
Collectors.toMap(
mapping -> {
// host:be_port
String[] hostInfo = mapping.getHost_port().split(":");
return hostInfo[0];
},
mapping -> {
// accessible ip and be_port
String[] accessIpInfo = mapping.getIp_port().split(":");
return Pair.of(accessIpInfo[0], accessIpInfo[1]);
}));
}

public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) {
Set<Long> tabletIds = partition.getTabletIds();
TScanOpenParams params = new TScanOpenParams();
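
As written, `formatBeHostPortMapping` indexes the result of `split(":")` without a length check, so an entry without a colon would fail with an `ArrayIndexOutOfBoundsException`, and `Collectors.toMap` throws an `IllegalStateException` when two entries share the same host. A more defensive variant might look like the sketch below. It is an assumption-laden illustration, not the connector's code: `BeMappingParser` and `Endpoint` are hypothetical names, and each entry is modelled as a plain `{host_port, ip_port}` string pair rather than a `BeHostPortMapping` object.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a hypothetical parser, not part of the StarRocks connector. */
class BeMappingParser {

    /** Hypothetical stand-in for a resolved address. */
    static final class Endpoint {
        final String ip;
        final int port;

        Endpoint(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    /** Splits "host:port" at the last colon and validates the port. */
    static Endpoint parseEndpoint(String address) {
        if (address == null) {
            throw new IllegalArgumentException("Address must not be null");
        }
        String trimmed = address.trim();
        int idx = trimmed.lastIndexOf(':');
        if (idx <= 0 || idx == trimmed.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got [" + address + "]");
        }
        int port;
        try {
            port = Integer.parseInt(trimmed.substring(idx + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port in [" + address + "]", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in [" + address + "]");
        }
        return new Endpoint(trimmed.substring(0, idx).trim(), port);
    }

    /** Each entry is {host_port, ip_port}; a repeated BE host is rejected explicitly. */
    static Map<String, Endpoint> parse(List<String[]> entries) {
        Map<String, Endpoint> result = new LinkedHashMap<>();
        for (String[] entry : entries) {
            String host = parseEndpoint(entry[0]).ip;
            Endpoint target = parseEndpoint(entry[1]);
            if (result.putIfAbsent(host, target) != null) {
                throw new IllegalArgumentException("Duplicate mapping for BE host [" + host + "]");
            }
        }
        return result;
    }
}
```

Parsing the port once, up front, also means a malformed `ip_port` would be reported when the configuration is read rather than later when the port string is converted in the constructor.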
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.api.configuration.util.OptionMark;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class BeHostPortMapping implements Serializable {
private static final long serialVersionUID = -1L;

@OptionMark(description = "The be host and be_port")
private String host_port;

@OptionMark(description = "The accessible ip and be_port")
private String ip_port;
}
@@ -31,6 +31,19 @@
@Getter
public class SourceConfig extends StarRocksConfig {

private int maxRetries = StarRocksSourceOptions.MAX_RETRIES.defaultValue();
private int requestTabletSize = StarRocksSourceOptions.QUERY_TABLET_SIZE.defaultValue();
private String scanFilter = StarRocksSourceOptions.SCAN_FILTER.defaultValue();
private long memLimit = StarRocksSourceOptions.SCAN_MEM_LIMIT.defaultValue();
private int queryTimeoutSec = StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC.defaultValue();
private int keepAliveMin = StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN.defaultValue();
private int batchRows = StarRocksSourceOptions.SCAN_BATCH_ROWS.defaultValue();
private int connectTimeoutMs = StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT.defaultValue();
private List<StarRocksSourceTableConfig> tableConfigList = new ArrayList<>();
private List<BeHostPortMapping> beHostPortMapping = new ArrayList<>();

private Map<String, String> sourceOptionProps = new HashMap<>();

public SourceConfig(ReadonlyConfig config) {
super(config);
this.maxRetries = config.get(StarRocksSourceOptions.MAX_RETRIES);
@@ -52,17 +65,6 @@ public SourceConfig(ReadonlyConfig config) {
}
});
this.tableConfigList = StarRocksSourceTableConfig.of(config);
this.beHostPortMapping = config.get(StarRocksSourceOptions.BE_HOST_PORT_MAPPING);
}

}
@@ -22,6 +22,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@@ -83,4 +84,11 @@ public class StarRocksSourceOptions extends StarRocksBaseOptions {
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("table list config");

public static final Option<List<BeHostPortMapping>> BE_HOST_PORT_MAPPING =
Options.key("be_host_port_mapping")
.type(new TypeReference<List<BeHostPortMapping>>() {})
.defaultValue(new ArrayList<>())
.withDescription(
"The mapping between the host:be_port of each StarRocks BE node and an accessible ip:be_port. This option is optional and is mainly intended for scenarios where the computing cluster cannot directly access the BE host and be_port, such as StarRocks deployed in k8s where Flink cannot reach the BE host and be_port directly. For example, [{host_port = \"pingt-7f5cf4cfdc-cn-0.headless.olap:9060\", ip_port = \"xx.xx.xx.xx:31088\"}]");
}