From 2d4874a9a54b3831558fdf683433b3e76d28cd7d Mon Sep 17 00:00:00 2001 From: Dreaouth Date: Sun, 20 Sep 2020 19:09:29 +0800 Subject: [PATCH] [ISSUE #570] ASoC connect runtime optimization: CLI (#622) feature(rocketmq-runtime) add CLI support for rocketmq-connect-runtime * Add CLI * Fix checkstyle * Optimize CLI structure * Add README.md * Rename CLI * Update pom.xml * Optimize the connectors and tasks format * Fix newline format --- pom.xml | 1 + rocketmq-connect-cli/README.md | 69 +++ rocketmq-connect-cli/connectAdmin | 18 + rocketmq-connect-cli/pom.xml | 186 ++++++ .../connect/cli/ConnectAdminStartup.java | 194 +++++++ .../command/CreateConnectorSubCommand.java | 92 +++ .../cli/command/GetAllocatedConnectors.java | 88 +++ .../cli/command/GetAllocatedInfoCommand.java | 63 +++ .../cli/command/GetAllocatedTasks.java | 94 ++++ .../cli/command/GetClusterInfoSubCommand.java | 63 +++ .../cli/command/GetConfigInfoSubCommand.java | 63 +++ .../QueryConnectorConfigSubCommand.java | 70 +++ .../QueryConnectorStatusSubCommand.java | 70 +++ .../cli/command/ReloadPluginsSubCommand.java | 63 +++ .../cli/command/StopAllSubCommand.java | 63 +++ .../cli/command/StopConnectorSubCommand.java | 70 +++ .../connect/cli/command/SubCommand.java | 32 ++ .../connect/cli/commom/CLIConfigDefine.java | 25 + .../rocketmq/connect/cli/commom/Config.java | 44 ++ .../connect/cli/commom/ConnectKeyValue.java | 149 +++++ .../cli/utils/FileAndPropertyUtil.java | 117 ++++ .../connect/cli/utils/RestSender.java | 57 ++ .../connect/cli/utils/ServerUtil.java | 80 +++ .../src/main/resources/connect.conf | 20 + .../src/main/resources/package.xml | 40 ++ rocketmq-connect-runtime/pom.xml | 5 +- .../connectorwrapper/WorkerSourceTask.java | 1 + .../connect/runtime/rest/RestHandler.java | 16 +- .../service/ConfigManagementServiceImpl.java | 4 +- .../runtime/service/RebalanceImpl.java | 1 - .../utils/datasync/BrokerBasedLog.java | 400 ++++++------- .../src/main/resources/connect.conf | 4 +- .../src/main/resources/package.xml | 80 +-- .../rocketmq/connect/file/FileSourceTask.java | 530 +++++++++--------- 34 files changed, 2349 insertions(+), 523 deletions(-) create mode 100644 rocketmq-connect-cli/README.md create mode 100644 rocketmq-connect-cli/connectAdmin create mode 100644 rocketmq-connect-cli/pom.xml create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/ConnectAdminStartup.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/CreateConnectorSubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedConnectors.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedInfoCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedTasks.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetClusterInfoSubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetConfigInfoSubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorConfigSubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorStatusSubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/ReloadPluginsSubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopAllSubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopConnectorSubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/SubCommand.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/CLIConfigDefine.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/Config.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/ConnectKeyValue.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/FileAndPropertyUtil.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/RestSender.java create mode 100644 rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/ServerUtil.java create mode 100644 rocketmq-connect-cli/src/main/resources/connect.conf create mode 100644 rocketmq-connect-cli/src/main/resources/package.xml diff --git a/pom.xml b/pom.xml index 1262d7abf..833cdcb0f 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,7 @@ rocketmq-connect-sample rocketmq-connect-runtime + rocketmq-connect-cli RocketMQ Connect diff --git a/rocketmq-connect-cli/README.md b/rocketmq-connect-cli/README.md new file mode 100644 index 000000000..4c28bf6c8 --- /dev/null +++ b/rocketmq-connect-cli/README.md @@ -0,0 +1,69 @@ +# rocketmq-connect-CLI Admin + +与RocketMQ中的mqadmin类似,使用简洁的CLI命令实现增加,删除,查看connector等功能 + +在rocketmq-connect\rocketmq-connect-CLI目录下,运行`sh connectAdmin` + +```bash +The most commonly used connectAdmin commands are: + createConnector Create and start a connector by connector's config + stopConnector Stop a specific connector by connector-name + queryConnectorConfig Get configuration information for a connector + queryConnectorStatus Get Status information for a connector + stopAll Stop and delete all Connectors and all configuration information + reloadPlugins Reload the Connector file under the plugin directory + getConfigInfo Get all configuration information + getClusterInfo Get cluster information + getAllocatedInfo Get the load information of the current worker + +See 'connectAdmin help ' for more information on a specific command. +``` + +如上所示,其中列出了最常用的命令,并附有简短说明。要获取每个命令的详细手册,请使用`sh connectAdmin help `。例如,命令`sh connectAdmin help stopConnector`将输出如下内容: + +```bash +$ sh connectAdmin help stopConnector +usage: connectAdmin stopConnector -c [-h] + -c,--connectorName connector name + -h,--help Print help +``` + +## getAllocatedConnectors&getAllocatedTasks + +提供格式化输出当前connectors和tasks + +| taskName | connectorName | status | topic | update-timestamp | +| -------------- | ------------------- | ---------- | --------- | ---------------- | +| JdbcSourceTask | jdbcConnectorSource | TERMINATED | jdbcTopic | 1597409102590 | +| FileSourceTask | fileConnectorSource | TERMINATED | fileTopic | 1597409110815 | +| FileSinkTask | fileConnectorSink | STOPPING | fileTopic | 1597409204516 | + +## createConnector + +其中要说明的一点是,createConnector需要指定配置文件的路径 + +```bash +$ sh connectAdmin help createConnector +usage: connectAdmin createConnector -c [-h] -p + -c,--connectorName connector name + -h,--help Print help + -p,--path Configuration file pat +``` + +所以在启动新的connector时,要用`-p`指定json配置文件的路径,例如 + +```bash +sh connectAdmin createConnector -c fileConnectorSource -p /root/shell/file-connector.json +``` + +配置文件格式参考具体的connector,这里给出file-connector的格式 + +```json +{ + "connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector", + "topic":"fileTopic", + "filename":"/opt/source-file/source-file.txt", + "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter" +} +``` + diff --git a/rocketmq-connect-cli/connectAdmin b/rocketmq-connect-cli/connectAdmin new file mode 100644 index 000000000..5ded525c7 --- /dev/null +++ b/rocketmq-connect-cli/connectAdmin @@ -0,0 +1,18 @@ +#!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cd target/distribution/ && java -cp .:./conf/:./lib/* org.apache.rocketmq.connect.cli.ConnectAdminStartup -s conf/connect.conf $@ diff --git a/rocketmq-connect-cli/pom.xml b/rocketmq-connect-cli/pom.xml new file mode 100644 index 000000000..047398265 --- /dev/null +++ b/rocketmq-connect-cli/pom.xml @@ -0,0 +1,186 @@ + + + + + rocketmq-connect + org.apache.rocketmq + 0.0.1-SNAPSHOT + + 4.0.0 + + rocketmq-connect-cli + + + UTF-8 + UTF-8 + + + 1.8 + 1.8 + + + 4.7.0 + + + + + + src/main/resources + + *.xml + connect.conf + + true + + + + + maven-assembly-plugin + 2.2.1 + + distribution + false + false + + src/main/resources/package.xml + + + + + make-assembly + package + + single + + + + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + + + + aggregate + + aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + + + + + org.slf4j + slf4j-api + + + org.apache.httpcomponents + httpclient + + + io.openmessaging + openmessaging-connector + + + commons-cli + commons-cli + + + org.apache.rocketmq + rocketmq-tools + ${rocketmq.version} + + + + diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/ConnectAdminStartup.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/ConnectAdminStartup.java new file mode 100644 index 000000000..cee747643 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/ConnectAdminStartup.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli; + + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.connect.cli.command.SubCommand; +import org.apache.rocketmq.connect.cli.command.GetAllocatedConnectors; +import org.apache.rocketmq.connect.cli.command.GetAllocatedTasks; +import org.apache.rocketmq.connect.cli.command.CreateConnectorSubCommand; +import org.apache.rocketmq.connect.cli.command.GetAllocatedInfoCommand; +import org.apache.rocketmq.connect.cli.command.GetClusterInfoSubCommand; +import org.apache.rocketmq.connect.cli.command.GetConfigInfoSubCommand; +import org.apache.rocketmq.connect.cli.command.QueryConnectorConfigSubCommand; +import org.apache.rocketmq.connect.cli.command.QueryConnectorStatusSubCommand; +import org.apache.rocketmq.connect.cli.command.ReloadPluginsSubCommand; +import org.apache.rocketmq.connect.cli.command.StopAllSubCommand; +import org.apache.rocketmq.connect.cli.command.StopConnectorSubCommand; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.FileAndPropertyUtil; +import org.apache.rocketmq.connect.cli.utils.ServerUtil; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +public class ConnectAdminStartup { + + protected static List subCommandList = new ArrayList(); + + public static CommandLine commandLine = null; + + public static String configFile = null; + + public static Properties properties = null; + + public static void main(String[] args) { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + String[] configArgs = Arrays.copyOfRange(args, 0, 2); + args = Arrays.copyOfRange(args, 2, args.length); + + try { + + // Build the command line options. + Options option = ServerUtil.buildCommandlineOptions(new Options()); + commandLine = ServerUtil.parseCmdLine("connect", configArgs, buildCommandlineOptions(option), + new PosixParser()); + if (null == commandLine) { + System.exit(-1); + } + + // Load configs from command line. + Config config = new Config(); + if (commandLine.hasOption('s')) { + String file = commandLine.getOptionValue('s'); + if (file != null) { + configFile = file; + InputStream in = new BufferedInputStream(new FileInputStream(file)); + properties = new Properties(); + properties.load(in); + FileAndPropertyUtil.properties2Object(properties, config); + in.close(); + } + } + + initCommand(config); + + switch (args.length) { + case 0: + printHelp(); + break; + case 2: + if (args[0].equals("help")) { + SubCommand cmd = findSubCommand(args[1]); + if (cmd != null) { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + options = cmd.buildCommandlineOptions(options); + if (options != null) { + ServerUtil.printCommandLineHelp("connectAdmin " + cmd.commandName(), options); + } + } else { + System.out.printf("The sub command %s not exist.%n", args[1]); + } + break; + } + case 1: + default: + SubCommand cmd = findSubCommand(args[0]); + if (cmd != null) { + String[] subargs = parseSubArgs(args); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + final CommandLine commandLine = + ServerUtil.parseCmdLine("connectAdmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), + new PosixParser()); + if (null == commandLine) { + return; + } + + cmd.execute(commandLine, options); + } else { + System.out.printf("The sub command %s not exist.%n", args[0]); + } + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void initCommand(Config config) { + initCommand(new CreateConnectorSubCommand(config)); + initCommand(new StopConnectorSubCommand(config)); + initCommand(new QueryConnectorConfigSubCommand(config)); + initCommand(new QueryConnectorStatusSubCommand(config)); + initCommand(new StopAllSubCommand(config)); + initCommand(new ReloadPluginsSubCommand(config)); + initCommand(new GetConfigInfoSubCommand(config)); + initCommand(new GetClusterInfoSubCommand(config)); + initCommand(new GetAllocatedInfoCommand(config)); + initCommand(new GetAllocatedConnectors(config)); + initCommand(new GetAllocatedTasks(config)); + } + + + private static void printHelp() { + System.out.printf("The most commonly used connectAdmin commands are:%n"); + for (SubCommand cmd : subCommandList) { + System.out.printf(" %-25s %s%n", cmd.commandName(), cmd.commandDesc()); + } + + System.out.printf("%nSee 'connectAdmin help ' for more information on a specific command.%n"); + } + + private static SubCommand findSubCommand(final String name) { + for (SubCommand cmd : subCommandList) { + if (cmd.commandName().toUpperCase().equals(name.toUpperCase())) { + return cmd; + } + } + + return null; + } + + private static String[] parseSubArgs(String[] args) { + if (args.length > 1) { + String[] result = new String[args.length - 1]; + for (int i = 0; i < args.length - 1; i++) { + result[i] = args[i + 1]; + } + return result; + } + return null; + } + + public static void initCommand(SubCommand command) { + subCommandList.add(command); + } + + private static Options buildCommandlineOptions(Options options) { + + Option opt = new Option("s", "configFile", true, "connect config properties file"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/CreateConnectorSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/CreateConnectorSubCommand.java new file mode 100644 index 000000000..6cc7b356c --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/CreateConnectorSubCommand.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.charset.StandardCharsets; + +public class CreateConnectorSubCommand implements SubCommand { + + private final Config config; + + public CreateConnectorSubCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "createConnector"; + } + + @Override + public String commandDesc() { + return "Create and start a connector by connector's config"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "connectorName", true, "connector name"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option("p", "path", true, "Configuration file path"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String connectorName = commandLine.getOptionValue('c').trim(); + String request = "/connectors/" + connectorName + "?config="; + URL baseUrl = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String filePath = commandLine.getOptionValue('p').trim(); + String config = readFile(filePath); + String result = new RestSender().sendHttpRequest(baseUrl.toString(), config); + System.out.printf(result + "%n"); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } + + private String readFile(String filePath) { + StringBuilder sb = new StringBuilder(); + try { + BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), StandardCharsets.UTF_8)); + int index = 0; + while ((index = reader.read()) != -1) { + sb.append((char) index); + } + } catch (IOException e) { + e.printStackTrace(); + } + return sb.toString(); + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedConnectors.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedConnectors.java new file mode 100644 index 000000000..95b618ba1 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedConnectors.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.commom.ConnectKeyValue; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; +import java.util.Map; + +import static org.apache.rocketmq.connect.cli.utils.FileAndPropertyUtil.stringToKeyValue; + +public class GetAllocatedConnectors implements SubCommand { + + private final Config config; + + public GetAllocatedConnectors(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "getAllocatedConnectors"; + } + + @Override + public String commandDesc() { + return "Get allocated connectors information"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String request = "/getAllocatedConnectors"; + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + showConnectorInfo(result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } + + private void showConnectorInfo(String result) { + Map connectMap = JSON.parseObject(result, Map.class); + System.out.printf("%-25s %-15s %-17s %-25s%n", "connectorName", "status", "topic", "update-timestamp"); + for (Map.Entry entry : connectMap.entrySet()) { + String json = entry.getValue().getString("properties"); + ConnectKeyValue keyValue = stringToKeyValue(json); + String connectorName = entry.getKey(); + String topic; + if (keyValue.getString("topic") != null) { + topic = keyValue.getString("topic"); + } else { + topic = keyValue.getString("topicNames"); + } + String updateTimestamp = keyValue.getString("update-timestamp"); + System.out.printf("%-25s %-15s %-17s %-25s%n", connectorName, "RUNNING", topic, updateTimestamp); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedInfoCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedInfoCommand.java new file mode 100644 index 000000000..49f1a7729 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedInfoCommand.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; + +public class GetAllocatedInfoCommand implements SubCommand { + + private final Config config; + + public GetAllocatedInfoCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "getAllocatedInfo"; + } + + @Override + public String commandDesc() { + return "Get the load information of the current worker"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String request = "/" + commandName(); + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + System.out.printf("%s%n", result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedTasks.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedTasks.java new file mode 100644 index 000000000..6e4d790a3 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetAllocatedTasks.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.commom.ConnectKeyValue; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; +import java.util.List; +import java.util.Map; + +import static org.apache.rocketmq.connect.cli.utils.FileAndPropertyUtil.stringToKeyValue; + +public class GetAllocatedTasks implements SubCommand { + + private final Config config; + + public GetAllocatedTasks(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "getAllocatedTasks"; + } + + @Override + public String commandDesc() { + return "Get allocated tasks information"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String request = "/getAllocatedTasks"; + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + showTaskInfo(result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } + + private void showTaskInfo(String result) { + Map> taskMap = JSON.parseObject(result, Map.class); + System.out.printf("%-22s %-22s %-12s %-15s %-20s %-30s%n", "taskName", "connectorName", "status", "topic", "update-timestamp", "taskId"); + for (Map.Entry> entry : taskMap.entrySet()) { + for (JSONObject jsonObject: entry.getValue()) { + String connectorName = jsonObject.getString("connectorName"); + String status = jsonObject.getString("state"); + ConnectKeyValue keyValue = stringToKeyValue(jsonObject.getJSONObject("configs").getString("properties")); + String[] taskNameTmp = keyValue.getString("task-class").split("\\."); + String taskName = taskNameTmp[taskNameTmp.length - 1]; + String taskId = keyValue.getString("task-id"); + String topic; + if (keyValue.getString("topic") != null) { + topic = keyValue.getString("topic"); + } else { + topic = keyValue.getString("topicNames"); + } + String updateTimestamp = keyValue.getString("update-timestamp"); + System.out.printf("%-22s %-22s %-12s %-15s %-20s %-30s%n", taskName, connectorName, status, topic, updateTimestamp, taskId); + } + + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetClusterInfoSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetClusterInfoSubCommand.java new file mode 100644 index 000000000..4698883c9 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetClusterInfoSubCommand.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; + +public class GetClusterInfoSubCommand implements SubCommand { + + private final Config config; + + public GetClusterInfoSubCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "getClusterInfo"; + } + + @Override + public String commandDesc() { + return "Get cluster information"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String request = "/" + commandName(); + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + System.out.printf("%s%n", result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetConfigInfoSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetConfigInfoSubCommand.java new file mode 100644 index 000000000..8ded0def0 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/GetConfigInfoSubCommand.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; + +public class GetConfigInfoSubCommand implements SubCommand { + + private final Config config; + + public GetConfigInfoSubCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "getConfigInfo"; + } + + @Override + public String commandDesc() { + return "Get all connectors and tasks configuration information"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String request = "/" + commandName(); + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + System.out.printf("%s%n", result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorConfigSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorConfigSubCommand.java new file mode 100644 index 000000000..04f11b3c0 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorConfigSubCommand.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; + +public class QueryConnectorConfigSubCommand implements SubCommand { + + private final Config config; + + public QueryConnectorConfigSubCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "queryConnectorConfig"; + } + + @Override + public String commandDesc() { + return "Get configuration information for a connector"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "connectorName", true, "connector name"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String connectorName = commandLine.getOptionValue('c').trim(); + String request = "/connectors/" + connectorName + "/config"; + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + System.out.printf("%s%n", result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorStatusSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorStatusSubCommand.java new file mode 100644 index 000000000..9817a125f --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/QueryConnectorStatusSubCommand.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; + +public class QueryConnectorStatusSubCommand implements SubCommand { + + private final Config config; + + public QueryConnectorStatusSubCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "queryConnectorStatus"; + } + + @Override + public String commandDesc() { + return "Get Status information for a connector"; + } + + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "connectorName", true, "connector name"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String connectorName = commandLine.getOptionValue('c').trim(); + String request = "/connectors/" + connectorName + "/config"; + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + System.out.printf("%s%n", result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/ReloadPluginsSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/ReloadPluginsSubCommand.java new file mode 100644 index 000000000..d70c57442 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/ReloadPluginsSubCommand.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; + +public class ReloadPluginsSubCommand implements SubCommand { + + private final Config config; + + public ReloadPluginsSubCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "reloadPlugins"; + } + + @Override + public String commandDesc() { + return "Reload the Connector file under the plugin directory"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String request = "/plugin/reload"; + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + System.out.printf("%s%n", result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopAllSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopAllSubCommand.java new file mode 100644 index 000000000..adee95fe5 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopAllSubCommand.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; + +public class StopAllSubCommand implements SubCommand { + + private final Config config; + + public StopAllSubCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "stopAll"; + } + + @Override + public String commandDesc() { + return "Stop and delete all Connectors and all configuration information"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String request = "/connectors/" + commandName(); + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + System.out.printf("%s%n", result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopConnectorSubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopConnectorSubCommand.java new file mode 100644 index 000000000..497625ed9 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/StopConnectorSubCommand.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.connect.cli.commom.CLIConfigDefine; +import org.apache.rocketmq.connect.cli.commom.Config; +import org.apache.rocketmq.connect.cli.utils.RestSender; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.net.URL; + + +public class StopConnectorSubCommand implements SubCommand { + + private final Config config; + + public StopConnectorSubCommand(Config config) { + this.config = config; + } + + @Override + public String commandName() { + return "stopConnector"; + } + + @Override + public String commandDesc() { + return "Stop a specific connector by connector-name"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "connectorName", true, "connector name"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options) throws SubCommandException { + try { + String connectorName = commandLine.getOptionValue('c').trim(); + String request = "/connectors/" + connectorName + "/stop"; + URL url = new URL(CLIConfigDefine.PROTOCOL, config.getHttpAddr(), config.getHttpPort(), request); + String result = new RestSender().sendHttpRequest(url.toString(), ""); + System.out.printf("%s%n", result); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/SubCommand.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/SubCommand.java new file mode 100644 index 000000000..44759a106 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/command/SubCommand.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.tools.command.SubCommandException; + +public interface SubCommand { + String commandName(); + + String commandDesc(); + + Options buildCommandlineOptions(Options options); + + void execute(CommandLine commandLine, Options options) throws SubCommandException; +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/CLIConfigDefine.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/CLIConfigDefine.java new file mode 100644 index 000000000..706ffbd23 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/CLIConfigDefine.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.commom; + +/** + * Define all the logger name of the runtime. + */ +public class CLIConfigDefine { + public static final String PROTOCOL = "http"; +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/Config.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/Config.java new file mode 100644 index 000000000..de94a8877 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/Config.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.commom; + +/** + * Configurations for tools. + */ +public class Config { + + private String httpAddr; + + private int httpPort = 8081; + + public String getHttpAddr() { + return httpAddr; + } + + public void setHttpAddr(String httpAddr) { + this.httpAddr = httpAddr; + } + + public int getHttpPort() { + return httpPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/ConnectKeyValue.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/ConnectKeyValue.java new file mode 100644 index 000000000..320b218b8 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/ConnectKeyValue.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.commom; + +import io.openmessaging.KeyValue; + +import java.io.Serializable; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Default Implements of {@link KeyValue} for runtime, which can be parsed by fastJson. + */ +public class ConnectKeyValue implements KeyValue, Serializable { + + /** + * All data are reserved in this map. + */ + private Map properties; + + public ConnectKeyValue() { + properties = new ConcurrentHashMap<>(); + } + + @Override + public KeyValue put(String key, int value) { + properties.put(key, String.valueOf(value)); + return this; + } + + @Override + public KeyValue put(String key, long value) { + properties.put(key, String.valueOf(value)); + return this; + } + + @Override + public KeyValue put(String key, double value) { + properties.put(key, String.valueOf(value)); + return this; + } + + @Override + public KeyValue put(String key, String value) { + properties.put(key, String.valueOf(value)); + return this; + } + + @Override + public int getInt(String key) { + if (!properties.containsKey(key)) + return 0; + return Integer.valueOf(properties.get(key)); + } + + @Override + public int getInt(final String key, final int defaultValue) { + return properties.containsKey(key) ? getInt(key) : defaultValue; + } + + @Override + public long getLong(String key) { + if (!properties.containsKey(key)) + return 0; + return Long.valueOf(properties.get(key)); + } + + @Override + public long getLong(final String key, final long defaultValue) { + return properties.containsKey(key) ? getLong(key) : defaultValue; + } + + @Override + public double getDouble(String key) { + if (!properties.containsKey(key)) + return 0; + return Double.valueOf(properties.get(key)); + } + + @Override + public double getDouble(final String key, final double defaultValue) { + return properties.containsKey(key) ? getDouble(key) : defaultValue; + } + + @Override + public String getString(String key) { + return properties.get(key); + } + + @Override + public String getString(final String key, final String defaultValue) { + return properties.containsKey(key) ? getString(key) : defaultValue; + } + + @Override + public Set keySet() { + return properties.keySet(); + } + + @Override + public boolean containsKey(String key) { + return properties.containsKey(key); + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + @Override + public boolean equals(Object obj) { + + if (obj != null && obj.getClass() == this.getClass()) { + ConnectKeyValue keyValue = (ConnectKeyValue) obj; + return this.properties.equals(keyValue.getProperties()); + } + return false; + } + + @Override + public int hashCode() { + return properties.hashCode(); + } + + @Override public String toString() { + return "ConnectKeyValue{" + + "properties=" + properties + + '}'; + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/FileAndPropertyUtil.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/FileAndPropertyUtil.java new file mode 100644 index 000000000..12a0fe381 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/FileAndPropertyUtil.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.utils; + +import com.alibaba.fastjson.JSON; +import org.apache.rocketmq.connect.cli.commom.ConnectKeyValue; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.Properties; + +/** + * Utils for file and property. + */ +public class FileAndPropertyUtil { + + public static String file2String(final String fileName) throws IOException { + File file = new File(fileName); + return file2String(file); + } + + public static String file2String(final File file) throws IOException { + if (file.exists()) { + byte[] data = new byte[(int) file.length()]; + boolean result; + + FileInputStream inputStream = null; + try { + inputStream = new FileInputStream(file); + int len = inputStream.read(data); + result = len == data.length; + } finally { + if (inputStream != null) { + inputStream.close(); + } + } + + if (result) { + return new String(data); + } + } + return null; + } + + public static ConnectKeyValue stringToKeyValue(String json) { + + if (null == json) { + return new ConnectKeyValue(); + } + Map map = JSON.parseObject(json, Map.class); + ConnectKeyValue keyValue = new ConnectKeyValue(); + for (String key : map.keySet()) { + keyValue.put(key, map.get(key)); + } + return keyValue; + } + + public static void properties2Object(final Properties p, final Object object) { + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getProperty(key); + if (property != null) { + Class[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg = null; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + } + } + } + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/RestSender.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/RestSender.java new file mode 100644 index 000000000..19201b4b2 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/RestSender.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.utils; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.net.URLEncoder; + +public class RestSender { + + public String sendHttpRequest(String baseUrl, String configs) { + + try { + CloseableHttpClient client = null; + CloseableHttpResponse response = null; + try { + String encodedConfigs = URLEncoder.encode(configs, "utf-8"); + HttpGet httpGet = new HttpGet(baseUrl + encodedConfigs); + client = HttpClients.createDefault(); + response = client.execute(httpGet); + HttpEntity entity = response.getEntity(); + String result = EntityUtils.toString(entity); + return result; + } finally { + if (response != null) { + response.close(); + } + if (client != null) { + client.close(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + return ""; + } +} diff --git a/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/ServerUtil.java b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/ServerUtil.java new file mode 100644 index 000000000..81dd58b98 --- /dev/null +++ b/rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/utils/ServerUtil.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.cli.utils; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.ParseException; + +import java.util.Properties; + +public class ServerUtil { + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("h", "help", false, "Print help"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + public static CommandLine parseCmdLine(final String appName, String[] args, Options options, + CommandLineParser parser) { + HelpFormatter hf = new HelpFormatter(); + hf.setWidth(110); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + if (commandLine.hasOption('h')) { + hf.printHelp(appName, options, true); + return null; + } + } catch (ParseException e) { + hf.printHelp(appName, options, true); + } + + return commandLine; + } + + public static void printCommandLineHelp(final String appName, final Options options) { + HelpFormatter hf = new HelpFormatter(); + hf.setWidth(110); + hf.printHelp(appName, options, true); + } + + public static Properties commandLine2Properties(final CommandLine commandLine) { + Properties properties = new Properties(); + Option[] opts = commandLine.getOptions(); + + if (opts != null) { + for (Option opt : opts) { + String name = opt.getLongOpt(); + String value = commandLine.getOptionValue(name); + if (value != null) { + properties.setProperty(name, value); + } + } + } + + return properties; + } + +} diff --git a/rocketmq-connect-cli/src/main/resources/connect.conf b/rocketmq-connect-cli/src/main/resources/connect.conf new file mode 100644 index 000000000..9b8c36ffb --- /dev/null +++ b/rocketmq-connect-cli/src/main/resources/connect.conf @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## Http port for user to access REST API +httpPort=8081 + +# RocketMQ Connect Runtime Addr +httpAddr=localhost \ No newline at end of file diff --git a/rocketmq-connect-cli/src/main/resources/package.xml b/rocketmq-connect-cli/src/main/resources/package.xml new file mode 100644 index 000000000..91fce8549 --- /dev/null +++ b/rocketmq-connect-cli/src/main/resources/package.xml @@ -0,0 +1,40 @@ + + + + package + + dir + + false + + + src/main/resources + conf + false + + + + + lib + runtime + + + diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml index 32da8e3a4..d341bf65e 100644 --- a/rocketmq-connect-runtime/pom.xml +++ b/rocketmq-connect-runtime/pom.xml @@ -43,7 +43,7 @@ 1.8 - 4.5.2 + 4.7.0 @@ -186,7 +186,6 @@ org.apache.httpcomponents httpclient - test io.openmessaging @@ -231,4 +230,4 @@ reflections - \ No newline at end of file + diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java index a7a535594..171d394ca 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java @@ -240,6 +240,7 @@ private void sendRecord(Collection sourceDataEntries) { try { producer.send(sourceMessage, new SendCallback() { @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) { + log.info("Successful send message to RocketMQ:{}", result.getMsgId()); try { if (null != partition && null != position) { positionData.put(partition, position); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java index 3b25738cb..1aff87d92 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java @@ -69,24 +69,16 @@ public RestHandler(ConnectController connectController) { app.get("/plugin/reload", this::reloadPlugins); } - /** - * We need to refactor this method to use json output format - * @param context - */ + private void getAllocatedConnectors(Context context) { Set workerConnectors = connectController.getWorker().getWorkingConnectors(); Set workerTasks = connectController.getWorker().getWorkingTasks(); - StringBuilder sb = new StringBuilder(); - sb.append("working connectors:\n"); + Map connectors = new HashMap<>(); for (WorkerConnector workerConnector : workerConnectors) { - sb.append(workerConnector.toString() + "\n"); + connectors.put(workerConnector.getConnectorName(), workerConnector.getKeyValue()); } - sb.append("working tasks:\n"); - for (Runnable runnable : workerTasks) { - sb.append(runnable.toString() + "\n"); - } - context.result(sb.toString()); + context.result(JSON.toJSONString(connectors)); } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java index 227d37198..1e973ab2d 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java @@ -195,11 +195,11 @@ public void recomputeTaskConfigs(String connectorName, Connector connector, Long @Override public void removeConnectorConfig(String connectorName) { - ConnectKeyValue config = new ConnectKeyValue(); + ConnectKeyValue config = connectorKeyValueStore.get(connectorName); config.put(RuntimeConfigDefine.UPDATE_TIMESATMP, System.currentTimeMillis()); config.put(RuntimeConfigDefine.CONFIG_DELETED, 1); - List taskConfigList = new ArrayList<>(); + List taskConfigList = taskKeyValueStore.get(connectorName); taskConfigList.add(config); connectorKeyValueStore.put(connectorName, config); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java index cb86aefb6..37b328661 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java @@ -80,7 +80,6 @@ public void doRebalance() { List curAliveWorkers = clusterManagementService.getAllAliveWorkers(); Map curConnectorConfigs = configManagementService.getConnectorConfigs(); Map> curTaskConfigs = configManagementService.getTaskConfigs(); - log.info("[ISSUE #2027] The connectorConfigs are:" + curConnectorConfigs.toString() + " with timestamp :" + System.currentTimeMillis()); ConnAndTaskConfigs allocateResult = allocateConnAndTaskStrategy.allocate(curAliveWorkers, clusterManagementService.getCurrentWorker(), curConnectorConfigs, curTaskConfigs); log.info("Allocated connector:{}", allocateResult.getConnectorConfigs()); log.info("Allocated task:{}", allocateResult.getTaskConfigs()); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java index 2474fe5f9..97856b35f 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java @@ -1,200 +1,200 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.connect.runtime.utils.datasync; - -import com.alibaba.fastjson.JSON; -import io.openmessaging.connector.api.data.Converter; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.connect.runtime.common.LoggerName; -import org.apache.rocketmq.connect.runtime.config.ConnectConfig; -import org.apache.rocketmq.connect.runtime.utils.ConnectUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine.MAX_MESSAGE_SIZE; - -/** - * A Broker base data synchronizer, synchronize data between workers. - * - * @param - * @param - */ -public class BrokerBasedLog implements DataSynchronizer { - - private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME); - - /** - * A callback to receive data from other workers. - */ - private DataSynchronizerCallback dataSynchronizerCallback; - - /** - * Producer to send data to broker. - */ - private DefaultMQProducer producer; - - /** - * Consumer to receive synchronize data from broker. - */ - private DefaultMQPushConsumer consumer; - - /** - * A queue to send or consume message. - */ - private String topicName; - - /** - * Used to convert key to byte[]. - */ - private Converter keyConverter; - - /** - * Used to convert value to byte[]. - */ - private Converter valueConverter; - - public BrokerBasedLog(ConnectConfig connectConfig, - String topicName, - String workId, - DataSynchronizerCallback dataSynchronizerCallback, - Converter keyConverter, - Converter valueConverter) { - - this.topicName = topicName; - this.dataSynchronizerCallback = dataSynchronizerCallback; - this.producer = new DefaultMQProducer(); - this.producer.setNamesrvAddr(connectConfig.getNamesrvAddr()); - this.producer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr())); - this.producer.setProducerGroup(workId); - this.producer.setSendMsgTimeout(connectConfig.getOperationTimeout()); - this.producer.setMaxMessageSize(MAX_MESSAGE_SIZE); - - this.consumer = new DefaultMQPushConsumer(); - this.consumer.setNamesrvAddr(connectConfig.getNamesrvAddr()); - this.consumer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr())); - this.consumer.setConsumerGroup(workId); - this.consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes()); - this.consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout()); - this.consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums()); - this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); - this.keyConverter = keyConverter; - this.valueConverter = valueConverter; - } - - @Override - public void start() { - try { - producer.start(); - consumer.subscribe(topicName, "*"); - consumer.registerMessageListener(new MessageListenerImpl()); - consumer.start(); - } catch (MQClientException e) { - log.error("Start error.", e); - } - } - - @Override - public void stop() { - producer.shutdown(); - consumer.shutdown(); - } - - @Override - public void send(K key, V value) { - - try { - byte[] messageBody = encodeKeyValue(key, value); - if (messageBody.length > MAX_MESSAGE_SIZE) { - log.error("Message size is greater than {} bytes, key: {}, value {}", MAX_MESSAGE_SIZE, key, value); - return; - } - producer.send(new Message(topicName, messageBody), new SendCallback() { - @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) { - log.info("Send async message OK, msgId: {}", result.getMsgId()); - } - - @Override public void onException(Throwable throwable) { - if (null != throwable) { - log.error("Send async message Failed, error: {}", throwable); - } - } - }); - } catch (Exception e) { - log.error("BrokerBaseLog send async message Failed.", e); - } - } - - private byte[] encodeKeyValue(K key, V value) throws Exception { - - byte[] keyByte = keyConverter.objectToByte(key); - byte[] valueByte = valueConverter.objectToByte(value); - Map map = new HashMap<>(); - map.put(Base64.getEncoder().encodeToString(keyByte), Base64.getEncoder().encodeToString(valueByte)); - - return JSON.toJSONString(map).getBytes("UTF-8"); - } - - private Map decodeKeyValue(byte[] bytes) throws Exception { - - Map resultMap = new HashMap<>(); - String rawString = new String(bytes, "UTF-8"); - Map map = JSON.parseObject(rawString, Map.class); - for (String key : map.keySet()) { - K decodeKey = (K) keyConverter.byteToObject(Base64.getDecoder().decode(key)); - V decodeValue = (V) valueConverter.byteToObject(Base64.getDecoder().decode(map.get(key))); - resultMap.put(decodeKey, decodeValue); - } - return resultMap; - } - - class MessageListenerImpl implements MessageListenerConcurrently { - @Override - public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, - ConsumeConcurrentlyContext context) { - for (MessageExt messageExt : rmqMsgList) { - log.info("Received one message: {}", messageExt.getMsgId() + "\n"); - byte[] bytes = messageExt.getBody(); - Map map; - try { - map = decodeKeyValue(bytes); - } catch (Exception e) { - log.error("Decode message data error. message: {}, error info: {}", messageExt, e); - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } - for (K key : map.keySet()) { - dataSynchronizerCallback.onCompletion(null, key, map.get(key)); - } - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.runtime.utils.datasync; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.connector.api.data.Converter; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.connect.runtime.common.LoggerName; +import org.apache.rocketmq.connect.runtime.config.ConnectConfig; +import org.apache.rocketmq.connect.runtime.utils.ConnectUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine.MAX_MESSAGE_SIZE; + +/** + * A Broker base data synchronizer, synchronize data between workers. + * + * @param + * @param + */ +public class BrokerBasedLog implements DataSynchronizer { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME); + + /** + * A callback to receive data from other workers. + */ + private DataSynchronizerCallback dataSynchronizerCallback; + + /** + * Producer to send data to broker. + */ + private DefaultMQProducer producer; + + /** + * Consumer to receive synchronize data from broker. + */ + private DefaultMQPushConsumer consumer; + + /** + * A queue to send or consume message. + */ + private String topicName; + + /** + * Used to convert key to byte[]. + */ + private Converter keyConverter; + + /** + * Used to convert value to byte[]. + */ + private Converter valueConverter; + + public BrokerBasedLog(ConnectConfig connectConfig, + String topicName, + String workId, + DataSynchronizerCallback dataSynchronizerCallback, + Converter keyConverter, + Converter valueConverter) { + + this.topicName = topicName; + this.dataSynchronizerCallback = dataSynchronizerCallback; + this.producer = new DefaultMQProducer(); + this.producer.setNamesrvAddr(connectConfig.getNamesrvAddr()); + this.producer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr())); + this.producer.setProducerGroup(workId); + this.producer.setSendMsgTimeout(connectConfig.getOperationTimeout()); + this.producer.setMaxMessageSize(MAX_MESSAGE_SIZE); + + this.consumer = new DefaultMQPushConsumer(); + this.consumer.setNamesrvAddr(connectConfig.getNamesrvAddr()); + this.consumer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr())); + this.consumer.setConsumerGroup(workId); + this.consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes()); + this.consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout()); + this.consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums()); + this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + } + + @Override + public void start() { + try { + producer.start(); + consumer.subscribe(topicName, "*"); + consumer.registerMessageListener(new MessageListenerImpl()); + consumer.start(); + } catch (MQClientException e) { + log.error("Start error.", e); + } + } + + @Override + public void stop() { + producer.shutdown(); + consumer.shutdown(); + } + + @Override + public void send(K key, V value) { + + try { + byte[] messageBody = encodeKeyValue(key, value); + if (messageBody.length > MAX_MESSAGE_SIZE) { + log.error("Message size is greater than {} bytes, key: {}, value {}", MAX_MESSAGE_SIZE, key, value); + return; + } + producer.send(new Message(topicName, messageBody), new SendCallback() { + @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) { + log.info("Send async message OK, msgId: {},topic:{}", result.getMsgId(), topicName); + } + + @Override public void onException(Throwable throwable) { + if (null != throwable) { + log.error("Send async message Failed, error: {}", throwable); + } + } + }); + } catch (Exception e) { + log.error("BrokerBaseLog send async message Failed.", e); + } + } + + private byte[] encodeKeyValue(K key, V value) throws Exception { + + byte[] keyByte = keyConverter.objectToByte(key); + byte[] valueByte = valueConverter.objectToByte(value); + Map map = new HashMap<>(); + map.put(Base64.getEncoder().encodeToString(keyByte), Base64.getEncoder().encodeToString(valueByte)); + + return JSON.toJSONString(map).getBytes("UTF-8"); + } + + private Map decodeKeyValue(byte[] bytes) throws Exception { + + Map resultMap = new HashMap<>(); + String rawString = new String(bytes, "UTF-8"); + Map map = JSON.parseObject(rawString, Map.class); + for (String key : map.keySet()) { + K decodeKey = (K) keyConverter.byteToObject(Base64.getDecoder().decode(key)); + V decodeValue = (V) valueConverter.byteToObject(Base64.getDecoder().decode(map.get(key))); + resultMap.put(decodeKey, decodeValue); + } + return resultMap; + } + + class MessageListenerImpl implements MessageListenerConcurrently { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, + ConsumeConcurrentlyContext context) { + for (MessageExt messageExt : rmqMsgList) { + log.info("Received one message: {}, topic is {}", messageExt.getMsgId() + "\n", topicName); + byte[] bytes = messageExt.getBody(); + Map map; + try { + map = decodeKeyValue(bytes); + } catch (Exception e) { + log.error("Decode message data error. message: {}, error info: {}", messageExt, e); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + for (K key : map.keySet()) { + dataSynchronizerCallback.onCompletion(null, key, map.get(key)); + } + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + } + +} diff --git a/rocketmq-connect-runtime/src/main/resources/connect.conf b/rocketmq-connect-runtime/src/main/resources/connect.conf index bbbbb4ac4..5ec7a795e 100644 --- a/rocketmq-connect-runtime/src/main/resources/connect.conf +++ b/rocketmq-connect-runtime/src/main/resources/connect.conf @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -## Http prot for user to access REST API +## Http port for user to access REST API httpPort=8081 # Rocketmq namesrvAddr namesrvAddr=localhost:9876 # Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=/home/connect/rocketmq-connect/rocketmq-connect-sample/target \ No newline at end of file +pluginPaths=/usr/local/connector-plugins diff --git a/rocketmq-connect-runtime/src/main/resources/package.xml b/rocketmq-connect-runtime/src/main/resources/package.xml index e682f4a7c..91fce8549 100644 --- a/rocketmq-connect-runtime/src/main/resources/package.xml +++ b/rocketmq-connect-runtime/src/main/resources/package.xml @@ -1,40 +1,40 @@ - - - - package - - dir - - false - - - src/main/resources - conf - false - - - - - lib - runtime - - - \ No newline at end of file + + + + package + + dir + + false + + + src/main/resources + conf + false + + + + + lib + runtime + + + diff --git a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java index 078bc2d48..bda470d47 100644 --- a/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java +++ b/rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileSourceTask.java @@ -1,263 +1,267 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.connect.file; - -import com.alibaba.fastjson.JSONObject; -import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.data.DataEntryBuilder; -import io.openmessaging.connector.api.data.EntryType; -import io.openmessaging.connector.api.data.Field; -import io.openmessaging.connector.api.data.FieldType; -import io.openmessaging.connector.api.data.Schema; -import io.openmessaging.connector.api.data.SinkDataEntry; -import io.openmessaging.connector.api.data.SourceDataEntry; -import io.openmessaging.connector.api.exception.ConnectException; -import io.openmessaging.connector.api.source.SourceTask; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.rocketmq.connect.file.FileConstants.LINE; - -public class FileSourceTask extends SourceTask { - - private Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR); - - private FileConfig fileConfig; - - private InputStream stream; - private BufferedReader reader = null; - private char[] buffer = new char[1024]; - private int offset = 0; - private int batchSize = FileSourceConnector.DEFAULT_TASK_BATCH_SIZE; - - private Long streamOffset; - - @Override public Collection poll() { - if (stream == null) { - try { - stream = Files.newInputStream(Paths.get(fileConfig.getFilename())); - ByteBuffer positionInfo; - positionInfo = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(FileConstants.getPartition(fileConfig.getFilename()).getBytes(Charset.defaultCharset()))); - if (positionInfo != null) { - String positionJson = new String(positionInfo.array(), Charset.defaultCharset()); - JSONObject jsonObject = JSONObject.parseObject(positionJson); - Object lastRecordedOffset = jsonObject.getLong(FileConstants.NEXT_POSITION); - if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long)) - throw new ConnectException(-1, "Offset position is the incorrect type"); - if (lastRecordedOffset != null) { - log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); - long skipLeft = (Long) lastRecordedOffset; - while (skipLeft > 0) { - try { - long skipped = stream.skip(skipLeft); - skipLeft -= skipped; - } catch (IOException e) { - log.error("Error while trying to seek to previous offset in file {}: ", fileConfig.getFilename(), e); - throw new ConnectException(-1, e); - } - } - log.debug("Skipped to offset {}", lastRecordedOffset); - } - streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L; - } else { - streamOffset = 0L; - } - reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)); - log.debug("Opened {} for reading", logFilename()); - } catch (NoSuchFileException e) { - log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename()); - synchronized (this) { - try { - this.wait(1000); - } catch (InterruptedException e1) { - log.error("Interrupt error .", e1); - } - } - return null; - } catch (IOException e) { - log.error("Error while trying to open file {}: ", fileConfig.getFilename(), e); - throw new ConnectException(-1, e); - } - } - - try { - final BufferedReader readerCopy; - synchronized (this) { - readerCopy = reader; - } - if (readerCopy == null) { - return null; - } - - Collection records = null; - - int nread = 0; - while (readerCopy.ready()) { - nread = readerCopy.read(buffer, offset, buffer.length - offset); - log.trace("Read {} bytes from {}", nread, logFilename()); - - if (nread > 0) { - offset += nread; - if (offset == buffer.length) { - char[] newbuf = new char[buffer.length * 2]; - System.arraycopy(buffer, 0, newbuf, 0, buffer.length); - buffer = newbuf; - } - - String line; - do { - line = extractLine(); - if (line != null) { - log.trace("Read a line from {}", logFilename()); - if (records == null) { - records = new ArrayList<>(); - } - Schema schema = new Schema(); - schema.setDataSource(fileConfig.getFilename()); - schema.setName(fileConfig.getFilename() + LINE); - final Field field = new Field(0, FileConstants.FILE_LINE_CONTENT, FieldType.STRING); - List fields = new ArrayList() { - { - add(field); - } - }; - schema.setFields(fields); - DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema) - .entryType(EntryType.CREATE) - .queue(fileConfig.getTopic()) - .timestamp(System.currentTimeMillis()) - .putFiled(FileConstants.FILE_LINE_CONTENT, line); - final SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(offsetKey(FileConstants.getPartition(fileConfig.getFilename())), offsetValue(streamOffset)); - records.add(sourceDataEntry); - if (records.size() >= batchSize) { - return records; - } - } - } - while (line != null); - } - } - - if (nread <= 0) { - synchronized (this) { - this.wait(1000); - } - } - - return records; - } catch (IOException e) { - } catch (InterruptedException e) { - log.error("Interrupt error .", e); - } - return null; - } - - private String extractLine() { - int until = -1, newStart = -1; - for (int i = 0; i < offset; i++) { - if (buffer[i] == '\n') { - until = i; - newStart = i + 1; - break; - } else if (buffer[i] == '\r') { - // We need to check for \r\n, so we must skip this if we can't check the next char - if (i + 1 >= offset) - return null; - - until = i; - newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1; - break; - } - } - - if (until != -1) { - String result = new String(buffer, 0, until); - System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart); - offset = offset - newStart; - if (streamOffset != null) - streamOffset += newStart; - return result; - } else { - return null; - } - } - - public void commitRecord(SourceDataEntry sourceDataEntry, SinkDataEntry sinkDataEntry) { - log.info("commit sink queueOffset: {} ", sinkDataEntry.getQueueOffset()); - } - - @Override public void start(KeyValue props) { - fileConfig = new FileConfig(); - fileConfig.load(props); - if (fileConfig.getFilename() == null || fileConfig.getFilename().isEmpty()) { - stream = System.in; - streamOffset = null; - reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)); - } - } - - @Override public void stop() { - log.trace("Stopping"); - synchronized (this) { - try { - if (stream != null && stream != System.in) { - stream.close(); - log.trace("Closed input stream"); - } - } catch (IOException e) { - log.error("Failed to close FileStreamSourceTask stream: ", e); - } - this.notify(); - } - } - - @Override public void pause() { - - } - - @Override public void resume() { - - } - - private String logFilename() { - return fileConfig.getFilename() == null ? "stdin" : fileConfig.getFilename(); - } - - private ByteBuffer offsetKey(String filename) { - return ByteBuffer.wrap(filename.getBytes(Charset.defaultCharset())); - } - - private ByteBuffer offsetValue(Long pos) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put(FileConstants.NEXT_POSITION, pos); - return ByteBuffer.wrap(jsonObject.toJSONString().getBytes(Charset.defaultCharset())); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.file; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.DataEntryBuilder; +import io.openmessaging.connector.api.data.EntryType; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.FieldType; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SinkDataEntry; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.exception.ConnectException; +import io.openmessaging.connector.api.source.SourceTask; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.connect.file.FileConstants.LINE; + +public class FileSourceTask extends SourceTask { + + private Logger log = LoggerFactory.getLogger(LoggerName.FILE_CONNECTOR); + + private FileConfig fileConfig; + + private InputStream stream; + private BufferedReader reader = null; + private char[] buffer = new char[1024]; + private int offset = 0; + private int batchSize = FileSourceConnector.DEFAULT_TASK_BATCH_SIZE; + + private Long streamOffset; + + @Override public Collection poll() { + log.info("Start a poll stream is null:{}", stream == null); + if (stream == null) { + try { + stream = Files.newInputStream(Paths.get(fileConfig.getFilename())); + ByteBuffer positionInfo; + positionInfo = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(FileConstants.getPartition(fileConfig.getFilename()).getBytes(Charset.defaultCharset()))); + if (positionInfo != null) { + log.info("positionInfo is not null!"); + String positionJson = new String(positionInfo.array(), Charset.defaultCharset()); + JSONObject jsonObject = JSONObject.parseObject(positionJson); + Object lastRecordedOffset = jsonObject.getLong(FileConstants.NEXT_POSITION); + if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long)) + throw new ConnectException(-1, "Offset position is the incorrect type"); + if (lastRecordedOffset != null) { + log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); + long skipLeft = (Long) lastRecordedOffset; + while (skipLeft > 0) { + try { + long skipped = stream.skip(skipLeft); + skipLeft -= skipped; + } catch (IOException e) { + log.error("Error while trying to seek to previous offset in file {}: ", fileConfig.getFilename(), e); + throw new ConnectException(-1, e); + } + } + log.debug("Skipped to offset {}", lastRecordedOffset); + } + streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L; + } else { + log.info("positionInfo is null!"); + streamOffset = 0L; + } + reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)); + log.debug("Opened {} for reading", logFilename()); + } catch (NoSuchFileException e) { + log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename()); + synchronized (this) { + try { + this.wait(1000); + } catch (InterruptedException e1) { + log.error("Interrupt error .", e1); + } + } + return null; + } catch (IOException e) { + log.error("Error while trying to open file {}: ", fileConfig.getFilename(), e); + throw new ConnectException(-1, e); + } + } + + try { + final BufferedReader readerCopy; + synchronized (this) { + readerCopy = reader; + } + if (readerCopy == null) { + return null; + } + + Collection records = null; + + int nread = 0; + while (readerCopy.ready()) { + nread = readerCopy.read(buffer, offset, buffer.length - offset); + log.trace("Read {} bytes from {}", nread, logFilename()); + + if (nread > 0) { + offset += nread; + if (offset == buffer.length) { + char[] newbuf = new char[buffer.length * 2]; + System.arraycopy(buffer, 0, newbuf, 0, buffer.length); + buffer = newbuf; + } + + String line; + do { + line = extractLine(); + if (line != null) { + log.trace("Read a line from {}", logFilename()); + if (records == null) { + records = new ArrayList<>(); + } + Schema schema = new Schema(); + schema.setDataSource(fileConfig.getFilename()); + schema.setName(fileConfig.getFilename() + LINE); + final Field field = new Field(0, FileConstants.FILE_LINE_CONTENT, FieldType.STRING); + List fields = new ArrayList() { + { + add(field); + } + }; + schema.setFields(fields); + DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema) + .entryType(EntryType.CREATE) + .queue(fileConfig.getTopic()) + .timestamp(System.currentTimeMillis()) + .putFiled(FileConstants.FILE_LINE_CONTENT, line); + final SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(offsetKey(FileConstants.getPartition(fileConfig.getFilename())), offsetValue(streamOffset)); + records.add(sourceDataEntry); + if (records.size() >= batchSize) { + return records; + } + } + } + while (line != null); + } + } + + if (nread <= 0) { + synchronized (this) { + this.wait(1000); + } + } + + return records; + } catch (IOException e) { + } catch (InterruptedException e) { + log.error("Interrupt error .", e); + } + return null; + } + + private String extractLine() { + int until = -1, newStart = -1; + for (int i = 0; i < offset; i++) { + if (buffer[i] == '\n') { + until = i; + newStart = i + 1; + break; + } else if (buffer[i] == '\r') { + // We need to check for \r\n, so we must skip this if we can't check the next char + if (i + 1 >= offset) + return null; + + until = i; + newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1; + break; + } + } + + if (until != -1) { + String result = new String(buffer, 0, until); + System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart); + offset = offset - newStart; + if (streamOffset != null) + streamOffset += newStart; + return result; + } else { + return null; + } + } + + public void commitRecord(SourceDataEntry sourceDataEntry, SinkDataEntry sinkDataEntry) { + log.info("commit sink queueOffset: {} ", sinkDataEntry.getQueueOffset()); + } + + @Override public void start(KeyValue props) { + fileConfig = new FileConfig(); + fileConfig.load(props); + log.info("fileName is:{}", fileConfig.getFilename()); + if (fileConfig.getFilename() == null || fileConfig.getFilename().isEmpty()) { + stream = System.in; + streamOffset = null; + reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)); + } + } + + @Override public void stop() { + log.trace("Stopping"); + synchronized (this) { + try { + if (stream != null && stream != System.in) { + stream.close(); + log.trace("Closed input stream"); + } + } catch (IOException e) { + log.error("Failed to close FileStreamSourceTask stream: ", e); + } + this.notify(); + } + } + + @Override public void pause() { + + } + + @Override public void resume() { + + } + + private String logFilename() { + return fileConfig.getFilename() == null ? "stdin" : fileConfig.getFilename(); + } + + private ByteBuffer offsetKey(String filename) { + return ByteBuffer.wrap(filename.getBytes(Charset.defaultCharset())); + } + + private ByteBuffer offsetValue(Long pos) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put(FileConstants.NEXT_POSITION, pos); + return ByteBuffer.wrap(jsonObject.toJSONString().getBytes(Charset.defaultCharset())); + } + +}