diff --git a/pom.xml b/pom.xml
index 1262d7abf..833cdcb0f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,6 +25,7 @@
rocketmq-connect-sample
rocketmq-connect-runtime
+ rocketmq-connect-cli
RocketMQ Connect
diff --git a/rocketmq-connect-cli/README.md b/rocketmq-connect-cli/README.md
new file mode 100644
index 000000000..4c28bf6c8
--- /dev/null
+++ b/rocketmq-connect-cli/README.md
@@ -0,0 +1,69 @@
+# rocketmq-connect-CLI Admin
+
+与RocketMQ中的mqadmin类似,使用简洁的CLI命令实现增加,删除,查看connector等功能
+
+在rocketmq-connect\rocketmq-connect-CLI目录下,运行`sh connectAdmin`
+
+```bash
+The most commonly used connectAdmin commands are:
+ createConnector Create and start a connector by connector's config
+ stopConnector Stop a specific connector by connector-name
+ queryConnectorConfig Get configuration information for a connector
+ queryConnectorStatus Get Status information for a connector
+ stopAll Stop and delete all Connectors and all configuration information
+ reloadPlugins Reload the Connector file under the plugin directory
+ getConfigInfo Get all configuration information
+ getClusterInfo Get cluster information
+ getAllocatedInfo Get the load information of the current worker
+
+See 'connectAdmin help ' for more information on a specific command.
+```
+
+如上所示,其中列出了最常用的命令,并附有简短说明。要获取每个命令的详细手册,请使用`sh connectAdmin help `。例如,命令`sh connectAdmin help stopConnector`将输出如下内容:
+
+```bash
+$ sh connectAdmin help stopConnector
+usage: connectAdmin stopConnector -c [-h]
+ -c,--connectorName connector name
+ -h,--help Print help
+```
+
+## getAllocatedConnectors&getAllocatedTasks
+
+提供格式化输出当前connectors和tasks
+
+| taskName | connectorName | status | topic | update-timestamp |
+| -------------- | ------------------- | ---------- | --------- | ---------------- |
+| JdbcSourceTask | jdbcConnectorSource | TERMINATED | jdbcTopic | 1597409102590 |
+| FileSourceTask | fileConnectorSource | TERMINATED | fileTopic | 1597409110815 |
+| FileSinkTask | fileConnectorSink | STOPPING | fileTopic | 1597409204516 |
+
+## createConnector
+
+其中要说明的一点是,createConnector需要指定配置文件的路径
+
+```bash
+$ sh connectAdmin help createConnector
+usage: connectAdmin createConnector -c [-h] -p
+ -c,--connectorName connector name
+ -h,--help Print help
+ -p,--path Configuration file pat
+```
+
+所以在启动新的connector时,要用`-p`指定json配置文件的路径,例如
+
+```bash
+sh connectAdmin createConnector -c fileConnectorSource -p /root/shell/file-connector.json
+```
+
+配置文件格式参考具体的connector,这里给出file-connector的格式
+
+```json
+{
+ "connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
+ "topic":"fileTopic",
+ "filename":"/opt/source-file/source-file.txt",
+ "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+}
+```
+
diff --git a/rocketmq-connect-cli/connectAdmin b/rocketmq-connect-cli/connectAdmin
new file mode 100644
index 000000000..5ded525c7
--- /dev/null
+++ b/rocketmq-connect-cli/connectAdmin
@@ -0,0 +1,18 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cd target/distribution/ && java -cp .:./conf/:./lib/* org.apache.rocketmq.connect.cli.ConnectAdminStartup -s conf/connect.conf $@
diff --git a/rocketmq-connect-cli/pom.xml b/rocketmq-connect-cli/pom.xml
new file mode 100644
index 000000000..047398265
--- /dev/null
+++ b/rocketmq-connect-cli/pom.xml
@@ -0,0 +1,186 @@
+
+
+
+
+ rocketmq-connect
+ org.apache.rocketmq
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ rocketmq-connect-cli
+
+
+ UTF-8
+ UTF-8
+
+
+ 1.8
+ 1.8
+
+
+ 4.7.0
+
+
+
+
+
+ src/main/resources
+
+ *.xml
+ connect.conf
+
+ true
+
+
+
+
+ maven-assembly-plugin
+ 2.2.1
+
+ distribution
+ false
+ false
+
+ src/main/resources/package.xml
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ versions-maven-plugin
+ 2.3
+
+
+ org.codehaus.mojo
+ clirr-maven-plugin
+ 2.7
+
+
+ maven-compiler-plugin
+ 3.6.1
+
+
+ ${maven.compiler.target}
+ ${maven.compiler.source}
+ true
+ true
+
+
+
+ maven-surefire-plugin
+ 2.19.1
+
+ -Xms512m -Xmx1024m
+ always
+
+ **/*Test.java
+
+
+
+
+ maven-site-plugin
+ 3.6
+
+ en_US
+ UTF-8
+ UTF-8
+
+
+
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+ maven-javadoc-plugin
+ 2.10.4
+
+ UTF-8
+ en_US
+
+
+
+ aggregate
+
+ aggregate
+
+ site
+
+
+
+
+ maven-resources-plugin
+ 3.0.2
+
+ ${project.build.sourceEncoding}
+
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+ 3.0.4
+
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ io.openmessaging
+ openmessaging-connector
+
+
+ commons-cli
+ commons-cli
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+ ${rocketmq.version}
+
+
+
+
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/ConnectAdminStartup.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/ConnectAdminStartup.java
new file mode 100644
index 000000000..cee747643
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/ConnectAdminStartup.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli;
+
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.connect.cli.command.SubCommand;
+import org.apache.rocketmq.connect.cli.command.GetAllocatedConnectors;
+import org.apache.rocketmq.connect.cli.command.GetAllocatedTasks;
+import org.apache.rocketmq.connect.cli.command.CreateConnectorSubCommand;
+import org.apache.rocketmq.connect.cli.command.GetAllocatedInfoCommand;
+import org.apache.rocketmq.connect.cli.command.GetClusterInfoSubCommand;
+import org.apache.rocketmq.connect.cli.command.GetConfigInfoSubCommand;
+import org.apache.rocketmq.connect.cli.command.QueryConnectorConfigSubCommand;
+import org.apache.rocketmq.connect.cli.command.QueryConnectorStatusSubCommand;
+import org.apache.rocketmq.connect.cli.command.ReloadPluginsSubCommand;
+import org.apache.rocketmq.connect.cli.command.StopAllSubCommand;
+import org.apache.rocketmq.connect.cli.command.StopConnectorSubCommand;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.FileAndPropertyUtil;
+import org.apache.rocketmq.connect.cli.utils.ServerUtil;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+public class ConnectAdminStartup {
+
+ protected static List subCommandList = new ArrayList();
+
+ public static CommandLine commandLine = null;
+
+ public static String configFile = null;
+
+ public static Properties properties = null;
+
+ public static void main(String[] args) {
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+ String[] configArgs = Arrays.copyOfRange(args, 0, 2);
+ args = Arrays.copyOfRange(args, 2, args.length);
+
+ try {
+
+ // Build the command line options.
+ Options option = ServerUtil.buildCommandlineOptions(new Options());
+ commandLine = ServerUtil.parseCmdLine("connect", configArgs, buildCommandlineOptions(option),
+ new PosixParser());
+ if (null == commandLine) {
+ System.exit(-1);
+ }
+
+ // Load configs from command line.
+ Config config = new Config();
+ if (commandLine.hasOption('s')) {
+ String file = commandLine.getOptionValue('s');
+ if (file != null) {
+ configFile = file;
+ InputStream in = new BufferedInputStream(new FileInputStream(file));
+ properties = new Properties();
+ properties.load(in);
+ FileAndPropertyUtil.properties2Object(properties, config);
+ in.close();
+ }
+ }
+
+ initCommand(config);
+
+ switch (args.length) {
+ case 0:
+ printHelp();
+ break;
+ case 2:
+ if (args[0].equals("help")) {
+ SubCommand cmd = findSubCommand(args[1]);
+ if (cmd != null) {
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ options = cmd.buildCommandlineOptions(options);
+ if (options != null) {
+ ServerUtil.printCommandLineHelp("connectAdmin " + cmd.commandName(), options);
+ }
+ } else {
+ System.out.printf("The sub command %s not exist.%n", args[1]);
+ }
+ break;
+ }
+ case 1:
+ default:
+ SubCommand cmd = findSubCommand(args[0]);
+ if (cmd != null) {
+ String[] subargs = parseSubArgs(args);
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ final CommandLine commandLine =
+ ServerUtil.parseCmdLine("connectAdmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
+ new PosixParser());
+ if (null == commandLine) {
+ return;
+ }
+
+ cmd.execute(commandLine, options);
+ } else {
+ System.out.printf("The sub command %s not exist.%n", args[0]);
+ }
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void initCommand(Config config) {
+ initCommand(new CreateConnectorSubCommand(config));
+ initCommand(new StopConnectorSubCommand(config));
+ initCommand(new QueryConnectorConfigSubCommand(config));
+ initCommand(new QueryConnectorStatusSubCommand(config));
+ initCommand(new StopAllSubCommand(config));
+ initCommand(new ReloadPluginsSubCommand(config));
+ initCommand(new GetConfigInfoSubCommand(config));
+ initCommand(new GetClusterInfoSubCommand(config));
+ initCommand(new GetAllocatedInfoCommand(config));
+ initCommand(new GetAllocatedConnectors(config));
+ initCommand(new GetAllocatedTasks(config));
+ }
+
+
+ private static void printHelp() {
+ System.out.printf("The most commonly used connectAdmin commands are:%n");
+ for (SubCommand cmd : subCommandList) {
+ System.out.printf(" %-25s %s%n", cmd.commandName(), cmd.commandDesc());
+ }
+
+ System.out.printf("%nSee 'connectAdmin help ' for more information on a specific command.%n");
+ }
+
+ private static SubCommand findSubCommand(final String name) {
+ for (SubCommand cmd : subCommandList) {
+ if (cmd.commandName().toUpperCase().equals(name.toUpperCase())) {
+ return cmd;
+ }
+ }
+
+ return null;
+ }
+
+ private static String[] parseSubArgs(String[] args) {
+ if (args.length > 1) {
+ String[] result = new String[args.length - 1];
+ for (int i = 0; i < args.length - 1; i++) {
+ result[i] = args[i + 1];
+ }
+ return result;
+ }
+ return null;
+ }
+
+ public static void initCommand(SubCommand command) {
+ subCommandList.add(command);
+ }
+
+ private static Options buildCommandlineOptions(Options options) {
+
+ Option opt = new Option("s", "configFile", true, "connect config properties file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/CreateConnectorSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/CreateConnectorSubCommand.java
new file mode 100644
index 000000000..6cc7b356c
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/CreateConnectorSubCommand.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+public class CreateConnectorSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public CreateConnectorSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "createConnector";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Create and start a connector by connector's config";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+ opt = new Option("p", "path", true, "Configuration file path");
+ opt.setRequired(true);
+ options.addOption(opt);
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String connectorName = commandLine.getOptionValue('c').trim();
+ String request = "/connectors/" + connectorName + "?config=";
+ URL baseUrl = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String filePath = commandLine.getOptionValue('p').trim();
+ String config = readFile(filePath);
+ String result = new RestSender().sendHttpRequest(baseUrl.toString(), config);
+ System.out.printf(result + "%n");
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+
+ private String readFile(String filePath) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), StandardCharsets.UTF_8));
+ int index = 0;
+ while ((index = reader.read()) != -1) {
+ sb.append((char) index);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return sb.toString();
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedConnectors.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedConnectors.java
new file mode 100644
index 000000000..95b618ba1
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedConnectors.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.commom.ConnectKeyValue;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+import java.util.Map;
+
+import static org.apache.rocketmq.connect.cli.utils.FileAndPropertyUtil.stringToKeyValue;
+
+public class GetAllocatedConnectors implements SubCommand {
+
+ private final Config config;
+
+ public GetAllocatedConnectors(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "getAllocatedConnectors";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get allocated connectors information";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String request = "/getAllocatedConnectors";
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ showConnectorInfo(result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+
+ private void showConnectorInfo(String result) {
+ Map connectMap = JSON.parseObject(result, Map.class);
+ System.out.printf("%-25s %-15s %-17s %-25s%n", "connectorName", "status", "topic", "update-timestamp");
+ for (Map.Entry entry : connectMap.entrySet()) {
+ String json = entry.getValue().getString("properties");
+ ConnectKeyValue keyValue = stringToKeyValue(json);
+ String connectorName = entry.getKey();
+ String topic;
+ if (keyValue.getString("topic") != null) {
+ topic = keyValue.getString("topic");
+ } else {
+ topic = keyValue.getString("topicNames");
+ }
+ String updateTimestamp = keyValue.getString("update-timestamp");
+ System.out.printf("%-25s %-15s %-17s %-25s%n", connectorName, "RUNNING", topic, updateTimestamp);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedInfoCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedInfoCommand.java
new file mode 100644
index 000000000..49f1a7729
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedInfoCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+
+public class GetAllocatedInfoCommand implements SubCommand {
+
+ private final Config config;
+
+ public GetAllocatedInfoCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "getAllocatedInfo";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get the load information of the current worker";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String request = "/" + commandName();
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedTasks.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedTasks.java
new file mode 100644
index 000000000..6e4d790a3
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedTasks.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.commom.ConnectKeyValue;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.rocketmq.connect.cli.utils.FileAndPropertyUtil.stringToKeyValue;
+
+public class GetAllocatedTasks implements SubCommand {
+
+ private final Config config;
+
+ public GetAllocatedTasks(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "getAllocatedTasks";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get allocated tasks information";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String request = "/getAllocatedTasks";
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ showTaskInfo(result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+
+ private void showTaskInfo(String result) {
+ Map> taskMap = JSON.parseObject(result, Map.class);
+ System.out.printf("%-22s %-22s %-12s %-15s %-20s %-30s%n", "taskName", "connectorName", "status", "topic", "update-timestamp", "taskId");
+ for (Map.Entry> entry : taskMap.entrySet()) {
+ for (JSONObject jsonObject: entry.getValue()) {
+ String connectorName = jsonObject.getString("connectorName");
+ String status = jsonObject.getString("state");
+ ConnectKeyValue keyValue = stringToKeyValue(jsonObject.getJSONObject("configs").getString("properties"));
+ String[] taskNameTmp = keyValue.getString("task-class").split("\\.");
+ String taskName = taskNameTmp[taskNameTmp.length - 1];
+ String taskId = keyValue.getString("task-id");
+ String topic;
+ if (keyValue.getString("topic") != null) {
+ topic = keyValue.getString("topic");
+ } else {
+ topic = keyValue.getString("topicNames");
+ }
+ String updateTimestamp = keyValue.getString("update-timestamp");
+ System.out.printf("%-22s %-22s %-12s %-15s %-20s %-30s%n", taskName, connectorName, status, topic, updateTimestamp, taskId);
+ }
+
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetClusterInfoSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetClusterInfoSubCommand.java
new file mode 100644
index 000000000..4698883c9
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetClusterInfoSubCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+
+public class GetClusterInfoSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public GetClusterInfoSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "getClusterInfo";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get cluster information";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String request = "/" + commandName();
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetConfigInfoSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetConfigInfoSubCommand.java
new file mode 100644
index 000000000..8ded0def0
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetConfigInfoSubCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+
+public class GetConfigInfoSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public GetConfigInfoSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "getConfigInfo";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get all connectors and tasks configuration information";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String request = "/" + commandName();
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorConfigSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorConfigSubCommand.java
new file mode 100644
index 000000000..04f11b3c0
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorConfigSubCommand.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+
+public class QueryConnectorConfigSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public QueryConnectorConfigSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "queryConnectorConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get configuration information for a connector";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String connectorName = commandLine.getOptionValue('c').trim();
+ String request = "/connectors/" + connectorName + "/config";
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorStatusSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorStatusSubCommand.java
new file mode 100644
index 000000000..9817a125f
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorStatusSubCommand.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+
+public class QueryConnectorStatusSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public QueryConnectorStatusSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "queryConnectorStatus";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Get Status information for a connector";
+ }
+
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String connectorName = commandLine.getOptionValue('c').trim();
+ String request = "/connectors/" + connectorName + "/config";
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/ReloadPluginsSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/ReloadPluginsSubCommand.java
new file mode 100644
index 000000000..d70c57442
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/ReloadPluginsSubCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+
+public class ReloadPluginsSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public ReloadPluginsSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "reloadPlugins";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Reload the Connector file under the plugin directory";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String request = "/plugin/reload";
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopAllSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopAllSubCommand.java
new file mode 100644
index 000000000..adee95fe5
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopAllSubCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+
+public class StopAllSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public StopAllSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "stopAll";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Stop and delete all Connectors and all configuration information";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String request = "/connectors/" + commandName();
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopConnectorSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopConnectorSubCommand.java
new file mode 100644
index 000000000..497625ed9
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopConnectorSubCommand.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine;
+import org.apache.rocketmq.connect.cli.commom.Config;
+import org.apache.rocketmq.connect.cli.utils.RestSender;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.net.URL;
+
+
+public class StopConnectorSubCommand implements SubCommand {
+
+ private final Config config;
+
+ public StopConnectorSubCommand(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public String commandName() {
+ return "stopConnector";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Stop a specific connector by connector-name";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("c", "connectorName", true, "connector name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options) throws SubCommandException {
+ try {
+ String connectorName = commandLine.getOptionValue('c').trim();
+ String request = "/connectors/" + connectorName + "/stop";
+ URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request);
+ String result = new RestSender().sendHttpRequest(url.toString(), "");
+ System.out.printf("%s%n", result);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/SubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/SubCommand.java
new file mode 100644
index 000000000..44759a106
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/SubCommand.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public interface SubCommand {
+ String commandName();
+
+ String commandDesc();
+
+ Options buildCommandlineOptions(Options options);
+
+ void execute(CommandLine commandLine, Options options) throws SubCommandException;
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/CLIConfigDefine.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/CLIConfigDefine.java
new file mode 100644
index 000000000..706ffbd23
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/CLIConfigDefine.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.commom;
+
+/**
+ * Define all the logger name of the runtime.
+ */
+public class CLIConfigDefine {
+ public static final String PROTOCOL = "http";
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/Config.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/Config.java
new file mode 100644
index 000000000..de94a8877
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/Config.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.commom;
+
+/**
+ * Configurations for tools.
+ */
+public class Config {
+
+ private String httpAddr;
+
+ private int httpPort = 8081;
+
+ public String getHttpAddr() {
+ return httpAddr;
+ }
+
+ public void setHttpAddr(String httpAddr) {
+ this.httpAddr = httpAddr;
+ }
+
+ public int getHttpPort() {
+ return httpPort;
+ }
+
+ public void setHttpPort(int httpPort) {
+ this.httpPort = httpPort;
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/ConnectKeyValue.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/ConnectKeyValue.java
new file mode 100644
index 000000000..320b218b8
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/ConnectKeyValue.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.commom;
+
+import io.openmessaging.KeyValue;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Default Implements of {@link KeyValue} for runtime, which can be parsed by fastJson.
+ */
+public class ConnectKeyValue implements KeyValue, Serializable {
+
+ /**
+ * All data are reserved in this map.
+ */
+ private Map properties;
+
+ public ConnectKeyValue() {
+ properties = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public KeyValue put(String key, int value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public KeyValue put(String key, long value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public KeyValue put(String key, double value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public KeyValue put(String key, String value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public int getInt(String key) {
+ if (!properties.containsKey(key))
+ return 0;
+ return Integer.valueOf(properties.get(key));
+ }
+
+ @Override
+ public int getInt(final String key, final int defaultValue) {
+ return properties.containsKey(key) ? getInt(key) : defaultValue;
+ }
+
+ @Override
+ public long getLong(String key) {
+ if (!properties.containsKey(key))
+ return 0;
+ return Long.valueOf(properties.get(key));
+ }
+
+ @Override
+ public long getLong(final String key, final long defaultValue) {
+ return properties.containsKey(key) ? getLong(key) : defaultValue;
+ }
+
+ @Override
+ public double getDouble(String key) {
+ if (!properties.containsKey(key))
+ return 0;
+ return Double.valueOf(properties.get(key));
+ }
+
+ @Override
+ public double getDouble(final String key, final double defaultValue) {
+ return properties.containsKey(key) ? getDouble(key) : defaultValue;
+ }
+
+ @Override
+ public String getString(String key) {
+ return properties.get(key);
+ }
+
+ @Override
+ public String getString(final String key, final String defaultValue) {
+ return properties.containsKey(key) ? getString(key) : defaultValue;
+ }
+
+ @Override
+ public Set keySet() {
+ return properties.keySet();
+ }
+
+ @Override
+ public boolean containsKey(String key) {
+ return properties.containsKey(key);
+ }
+
+ public Map getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+
+ if (obj != null && obj.getClass() == this.getClass()) {
+ ConnectKeyValue keyValue = (ConnectKeyValue) obj;
+ return this.properties.equals(keyValue.getProperties());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return properties.hashCode();
+ }
+
+ @Override public String toString() {
+ return "ConnectKeyValue{" +
+ "properties=" + properties +
+ '}';
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/FileAndPropertyUtil.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/FileAndPropertyUtil.java
new file mode 100644
index 000000000..12a0fe381
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/FileAndPropertyUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.utils;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.connect.cli.commom.ConnectKeyValue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utils for file and property.
+ */
+public class FileAndPropertyUtil {
+
+ public static String file2String(final String fileName) throws IOException {
+ File file = new File(fileName);
+ return file2String(file);
+ }
+
+ public static String file2String(final File file) throws IOException {
+ if (file.exists()) {
+ byte[] data = new byte[(int) file.length()];
+ boolean result;
+
+ FileInputStream inputStream = null;
+ try {
+ inputStream = new FileInputStream(file);
+ int len = inputStream.read(data);
+ result = len == data.length;
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+
+ if (result) {
+ return new String(data);
+ }
+ }
+ return null;
+ }
+
+ public static ConnectKeyValue stringToKeyValue(String json) {
+
+ if (null == json) {
+ return new ConnectKeyValue();
+ }
+ Map map = JSON.parseObject(json, Map.class);
+ ConnectKeyValue keyValue = new ConnectKeyValue();
+ for (String key : map.keySet()) {
+ keyValue.put(key, map.get(key));
+ }
+ return keyValue;
+ }
+
+ public static void properties2Object(final Properties p, final Object object) {
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getProperty(key);
+ if (property != null) {
+ Class>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg = null;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long")) {
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") || cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") || cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/RestSender.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/RestSender.java
new file mode 100644
index 000000000..19201b4b2
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/RestSender.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.utils;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.net.URLEncoder;
+
+public class RestSender {
+
+ public String sendHttpRequest(String baseUrl, String configs) {
+
+ try {
+ CloseableHttpClient client = null;
+ CloseableHttpResponse response = null;
+ try {
+ String encodedConfigs = URLEncoder.encode(configs, "utf-8");
+ HttpGet httpGet = new HttpGet(baseUrl + encodedConfigs);
+ client = HttpClients.createDefault();
+ response = client.execute(httpGet);
+ HttpEntity entity = response.getEntity();
+ String result = EntityUtils.toString(entity);
+ return result;
+ } finally {
+ if (response != null) {
+ response.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return "";
+ }
+}
diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/ServerUtil.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/ServerUtil.java
new file mode 100644
index 000000000..81dd58b98
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/ServerUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cli.utils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.ParseException;
+
+import java.util.Properties;
+
+public class ServerUtil {
+
+ public static Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("h", "help", false, "Print help");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ public static CommandLine parseCmdLine(final String appName, String[] args, Options options,
+ CommandLineParser parser) {
+ HelpFormatter hf = new HelpFormatter();
+ hf.setWidth(110);
+ CommandLine commandLine = null;
+ try {
+ commandLine = parser.parse(options, args);
+ if (commandLine.hasOption('h')) {
+ hf.printHelp(appName, options, true);
+ return null;
+ }
+ } catch (ParseException e) {
+ hf.printHelp(appName, options, true);
+ }
+
+ return commandLine;
+ }
+
+ public static void printCommandLineHelp(final String appName, final Options options) {
+ HelpFormatter hf = new HelpFormatter();
+ hf.setWidth(110);
+ hf.printHelp(appName, options, true);
+ }
+
+ public static Properties commandLine2Properties(final CommandLine commandLine) {
+ Properties properties = new Properties();
+ Option[] opts = commandLine.getOptions();
+
+ if (opts != null) {
+ for (Option opt : opts) {
+ String name = opt.getLongOpt();
+ String value = commandLine.getOptionValue(name);
+ if (value != null) {
+ properties.setProperty(name, value);
+ }
+ }
+ }
+
+ return properties;
+ }
+
+}
diff --git a/rocketmq-connect-cli/src/main/resources/connect.conf b/rocketmq-connect-cli/src/main/resources/connect.conf
new file mode 100644
index 000000000..9b8c36ffb
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/resources/connect.conf
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## Http port for user to access REST API
+httpPort=8081
+
+# RocketMQ Connect Runtime Addr
+httpAddr=localhost
\ No newline at end of file
diff --git a/rocketmq-connect-cli/src/main/resources/package.xml b/rocketmq-connect-cli/src/main/resources/package.xml
new file mode 100644
index 000000000..91fce8549
--- /dev/null
+++ b/rocketmq-connect-cli/src/main/resources/package.xml
@@ -0,0 +1,40 @@
+
+
+
+ package
+
+ dir
+
+ false
+
+
+ src/main/resources
+ conf
+ false
+
+
+
+
+ lib
+ runtime
+
+
+
diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index 32da8e3a4..d341bf65e 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -43,7 +43,7 @@
1.8
- 4.5.2
+ 4.7.0
@@ -186,7 +186,6 @@
org.apache.httpcomponents
httpclient
- test
io.openmessaging
@@ -231,4 +230,4 @@
reflections
-
\ No newline at end of file
+
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index a7a535594..171d394ca 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -240,6 +240,7 @@ private void sendRecord(Collection sourceDataEntries) {
try {
producer.send(sourceMessage, new SendCallback() {
@Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
+ log.info("Successful send message to RocketMQ:{}", result.getMsgId());
try {
if (null != partition && null != position) {
positionData.put(partition, position);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index 3b25738cb..1aff87d92 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -69,24 +69,16 @@ public RestHandler(ConnectController connectController) {
app.get("/plugin/reload", this::reloadPlugins);
}
- /**
- * We need to refactor this method to use json output format
- * @param context
- */
+
private void getAllocatedConnectors(Context context) {
Set workerConnectors = connectController.getWorker().getWorkingConnectors();
Set workerTasks = connectController.getWorker().getWorkingTasks();
- StringBuilder sb = new StringBuilder();
- sb.append("working connectors:\n");
+ Map connectors = new HashMap<>();
for (WorkerConnector workerConnector : workerConnectors) {
- sb.append(workerConnector.toString() + "\n");
+ connectors.put(workerConnector.getConnectorName(), workerConnector.getKeyValue());
}
- sb.append("working tasks:\n");
- for (Runnable runnable : workerTasks) {
- sb.append(runnable.toString() + "\n");
- }
- context.result(sb.toString());
+ context.result(JSON.toJSONString(connectors));
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 227d37198..1e973ab2d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -195,11 +195,11 @@ public void recomputeTaskConfigs(String connectorName, Connector connector, Long
@Override
public void removeConnectorConfig(String connectorName) {
- ConnectKeyValue config = new ConnectKeyValue();
+ ConnectKeyValue config = connectorKeyValueStore.get(connectorName);
config.put(RuntimeConfigDefine.UPDATE_TIMESATMP, System.currentTimeMillis());
config.put(RuntimeConfigDefine.CONFIG_DELETED, 1);
- List taskConfigList = new ArrayList<>();
+ List taskConfigList = taskKeyValueStore.get(connectorName);
taskConfigList.add(config);
connectorKeyValueStore.put(connectorName, config);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index cb86aefb6..37b328661 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -80,7 +80,6 @@ public void doRebalance() {
List curAliveWorkers = clusterManagementService.getAllAliveWorkers();
Map curConnectorConfigs = configManagementService.getConnectorConfigs();
Map> curTaskConfigs = configManagementService.getTaskConfigs();
- log.info("[ISSUE #2027] The connectorConfigs are:" + curConnectorConfigs.toString() + " with timestamp :" + System.currentTimeMillis());
ConnAndTaskConfigs allocateResult = allocateConnAndTaskStrategy.allocate(curAliveWorkers, clusterManagementService.getCurrentWorker(), curConnectorConfigs, curTaskConfigs);
log.info("Allocated connector:{}", allocateResult.getConnectorConfigs());
log.info("Allocated task:{}", allocateResult.getTaskConfigs());
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
index 2474fe5f9..97856b35f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
@@ -1,200 +1,200 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.runtime.utils.datasync;
-
-import com.alibaba.fastjson.JSON;
-import io.openmessaging.connector.api.data.Converter;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine.MAX_MESSAGE_SIZE;
-
-/**
- * A Broker base data synchronizer, synchronize data between workers.
- *
- * @param
- * @param
- */
-public class BrokerBasedLog implements DataSynchronizer {
-
- private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
- /**
- * A callback to receive data from other workers.
- */
- private DataSynchronizerCallback dataSynchronizerCallback;
-
- /**
- * Producer to send data to broker.
- */
- private DefaultMQProducer producer;
-
- /**
- * Consumer to receive synchronize data from broker.
- */
- private DefaultMQPushConsumer consumer;
-
- /**
- * A queue to send or consume message.
- */
- private String topicName;
-
- /**
- * Used to convert key to byte[].
- */
- private Converter keyConverter;
-
- /**
- * Used to convert value to byte[].
- */
- private Converter valueConverter;
-
- public BrokerBasedLog(ConnectConfig connectConfig,
- String topicName,
- String workId,
- DataSynchronizerCallback dataSynchronizerCallback,
- Converter keyConverter,
- Converter valueConverter) {
-
- this.topicName = topicName;
- this.dataSynchronizerCallback = dataSynchronizerCallback;
- this.producer = new DefaultMQProducer();
- this.producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- this.producer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr()));
- this.producer.setProducerGroup(workId);
- this.producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
- this.producer.setMaxMessageSize(MAX_MESSAGE_SIZE);
-
- this.consumer = new DefaultMQPushConsumer();
- this.consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- this.consumer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr()));
- this.consumer.setConsumerGroup(workId);
- this.consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
- this.consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
- this.consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
- this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- this.keyConverter = keyConverter;
- this.valueConverter = valueConverter;
- }
-
- @Override
- public void start() {
- try {
- producer.start();
- consumer.subscribe(topicName, "*");
- consumer.registerMessageListener(new MessageListenerImpl());
- consumer.start();
- } catch (MQClientException e) {
- log.error("Start error.", e);
- }
- }
-
- @Override
- public void stop() {
- producer.shutdown();
- consumer.shutdown();
- }
-
- @Override
- public void send(K key, V value) {
-
- try {
- byte[] messageBody = encodeKeyValue(key, value);
- if (messageBody.length > MAX_MESSAGE_SIZE) {
- log.error("Message size is greater than {} bytes, key: {}, value {}", MAX_MESSAGE_SIZE, key, value);
- return;
- }
- producer.send(new Message(topicName, messageBody), new SendCallback() {
- @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
- log.info("Send async message OK, msgId: {}", result.getMsgId());
- }
-
- @Override public void onException(Throwable throwable) {
- if (null != throwable) {
- log.error("Send async message Failed, error: {}", throwable);
- }
- }
- });
- } catch (Exception e) {
- log.error("BrokerBaseLog send async message Failed.", e);
- }
- }
-
- private byte[] encodeKeyValue(K key, V value) throws Exception {
-
- byte[] keyByte = keyConverter.objectToByte(key);
- byte[] valueByte = valueConverter.objectToByte(value);
- Map map = new HashMap<>();
- map.put(Base64.getEncoder().encodeToString(keyByte), Base64.getEncoder().encodeToString(valueByte));
-
- return JSON.toJSONString(map).getBytes("UTF-8");
- }
-
- private Map decodeKeyValue(byte[] bytes) throws Exception {
-
- Map resultMap = new HashMap<>();
- String rawString = new String(bytes, "UTF-8");
- Map map = JSON.parseObject(rawString, Map.class);
- for (String key : map.keySet()) {
- K decodeKey = (K) keyConverter.byteToObject(Base64.getDecoder().decode(key));
- V decodeValue = (V) valueConverter.byteToObject(Base64.getDecoder().decode(map.get(key)));
- resultMap.put(decodeKey, decodeValue);
- }
- return resultMap;
- }
-
- class MessageListenerImpl implements MessageListenerConcurrently {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList,
- ConsumeConcurrentlyContext context) {
- for (MessageExt messageExt : rmqMsgList) {
- log.info("Received one message: {}", messageExt.getMsgId() + "\n");
- byte[] bytes = messageExt.getBody();
- Map map;
- try {
- map = decodeKeyValue(bytes);
- } catch (Exception e) {
- log.error("Decode message data error. message: {}, error info: {}", messageExt, e);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- for (K key : map.keySet()) {
- dataSynchronizerCallback.onCompletion(null, key, map.get(key));
- }
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.utils.datasync;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.Converter;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine.MAX_MESSAGE_SIZE;
+
+/**
+ * A Broker base data synchronizer, synchronize data between workers.
+ *
+ * @param
+ * @param
+ */
+public class BrokerBasedLog implements DataSynchronizer {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+ /**
+ * A callback to receive data from other workers.
+ */
+ private DataSynchronizerCallback dataSynchronizerCallback;
+
+ /**
+ * Producer to send data to broker.
+ */
+ private DefaultMQProducer producer;
+
+ /**
+ * Consumer to receive synchronize data from broker.
+ */
+ private DefaultMQPushConsumer consumer;
+
+ /**
+ * A queue to send or consume message.
+ */
+ private String topicName;
+
+ /**
+ * Used to convert key to byte[].
+ */
+ private Converter keyConverter;
+
+ /**
+ * Used to convert value to byte[].
+ */
+ private Converter valueConverter;
+
+ public BrokerBasedLog(ConnectConfig connectConfig,
+ String topicName,
+ String workId,
+ DataSynchronizerCallback dataSynchronizerCallback,
+ Converter keyConverter,
+ Converter valueConverter) {
+
+ this.topicName = topicName;
+ this.dataSynchronizerCallback = dataSynchronizerCallback;
+ this.producer = new DefaultMQProducer();
+ this.producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+ this.producer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr()));
+ this.producer.setProducerGroup(workId);
+ this.producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
+ this.producer.setMaxMessageSize(MAX_MESSAGE_SIZE);
+
+ this.consumer = new DefaultMQPushConsumer();
+ this.consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+ this.consumer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr()));
+ this.consumer.setConsumerGroup(workId);
+ this.consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
+ this.consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
+ this.consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
+ this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ }
+
+ @Override
+ public void start() {
+ try {
+ producer.start();
+ consumer.subscribe(topicName, "*");
+ consumer.registerMessageListener(new MessageListenerImpl());
+ consumer.start();
+ } catch (MQClientException e) {
+ log.error("Start error.", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ producer.shutdown();
+ consumer.shutdown();
+ }
+
+ @Override
+ public void send(K key, V value) {
+
+ try {
+ byte[] messageBody = encodeKeyValue(key, value);
+ if (messageBody.length > MAX_MESSAGE_SIZE) {
+ log.error("Message size is greater than {} bytes, key: {}, value {}", MAX_MESSAGE_SIZE, key, value);
+ return;
+ }
+ producer.send(new Message(topicName, messageBody), new SendCallback() {
+ @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) {
+ log.info("Send async message OK, msgId: {},topic:{}", result.getMsgId(), topicName);
+ }
+
+ @Override public void onException(Throwable throwable) {
+ if (null != throwable) {
+ log.error("Send async message Failed, error: {}", throwable);
+ }
+ }
+ });
+ } catch (Exception e) {
+ log.error("BrokerBaseLog send async message Failed.", e);
+ }
+ }
+
+ private byte[] encodeKeyValue(K key, V value) throws Exception {
+
+ byte[] keyByte = keyConverter.objectToByte(key);
+ byte[] valueByte = valueConverter.objectToByte(value);
+ Map map = new HashMap<>();
+ map.put(Base64.getEncoder().encodeToString(keyByte), Base64.getEncoder().encodeToString(valueByte));
+
+ return JSON.toJSONString(map).getBytes("UTF-8");
+ }
+
+ private Map decodeKeyValue(byte[] bytes) throws Exception {
+
+ Map resultMap = new HashMap<>();
+ String rawString = new String(bytes, "UTF-8");
+ Map map = JSON.parseObject(rawString, Map.class);
+ for (String key : map.keySet()) {
+ K decodeKey = (K) keyConverter.byteToObject(Base64.getDecoder().decode(key));
+ V decodeValue = (V) valueConverter.byteToObject(Base64.getDecoder().decode(map.get(key)));
+ resultMap.put(decodeKey, decodeValue);
+ }
+ return resultMap;
+ }
+
+ class MessageListenerImpl implements MessageListenerConcurrently {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList,
+ ConsumeConcurrentlyContext context) {
+ for (MessageExt messageExt : rmqMsgList) {
+ log.info("Received one message: {}, topic is {}", messageExt.getMsgId() + "\n", topicName);
+ byte[] bytes = messageExt.getBody();
+ Map map;
+ try {
+ map = decodeKeyValue(bytes);
+ } catch (Exception e) {
+ log.error("Decode message data error. message: {}, error info: {}", messageExt, e);
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ for (K key : map.keySet()) {
+ dataSynchronizerCallback.onCompletion(null, key, map.get(key));
+ }
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/resources/connect.conf b/rocketmq-connect-runtime/src/main/resources/connect.conf
index bbbbb4ac4..5ec7a795e 100644
--- a/rocketmq-connect-runtime/src/main/resources/connect.conf
+++ b/rocketmq-connect-runtime/src/main/resources/connect.conf
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-## Http prot for user to access REST API
+## Http port for user to access REST API
httpPort=8081
# Rocketmq namesrvAddr
namesrvAddr=localhost:9876
# Source or sink connector jar file dir,The default value is rocketmq-connect-sample
-pluginPaths=/home/connect/rocketmq-connect/rocketmq-connect-sample/target
\ No newline at end of file
+pluginPaths=/usr/local/connector-plugins
diff --git a/rocketmq-connect-runtime/src/main/resources/package.xml b/rocketmq-connect-runtime/src/main/resources/package.xml
index e682f4a7c..91fce8549 100644
--- a/rocketmq-connect-runtime/src/main/resources/package.xml
+++ b/rocketmq-connect-runtime/src/main/resources/package.xml
@@ -1,40 +1,40 @@
-
-
-
- package
-
- dir
-
- false
-
-
- src/main/resources
- conf
- false
-
-
-
-
- lib
- runtime
-
-
-
\ No newline at end of file
+
+
+
+ package
+
+ dir
+
+ false
+
+
+ src/main/resources
+ conf
+ false
+
+
+
+
+ lib
+ runtime
+
+
+
diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
index 078bc2d48..bda470d47 100644
--- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
+++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java
@@ -1,263 +1,267 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.connect.file;
-
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
-import io.openmessaging.connector.api.data.EntryType;
-import io.openmessaging.connector.api.data.Field;
-import io.openmessaging.connector.api.data.FieldType;
-import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SinkDataEntry;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.exception.ConnectException;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.rocketmq.connect.file.FileConstants.LINE;
-
-public class FileSourceTask extends SourceTask {
-
- private Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR);
-
- private FileConfig fileConfig;
-
- private InputStream stream;
- private BufferedReader reader = null;
- private char[] buffer = new char[1024];
- private int offset = 0;
- private int batchSize = FileSourceConnector.DEFAULT_TASK_BATCH_SIZE;
-
- private Long streamOffset;
-
- @Override public Collection poll() {
- if (stream == null) {
- try {
- stream = Files.newInputStream(Paths.get(fileConfig.getFilename()));
- ByteBuffer positionInfo;
- positionInfo = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(FileConstants.getPartition(fileConfig.getFilename()).getBytes(Charset.defaultCharset())));
- if (positionInfo != null) {
- String positionJson = new String(positionInfo.array(), Charset.defaultCharset());
- JSONObject jsonObject = JSONObject.parseObject(positionJson);
- Object lastRecordedOffset = jsonObject.getLong(FileConstants.NEXT_POSITION);
- if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
- throw new ConnectException(-1, "Offset position is the incorrect type");
- if (lastRecordedOffset != null) {
- log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
- long skipLeft = (Long) lastRecordedOffset;
- while (skipLeft > 0) {
- try {
- long skipped = stream.skip(skipLeft);
- skipLeft -= skipped;
- } catch (IOException e) {
- log.error("Error while trying to seek to previous offset in file {}: ", fileConfig.getFilename(), e);
- throw new ConnectException(-1, e);
- }
- }
- log.debug("Skipped to offset {}", lastRecordedOffset);
- }
- streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
- } else {
- streamOffset = 0L;
- }
- reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
- log.debug("Opened {} for reading", logFilename());
- } catch (NoSuchFileException e) {
- log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
- synchronized (this) {
- try {
- this.wait(1000);
- } catch (InterruptedException e1) {
- log.error("Interrupt error .", e1);
- }
- }
- return null;
- } catch (IOException e) {
- log.error("Error while trying to open file {}: ", fileConfig.getFilename(), e);
- throw new ConnectException(-1, e);
- }
- }
-
- try {
- final BufferedReader readerCopy;
- synchronized (this) {
- readerCopy = reader;
- }
- if (readerCopy == null) {
- return null;
- }
-
- Collection records = null;
-
- int nread = 0;
- while (readerCopy.ready()) {
- nread = readerCopy.read(buffer, offset, buffer.length - offset);
- log.trace("Read {} bytes from {}", nread, logFilename());
-
- if (nread > 0) {
- offset += nread;
- if (offset == buffer.length) {
- char[] newbuf = new char[buffer.length * 2];
- System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
- buffer = newbuf;
- }
-
- String line;
- do {
- line = extractLine();
- if (line != null) {
- log.trace("Read a line from {}", logFilename());
- if (records == null) {
- records = new ArrayList<>();
- }
- Schema schema = new Schema();
- schema.setDataSource(fileConfig.getFilename());
- schema.setName(fileConfig.getFilename() + LINE);
- final Field field = new Field(0, FileConstants.FILE_LINE_CONTENT, FieldType.STRING);
- List fields = new ArrayList() {
- {
- add(field);
- }
- };
- schema.setFields(fields);
- DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema)
- .entryType(EntryType.CREATE)
- .queue(fileConfig.getTopic())
- .timestamp(System.currentTimeMillis())
- .putFiled(FileConstants.FILE_LINE_CONTENT, line);
- final SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(offsetKey(FileConstants.getPartition(fileConfig.getFilename())), offsetValue(streamOffset));
- records.add(sourceDataEntry);
- if (records.size() >= batchSize) {
- return records;
- }
- }
- }
- while (line != null);
- }
- }
-
- if (nread <= 0) {
- synchronized (this) {
- this.wait(1000);
- }
- }
-
- return records;
- } catch (IOException e) {
- } catch (InterruptedException e) {
- log.error("Interrupt error .", e);
- }
- return null;
- }
-
- private String extractLine() {
- int until = -1, newStart = -1;
- for (int i = 0; i < offset; i++) {
- if (buffer[i] == '\n') {
- until = i;
- newStart = i + 1;
- break;
- } else if (buffer[i] == '\r') {
- // We need to check for \r\n, so we must skip this if we can't check the next char
- if (i + 1 >= offset)
- return null;
-
- until = i;
- newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
- break;
- }
- }
-
- if (until != -1) {
- String result = new String(buffer, 0, until);
- System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
- offset = offset - newStart;
- if (streamOffset != null)
- streamOffset += newStart;
- return result;
- } else {
- return null;
- }
- }
-
- public void commitRecord(SourceDataEntry sourceDataEntry, SinkDataEntry sinkDataEntry) {
- log.info("commit sink queueOffset: {} ", sinkDataEntry.getQueueOffset());
- }
-
- @Override public void start(KeyValue props) {
- fileConfig = new FileConfig();
- fileConfig.load(props);
- if (fileConfig.getFilename() == null || fileConfig.getFilename().isEmpty()) {
- stream = System.in;
- streamOffset = null;
- reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
- }
- }
-
- @Override public void stop() {
- log.trace("Stopping");
- synchronized (this) {
- try {
- if (stream != null && stream != System.in) {
- stream.close();
- log.trace("Closed input stream");
- }
- } catch (IOException e) {
- log.error("Failed to close FileStreamSourceTask stream: ", e);
- }
- this.notify();
- }
- }
-
- @Override public void pause() {
-
- }
-
- @Override public void resume() {
-
- }
-
- private String logFilename() {
- return fileConfig.getFilename() == null ? "stdin" : fileConfig.getFilename();
- }
-
- private ByteBuffer offsetKey(String filename) {
- return ByteBuffer.wrap(filename.getBytes(Charset.defaultCharset()));
- }
-
- private ByteBuffer offsetValue(Long pos) {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(FileConstants.NEXT_POSITION, pos);
- return ByteBuffer.wrap(jsonObject.toJSONString().getBytes(Charset.defaultCharset()));
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.file;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.exception.ConnectException;
+import io.openmessaging.connector.api.source.SourceTask;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.connect.file.FileConstants.LINE;
+
+public class FileSourceTask extends SourceTask {
+
+ private Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR);
+
+ private FileConfig fileConfig;
+
+ private InputStream stream;
+ private BufferedReader reader = null;
+ private char[] buffer = new char[1024];
+ private int offset = 0;
+ private int batchSize = FileSourceConnector.DEFAULT_TASK_BATCH_SIZE;
+
+ private Long streamOffset;
+
+ @Override public Collection poll() {
+ log.info("Start a poll stream is null:{}", stream == null);
+ if (stream == null) {
+ try {
+ stream = Files.newInputStream(Paths.get(fileConfig.getFilename()));
+ ByteBuffer positionInfo;
+ positionInfo = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(FileConstants.getPartition(fileConfig.getFilename()).getBytes(Charset.defaultCharset())));
+ if (positionInfo != null) {
+ log.info("positionInfo is not null!");
+ String positionJson = new String(positionInfo.array(), Charset.defaultCharset());
+ JSONObject jsonObject = JSONObject.parseObject(positionJson);
+ Object lastRecordedOffset = jsonObject.getLong(FileConstants.NEXT_POSITION);
+ if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
+ throw new ConnectException(-1, "Offset position is the incorrect type");
+ if (lastRecordedOffset != null) {
+ log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
+ long skipLeft = (Long) lastRecordedOffset;
+ while (skipLeft > 0) {
+ try {
+ long skipped = stream.skip(skipLeft);
+ skipLeft -= skipped;
+ } catch (IOException e) {
+ log.error("Error while trying to seek to previous offset in file {}: ", fileConfig.getFilename(), e);
+ throw new ConnectException(-1, e);
+ }
+ }
+ log.debug("Skipped to offset {}", lastRecordedOffset);
+ }
+ streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
+ } else {
+ log.info("positionInfo is null!");
+ streamOffset = 0L;
+ }
+ reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
+ log.debug("Opened {} for reading", logFilename());
+ } catch (NoSuchFileException e) {
+ log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
+ synchronized (this) {
+ try {
+ this.wait(1000);
+ } catch (InterruptedException e1) {
+ log.error("Interrupt error .", e1);
+ }
+ }
+ return null;
+ } catch (IOException e) {
+ log.error("Error while trying to open file {}: ", fileConfig.getFilename(), e);
+ throw new ConnectException(-1, e);
+ }
+ }
+
+ try {
+ final BufferedReader readerCopy;
+ synchronized (this) {
+ readerCopy = reader;
+ }
+ if (readerCopy == null) {
+ return null;
+ }
+
+ Collection records = null;
+
+ int nread = 0;
+ while (readerCopy.ready()) {
+ nread = readerCopy.read(buffer, offset, buffer.length - offset);
+ log.trace("Read {} bytes from {}", nread, logFilename());
+
+ if (nread > 0) {
+ offset += nread;
+ if (offset == buffer.length) {
+ char[] newbuf = new char[buffer.length * 2];
+ System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
+ buffer = newbuf;
+ }
+
+ String line;
+ do {
+ line = extractLine();
+ if (line != null) {
+ log.trace("Read a line from {}", logFilename());
+ if (records == null) {
+ records = new ArrayList<>();
+ }
+ Schema schema = new Schema();
+ schema.setDataSource(fileConfig.getFilename());
+ schema.setName(fileConfig.getFilename() + LINE);
+ final Field field = new Field(0, FileConstants.FILE_LINE_CONTENT, FieldType.STRING);
+ List fields = new ArrayList() {
+ {
+ add(field);
+ }
+ };
+ schema.setFields(fields);
+ DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema)
+ .entryType(EntryType.CREATE)
+ .queue(fileConfig.getTopic())
+ .timestamp(System.currentTimeMillis())
+ .putFiled(FileConstants.FILE_LINE_CONTENT, line);
+ final SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(offsetKey(FileConstants.getPartition(fileConfig.getFilename())), offsetValue(streamOffset));
+ records.add(sourceDataEntry);
+ if (records.size() >= batchSize) {
+ return records;
+ }
+ }
+ }
+ while (line != null);
+ }
+ }
+
+ if (nread <= 0) {
+ synchronized (this) {
+ this.wait(1000);
+ }
+ }
+
+ return records;
+ } catch (IOException e) {
+ } catch (InterruptedException e) {
+ log.error("Interrupt error .", e);
+ }
+ return null;
+ }
+
+ private String extractLine() {
+ int until = -1, newStart = -1;
+ for (int i = 0; i < offset; i++) {
+ if (buffer[i] == '\n') {
+ until = i;
+ newStart = i + 1;
+ break;
+ } else if (buffer[i] == '\r') {
+ // We need to check for \r\n, so we must skip this if we can't check the next char
+ if (i + 1 >= offset)
+ return null;
+
+ until = i;
+ newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
+ break;
+ }
+ }
+
+ if (until != -1) {
+ String result = new String(buffer, 0, until);
+ System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
+ offset = offset - newStart;
+ if (streamOffset != null)
+ streamOffset += newStart;
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ public void commitRecord(SourceDataEntry sourceDataEntry, SinkDataEntry sinkDataEntry) {
+ log.info("commit sink queueOffset: {} ", sinkDataEntry.getQueueOffset());
+ }
+
+ @Override public void start(KeyValue props) {
+ fileConfig = new FileConfig();
+ fileConfig.load(props);
+ log.info("fileName is:{}", fileConfig.getFilename());
+ if (fileConfig.getFilename() == null || fileConfig.getFilename().isEmpty()) {
+ stream = System.in;
+ streamOffset = null;
+ reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
+ }
+ }
+
+ @Override public void stop() {
+ log.trace("Stopping");
+ synchronized (this) {
+ try {
+ if (stream != null && stream != System.in) {
+ stream.close();
+ log.trace("Closed input stream");
+ }
+ } catch (IOException e) {
+ log.error("Failed to close FileStreamSourceTask stream: ", e);
+ }
+ this.notify();
+ }
+ }
+
+ @Override public void pause() {
+
+ }
+
+ @Override public void resume() {
+
+ }
+
+ private String logFilename() {
+ return fileConfig.getFilename() == null ? "stdin" : fileConfig.getFilename();
+ }
+
+ private ByteBuffer offsetKey(String filename) {
+ return ByteBuffer.wrap(filename.getBytes(Charset.defaultCharset()));
+ }
+
+ private ByteBuffer offsetValue(Long pos) {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(FileConstants.NEXT_POSITION, pos);
+ return ByteBuffer.wrap(jsonObject.toJSONString().getBytes(Charset.defaultCharset()));
+ }
+
+}