Skip to content

Commit

Permalink
[ISSUE #570] ASoC connect runtime optimization: CLI (#622)
Browse files Browse the repository at this point in the history
feature(rocketmq-runtime) add CLI support for rocketmq-connect-runtime

* Add CLI

* Fix checkstyle

* Optimize CLI structure

* Add README.md

* Rename CLI

* Update pom.xml

* Optimize the connectors and tasks format

* Fix newline format
  • Loading branch information
Dreaouth authored Sep 20, 2020
1 parent 5cbec0c commit 5c48ed1
Show file tree
Hide file tree
Showing 36 changed files with 2,360 additions and 531 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public String verifyAndSetConfig(KeyValue config) {

@Override
public void start() {

log.info("JdbcSourceConnector start");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,23 @@ private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConf
int parallelism = tdc.getTaskParallelism();
int id = -1;
Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
Map<Integer, String> taskTopicList = new HashMap<>();
Map<Integer, Map<String, Map<String, String>>> taskWhiteList = new HashMap<>();
for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
int ind = ++id % parallelism;
if (!taskTopicList.containsKey(ind)) {
taskTopicList.put(ind, new HashMap<>());
if (!taskWhiteList.containsKey(ind)) {
taskWhiteList.put(ind, new HashMap<>());
}
String dbKey = entry.getKey().split("-")[0];
String tableKey = entry.getKey().split("-")[1];
taskTopicList.put(ind, tableKey);
String filter = entry.getValue();
Map<String, String> tableMap = new HashMap<>();
tableMap.put(tableKey, filter);
if(!taskTopicList.get(ind).containsKey(dbKey)){
taskTopicList.get(ind).put(dbKey, tableMap);
if(!taskWhiteList.get(ind).containsKey(dbKey)){
taskWhiteList.get(ind).put(dbKey, tableMap);
}else {
taskTopicList.get(ind).get(dbKey).putAll(tableMap);
taskWhiteList.get(ind).get(dbKey).putAll(tableMap);
}
}

Expand All @@ -66,7 +68,8 @@ private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConf
keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskWhiteList.get(i)));
keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i));
keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
Expand Down
1 change: 1 addition & 0 deletions rocketmq-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<modules>
<module>rocketmq-connect-sample</module>
<module>rocketmq-connect-runtime</module>
<module>rocketmq-connect-cli</module>
</modules>
<name>RocketMQ Connect</name>

Expand Down
69 changes: 69 additions & 0 deletions rocketmq-connect/rocketmq-connect-cli/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# rocketmq-connect-CLI Admin

与RocketMQ中的mqadmin类似,使用简洁的CLI命令实现增加,删除,查看connector等功能

在rocketmq-connect\rocketmq-connect-CLI目录下,运行`sh connectAdmin`

```bash
The most commonly used connectAdmin commands are:
createConnector Create and start a connector by connector's config
stopConnector Stop a specific connector by connector-name
queryConnectorConfig Get configuration information for a connector
queryConnectorStatus Get Status information for a connector
stopAll Stop and delete all Connectors and all configuration information
reloadPlugins Reload the Connector file under the plugin directory
getConfigInfo Get all configuration information
getClusterInfo Get cluster information
getAllocatedInfo Get the load information of the current worker
See 'connectAdmin help <command>' for more information on a specific command.
```
如上所示,其中列出了最常用的命令,并附有简短说明。要获取每个命令的详细手册,请使用`sh connectAdmin help <command>`。例如,命令`sh connectAdmin help stopConnector`将输出如下内容:
```bash
$ sh connectAdmin help stopConnector
usage: connectAdmin stopConnector -c <arg> [-h]
-c,--connectorName <arg> connector name
-h,--help Print help
```
## getAllocatedConnectors&getAllocatedTasks
提供格式化输出当前connectors和tasks
| taskName | connectorName | status | topic | update-timestamp |
| -------------- | ------------------- | ---------- | --------- | ---------------- |
| JdbcSourceTask | jdbcConnectorSource | TERMINATED | jdbcTopic | 1597409102590 |
| FileSourceTask | fileConnectorSource | TERMINATED | fileTopic | 1597409110815 |
| FileSinkTask | fileConnectorSink | STOPPING | fileTopic | 1597409204516 |
## createConnector
其中要说明的一点是,createConnector需要指定配置文件的路径
```bash
$ sh connectAdmin help createConnector
usage: connectAdmin createConnector -c <arg> [-h] -p <arg>
-c,--connectorName <arg> connector name
-h,--help Print help
-p,--path <arg> Configuration file pat
```
所以在启动新的connector时,要用`-p`指定json配置文件的路径,例如
```bash
sh connectAdmin createConnector -c fileConnectorSource -p /root/shell/file-connector.json
```
配置文件格式参考具体的connector,这里给出file-connector的格式
```json
{
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
```
18 changes: 18 additions & 0 deletions rocketmq-connect/rocketmq-connect-cli/connectAdmin
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


cd target/distribution/ && java -cp .:./conf/:./lib/* org.apache.rocketmq.connect.cli.ConnectAdminStartup -s conf/connect.conf $@
186 changes: 186 additions & 0 deletions rocketmq-connect/rocketmq-connect-cli/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-connect</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rocketmq-connect-cli</artifactId>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

<!-- RocketMQ Version-->
<rocketmq.version>4.7.0</rocketmq.version>
</properties>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<excludes>
<exclude>*.xml</exclude>
<exclude>connect.conf</exclude>
</excludes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<finalName>distribution</finalName>
<appendAssemblyId>false</appendAssemblyId>
<attach>false</attach>
<descriptors>
<descriptor>src/main/resources/package.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>


<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.3</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>clirr-maven-plugin</artifactId>
<version>2.7</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<argLine>-Xms512m -Xmx1024m</argLine>
<forkMode>always</forkMode>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.6</version>
<configuration>
<locales>en_US</locales>
<outputEncoding>UTF-8</outputEncoding>
<inputEncoding>UTF-8</inputEncoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<charset>UTF-8</charset>
<locale>en_US</locale>
</configuration>
<executions>
<execution>
<id>aggregate</id>
<goals>
<goal>aggregate</goal>
</goals>
<phase>site</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 5c48ed1

Please sign in to comment.