diff --git a/README.md b/README.md
new file mode 100644
index 000000000..6439269f4
--- /dev/null
+++ b/README.md
@@ -0,0 +1,97 @@
+# rocketmq-connect-cassandra
+
+## rocketmq-connect-cassandra 打包
+```
+mvn clean install -DskipTest -U
+```
+
+## 目前安装会遇到的问题
+
+目前的rocketmq-connect-cassandra 使用的是datastax-java-driver:4.5.0版本的cassandra-driver,由于在打包过程中还有没有解决的问题,该cassandra driver无法读取
+位于driver包中的默认配置文件,因此我们需要手动下载cassandra driver的配置文件[reference.conf](https://github.com/datastax/java-driver/blob/4.5.0/core/src/main/resources/reference.conf) 并将其放置于classpath中。
+
+该问题还仍然在解决的过程中。
+
+## rocketmq-connect-cassandra 启动
+
+* **cassandra-source-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-source-connector-name}
+?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSourceConnector",“dbUrl”:"${source-db-ip}",dbPort”:"${source-db-port}",dbUsername”:"${source-db-username}",dbPassword”:"${source-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","whiteDataBase":{"${source-db-name}":{"${source-table-name}":{"${source-column-name}":"${source-column-value}"}}},"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+
+* **cassandra-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-sink-connector-name}
+?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSinkConnector",“dbUrl”:"${sink-db-ip}",dbPort”:"${sink-db-port}",dbUsername”:"${sink-db-username}",dbPassword”:"${sink-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","topicNames":"${sink-topic-name}","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+>**注:** `rocketmq-cassandra-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-cassandra 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-connector-name}/stop
+```
+
+## rocketmq-connect-cassandra 参数说明
+* **cassandra-source-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|dbUrl | String | 是 | source端 DB ip | 192.168.1.2|
+|dbPort | String | 是 | source端 DB port | 3306 |
+|dbUsername | String | 是 | source端 DB 用户名 | root |
+|dbPassword | String | 是 | source端 DB 密码 | 123456 |
+|rocketmqTopic | String | 是 | 待废弃的参数,需和topicNames相同 | jdbc_cassandra |
+|topicNames | String | 是 | rocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同 | jdbc_cassandra |
+|whiteDataBase | String | 是 | source端同步数据白名单,嵌套配置,为{DB名:{表名:{字段名:字段值}}},若无指定字段数据同步,字段名可设为NO-FILTER,值为任意 | {"DATABASE_TEST":{"TEST_DATA":{"name":"test"}}} |
+|mode | String | 是 | source-connector 模式,目前仅支持bulk | bulk |
+|localDataCenter | String | 是 | 待废弃 | cassandra 集群的datacenter名称,为必填项 |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
+
+
+示例配置如下
+```js
+{
+ "connector-class":"org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector",
+ "rocketmqTopic":"jdbc_cassandra",
+ "topicNames": "jdbc_cassandra",
+ "dbUrl":"127.0.0.1",
+ "dbPort":"9042",
+ "dbUsername":"cassandra",
+ "dbPassword":"cassandra",
+ "localDataCenter":"datacenter1",
+ "whiteDataBase": {
+ "jdbc":{
+ "jdbc_cassandra": {"NO-FILTER": "10"}
+ }
+ },
+ "mode": "bulk",
+ "task-parallelism": 1,
+ "source-cluster": "172.17.0.1:10911",
+ "source-rocketmq": "127.0.0.1:9876",
+ "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+ }
+```
+* **cassandra-sink-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|dbUrl | String | 是 | sink端 DB ip | 192.168.1.2|
+|dbPort | String | 是 | sink端 DB port | 3306 |
+|dbUsername | String | 是 | sink端 DB 用户名 | root |
+|dbPassword | String | 是 | sink端 DB 密码 | 123456 |
+|topicNames | String | 是 | sink端同步数据的topic名字 | topic-1,topic-2 |
+|mode | String | 是 | source-connector 模式,目前仅支持bulk | bulk |
+|~~rocketmqTopic~~ | String | 是 | 待废弃 | cassandraTopic |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 000000000..286a6efc7
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,276 @@
+
+
+
+
+ 4.0.0
+ org.apache.rocketmq
+ rocketmq-connect-cassandra
+ 0.0.1-SNAPSHOT
+
+ connect-cassandra
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+ jira
+ https://issues.apache.org/jira/browse/RocketMQ
+
+
+
+ UTF-8
+ UTF-8
+
+
+ 1.8
+ 1.8
+ 4.5.2
+
+
+
+
+
+ org.codehaus.mojo
+ versions-maven-plugin
+ 2.3
+
+
+ org.codehaus.mojo
+ clirr-maven-plugin
+ 2.7
+
+
+ maven-dependency-plugin
+
+ ${project.build.directory}/lib
+ false
+ true
+
+
+
+ maven-compiler-plugin
+ 3.6.1
+
+
+ ${maven.compiler.target}
+ ${maven.compiler.source}
+ true
+ true
+
+
+
+ maven-surefire-plugin
+ 2.19.1
+
+ -Xms512m -Xmx1024m
+ always
+
+ **/*Test.java
+
+
+
+
+ maven-site-plugin
+ 3.6
+
+ en_US
+ UTF-8
+ UTF-8
+
+
+
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+ 0.12
+
+
+ README.md
+
+
+
+
+ maven-javadoc-plugin
+ 2.10.4
+
+ UTF-8
+ en_US
+ io.openmessaging.internal
+
+
+
+ aggregate
+
+ aggregate
+
+ site
+
+
+
+
+ maven-resources-plugin
+ 3.0.2
+
+ ${project.build.sourceEncoding}
+
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+ 3.0.4
+
+
+ maven-assembly-plugin
+ 3.0.0
+
+
+
+
+ org.apache.rocketmq.connect.cassandra.connector.CassandraSinkConnector
+
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+ org.assertj
+ assertj-core
+ 2.6.0
+ test
+
+
+ org.mockito
+ mockito-core
+ 2.6.3
+ test
+
+
+ commons-codec
+ commons-codec
+ 1.12
+
+
+ io.openmessaging
+ openmessaging-connector
+ 0.1.1
+ provided
+
+
+ io.openmessaging
+ openmessaging-api
+ 0.3.1-alpha
+
+
+ com.alibaba
+ fastjson
+ 1.2.60
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.7
+
+
+ ch.qos.logback
+ logback-classic
+ 1.0.13
+
+
+ ch.qos.logback
+ logback-core
+ 1.0.13
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ ${rocketmq.version}
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+ ${rocketmq.version}
+
+
+ org.apache.rocketmq
+ rocketmq-remoting
+ ${rocketmq.version}
+
+
+ org.apache.rocketmq
+ rocketmq-openmessaging
+ 4.3.2
+
+
+ commons-cli
+ commons-cli
+ 1.2
+
+
+ io.javalin
+ javalin
+ 1.3.0
+
+
+ com.alibaba
+ druid
+ 1.0.31
+
+
+ com.datastax.oss
+ java-driver-core-shaded
+ 4.5.1
+
+
+ com.datastax.oss
+ java-driver-query-builder
+ 4.5.1
+
+
+
+
diff --git a/scripts/gen_data.py b/scripts/gen_data.py
new file mode 100644
index 000000000..8fde504ee
--- /dev/null
+++ b/scripts/gen_data.py
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from sqlalchemy import *
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, String
+from sqlalchemy.orm import sessionmaker
+import argparse
+import random
+import string
+import datetime
+import time
+import uuid
+
+
+
+Base = declarative_base()
+class JdbcCassandra(Base):
+ __tablename__ = 'jdbc_cassandra'
+ int_type = Column(INT, primary_key=True, autoincrement=True)
+ ascii_type = Column(VARCHAR(50))
+ boolean_type = Column(BOOLEAN)
+ date_type = Column(DATE)
+ decimal_type = Column(DECIMAL)
+ double_type = Column(FLOAT)
+ float_type = Column(FLOAT)
+ inet_type = Column(VARCHAR(50))
+ smallint_type = Column(SMALLINT)
+ time_type = Column(TIME)
+ text_type = Column(TEXT)
+ timestamp_type = Column(TIMESTAMP)
+ #timeuuid_type = Column(CHAR(36))
+ tinyint_type = Column(SMALLINT)
+ #uuid_type = Column(CHAR(36))
+ varchar_type = Column(VARCHAR(50))
+ varint_type = Column(BIGINT)
+
+
+
+def random_string(strlen=8):
+ letters = string.ascii_lowercase
+ return bytes(''.join(random.choice(letters) for i in range(strlen)), 'utf8')
+
+
+def str_time_prop(start, end, format, prop):
+ """Get a time at a proportion of a range of two formatted times.
+
+ start and end should be strings specifying times formated in the
+ given format (strftime-style), giving an interval [start, end].
+ prop specifies how a proportion of the interval to be taken after
+ start. The returned time will be in the specified format.
+ """
+
+ stime = time.mktime(time.strptime(start, format))
+ etime = time.mktime(time.strptime(end, format))
+
+ ptime = stime + prop * (etime - stime)
+
+ return time.strftime(format, time.localtime(ptime))
+
+
+def random_date(start, end, prop):
+ return bytes(str_time_prop(start, end, '%m/%d/%Y %I:%M %p', prop), 'utf8')
+
+def random_time(start, end, prop):
+ return bytes(str_time_prop(start, end, '%Y-%m-%d %H:%M:%S', prop), 'utf8')
+
+def format_time():
+ t = datetime.datetime.now()
+ s = t.strftime('%Y-%m-%d %H:%M:%S.%f')
+ return s[:-3]
+
+def main():
+
+ # define parser
+ parser = argparse.ArgumentParser()
+ parser.add_argument('hostname', metavar='HOSTNAME', type=str,
+ help='hostname of target mysql database ')
+ parser.add_argument('port', metavar='PORT', type=str,
+ help='port of target mysql database')
+ parser.add_argument('username', metavar='USERNAME', type=str,
+ help='username of the user connecting to mysql')
+ parser.add_argument('password', metavar='PASSWORD', type=str,
+ help='password of specified user')
+ parser.add_argument('database', metavar='DATABASE', type=str,
+ help='which database to connect to')
+ parser.add_argument('count', metavar='COUNT', type=int,
+ help='how many random records to insert into the database')
+ args = parser.parse_args()
+
+
+ # get variabless from command line
+ hostname = args.hostname
+ port = args.port
+ username = args.username
+ password = args.password
+ database = args.database
+ count = args.count
+
+
+ # create db connection
+ # print("----------------ERROR-------------")
+ # print("mysql+pymysql://{}:{}@{}:{}/{}".format(username, password, hostname, port, database))
+ engine = create_engine("mysql+pymysql://{}:{}@{}:{}/{}".format(username, password, hostname, port, database), echo=True)
+
+
+ # create table if not exist
+ Base.metadata.create_all(engine)
+
+ # create a session
+ Session = sessionmaker(bind=engine)
+ session = Session()
+
+ for i in range(0, count):
+ random_record = JdbcCassandra(
+ ascii_type = random_string(30),
+ boolean_type = (i % 2 == 0),
+ date_type = datetime.datetime.now().date(),
+ decimal_type = 10.5,
+ double_type = 1.5,
+ float_type = 8.3,
+ inet_type = "127.0.0.1",
+ smallint_type = 1,
+ time_type = datetime.datetime.now().time(),
+ text_type = random_string(30),
+ timestamp_type = datetime.datetime.now(),
+ #timeuuid_type = str(uuid.uuid1()),
+ tinyint_type = 1,
+ #uuid_type = str(uuid.uuid1()),
+ varchar_type = random_string(30),
+ varint_type = random.getrandbits(63),
+ )
+ session.add(random_record)
+
+ session.commit()
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/scripts/requirements.txt b/scripts/requirements.txt
new file mode 100644
index 000000000..fc7dc318c
--- /dev/null
+++ b/scripts/requirements.txt
@@ -0,0 +1,2 @@
+SQLAlchemy==1.3.16
+PyMySQL==0.9.3
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
new file mode 100644
index 000000000..c8607506e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class CloneUtils {
+ @SuppressWarnings("unchecked")
+ public static T clone(T obj) {
+ T clonedObj = null;
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(obj);
+ oos.close();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ clonedObj = (T) ois.readObject();
+ ois.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return clonedObj;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
new file mode 100644
index 000000000..462add2b1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+public class ConstDefine {
+
+ public static String CASSANDRA_CONNECTOR_ADMIN_PREFIX = "CASSANDRA-CONNECTOR-ADMIN";
+ public static final String PREFIX = "cassandra";
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
new file mode 100644
index 000000000..bd58eea27
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
@@ -0,0 +1,91 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.common;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.connector.CassandraSinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Date;
+
+public class DBUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraSinkTask.class);
+
+ public static CqlSession initCqlSession(Config config) throws Exception {
+ log.info("Trying to init Cql Session ");
+ Map map = new HashMap<>();
+
+ String dbUrl = config.getDbUrl();
+ String dbPort = config.getDbPort();
+ String localDataCenter = config.getLocalDataCenter();
+ String username = config.getDbUsername();
+ String password = config.getDbPassword();
+
+// sessionBuilder.addContactPoint(new InetSocketAddress(dbUrl, Integer.parseInt(dbPort)))
+// .withAuthCredentials(username, password);
+
+
+ log.info("Cassandra dbUrl: {}", dbUrl);
+ log.info("Cassandra dbPort: {}", dbPort);
+ log.info("Cassandra datacenter: {}", localDataCenter);
+ log.info("Cassandra username: {}", username);
+ log.info("Cassandra password: {}", password);
+
+ CqlSession cqlSession = null;
+ log.info("Using Program Config Loader");
+ try {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future handle = executorService.submit(new Callable() {
+ @Override
+ public CqlSession call() {
+ return CqlSession.builder()
+ .addContactPoint(new InetSocketAddress(dbUrl, Integer.valueOf(dbPort)))
+ .withLocalDatacenter(localDataCenter)
+ .build();
+ }
+ });
+
+ cqlSession = handle.get();
+
+ } catch (Exception e) {
+ log.info("error when creating cqlSession {}", e.getMessage());
+ e.printStackTrace();
+ }
+ log.info("init Cql Session success");
+
+ return cqlSession;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
new file mode 100644
index 000000000..d6f814fef
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+public enum DataType {
+
+ COMMON_MESSAGE,
+ TOPIC_CONFIG,
+ BROKER_CONFIG,
+ SUB_CONFIG,
+ OFFSET
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
new file mode 100644
index 000000000..0911e209c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
@@ -0,0 +1,76 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.common;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Utils {
+ private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+ public static String createGroupName(String prefix) {
+ return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+ }
+
+ public static String createGroupName(String prefix, String postfix) {
+ return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+ }
+
+ public static String createTaskId(String prefix) {
+ return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+ }
+
+ public static String createInstanceName(String namesrvAddr) {
+ String[] namesrvArray = namesrvAddr.split(";");
+ List namesrvList = new ArrayList<>();
+ for (String ns : namesrvArray) {
+ if (!namesrvList.contains(ns)) {
+ namesrvList.add(ns);
+ }
+ }
+ Collections.sort(namesrvList);
+ return String.valueOf(namesrvList.toString().hashCode());
+ }
+
+ public static List examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+ String cluster) throws RemotingException, MQClientException, InterruptedException {
+ List brokerList = new ArrayList<>();
+
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (topicRouteData.getBrokerDatas() != null) {
+ for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+ if (StringUtils.equals(broker.getCluster(), cluster)) {
+ brokerList.add(broker);
+ }
+ }
+ }
+ return brokerList;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
new file mode 100644
index 000000000..b9b115ea3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import com.alibaba.fastjson.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class Config {
+ private static final Logger LOG = LoggerFactory.getLogger(Config.class);
+
+ /* Database Connection Config */
+ private String dbUrl;
+ private String dbPort;
+ private String localDataCenter;
+ private String dbUsername;
+ private String dbPassword;
+ private String dataType;
+ private String rocketmqTopic;
+
+ private List tableWhitelist;
+ private List tableBlacklist;
+ private String whiteDataBase;
+ private String whiteTable;
+
+
+ public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+ public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
+ public static final String CONN_WHITE_LIST = "whiteDataBase";
+ public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+ public static final String CONN_DB_IP = "dbUrl";
+ public static final String CONN_DB_PORT = "dbPort";
+ public static final String CONN_DB_USERNAME = "dbUsername";
+ public static final String CONN_DB_PASSWORD = "dbPassword";
+ public static final String CONN_DB_DATACENTER = "localDataCenter";
+ public static final String CONN_DATA_TYPE = "dataType";
+ public static final String CONN_TOPIC_NAMES = "topicNames";
+ public static final String CONN_DB_MODE = "mode";
+
+ public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+ public static final String CONN_SOURCE_CLUSTER = "source-cluster";
+ public static final String REFRESH_INTERVAL = "refresh.interval";
+
+ /* Mode Config */
+ private String mode = "";
+ private String incrementingColumnName = "";
+ private String query = "";
+ private String timestampColmnName = "";
+ private boolean validateNonNull = true;
+
+ /*Connector config*/
+ private String tableTypes = "table";
+ private long pollInterval = 5000;
+ private int batchMaxRows = 100;
+ private long tablePollInterval = 60000;
+ private long timestampDelayInterval = 0;
+ private String dbTimezone = "GMT+8";
+ private String queueName;
+
+ private Logger log = LoggerFactory.getLogger(Config.class);
+ public static final Set REQUEST_CONFIG = new HashSet() {
+ {
+ add("dbUrl");
+ add("dbPort");
+ add("localDataCenter");
+ add("dbUsername");
+ add("dbPassword");
+ add("mode");
+ add("rocketmqTopic");
+ }
+ };
+
+
+
+ public static Logger getLOG() {
+ return LOG;
+ }
+
+ public String getDbUrl() { return dbUrl; }
+
+ public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; }
+
+ public String getDbPort() { return dbPort; }
+
+ public void setDbPort(String dbPort) { this.dbPort = dbPort; }
+
+ public String getLocalDataCenter() {
+ return localDataCenter;
+ }
+
+ public void setLocalDataCenter(String localDataCenter) {
+ this.localDataCenter = localDataCenter;
+ }
+
+ public String getDbUsername() {
+ return dbUsername;
+ }
+
+ public void setDbUsername(String dbUsername) {
+ this.dbUsername = dbUsername;
+ }
+
+ public String getDbPassword() {
+ return dbPassword;
+ }
+
+ public void setDbPassword(String dbPassword) {
+ this.dbPassword = dbPassword;
+ }
+
+ public String getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(String dataType) {
+ this.dataType = dataType;
+ }
+
+ public String getRocketmqTopic() {
+ return rocketmqTopic;
+ }
+
+ public void setRocketmqTopic(String rocketmqTopic) {
+ this.rocketmqTopic = rocketmqTopic;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
+
+ public String getIncrementingColumnName() {
+ return incrementingColumnName;
+ }
+
+ public void setIncrementingColumnName(String incrementingColumnName) {
+ this.incrementingColumnName = incrementingColumnName;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ public String getTimestampColmnName() {
+ return timestampColmnName;
+ }
+
+ public void setTimestampColmnName(String timestampColmnName) {
+ this.timestampColmnName = timestampColmnName;
+ }
+
+ public boolean isValidateNonNull() {
+ return validateNonNull;
+ }
+
+ public void setValidateNonNull(boolean validateNonNull) {
+ this.validateNonNull = validateNonNull;
+ }
+
+ public String getTableTypes() {
+ return tableTypes;
+ }
+
+ public void setTableTypes(String tableTypes) {
+ this.tableTypes = tableTypes;
+ }
+
+ public long getPollInterval() {
+ return pollInterval;
+ }
+
+ public void setPollInterval(long pollInterval) {
+ this.pollInterval = pollInterval;
+ }
+
+ public int getBatchMaxRows() {
+ return batchMaxRows;
+ }
+
+ public void setBatchMaxRows(int batchMaxRows) {
+ this.batchMaxRows = batchMaxRows;
+ }
+
+ public long getTablePollInterval() {
+ return tablePollInterval;
+ }
+
+ public void setTablePollInterval(long tablePollInterval) {
+ this.tablePollInterval = tablePollInterval;
+ }
+
+ public long getTimestampDelayInterval() {
+ return timestampDelayInterval;
+ }
+
+ public void setTimestampDelayInterval(long timestampDelayInterval) {
+ this.timestampDelayInterval = timestampDelayInterval;
+ }
+
+ public String getDbTimezone() {
+ return dbTimezone;
+ }
+
+ public void setDbTimezone(String dbTimezone) {
+ this.dbTimezone = dbTimezone;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ public Logger getLog() {
+ return log;
+ }
+
+ public void setLog(Logger log) {
+ this.log = log;
+ }
+
+ public List getTableWhitelist() {
+ return tableWhitelist;
+ }
+
+ public void setTableWhitelist(List tableWhitelist) {
+ this.tableWhitelist = tableWhitelist;
+ }
+
+ public List getTableBlacklist() {
+ return tableBlacklist;
+ }
+
+ public void setTableBlacklist(List tableBlacklist) {
+ this.tableBlacklist = tableBlacklist;
+ }
+
+ public String getWhiteDataBase() {
+ return whiteDataBase;
+ }
+
+ public void setWhiteDataBase(String whiteDataBase) {
+ this.whiteDataBase = whiteDataBase;
+ }
+
+ public String getWhiteTable() {
+ return whiteTable;
+ }
+
+ public void setWhiteTable(String whiteTable) {
+ this.whiteTable = whiteTable;
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
new file mode 100644
index 000000000..1c08fb2c7
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
@@ -0,0 +1,70 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+ public static void load(KeyValue props, Object object) {
+
+ properties2Object(props, object);
+ }
+
+ private static void properties2Object(final KeyValue p, final Object object) {
+
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getString(key);
+ if (property != null) {
+ Class>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long")) {
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") || cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") || cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
new file mode 100644
index 000000000..3dd25c06f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
@@ -0,0 +1,110 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.TaskDivideStrategy;
+
+public abstract class DbConnectorConfig {
+ public TaskDivideStrategy taskDivideStrategy;
+ public String dbUrl;
+ public String dbPort;
+ public String dbUserName;
+ public String dbPassword;
+ public String localDataCenter;
+ public String converter;
+ public int taskParallelism;
+ public String mode;
+
+ public abstract void validate(KeyValue config);
+
+ public abstract T getWhiteTopics();
+
+ public TaskDivideStrategy getTaskDivideStrategy() {
+ return taskDivideStrategy;
+ }
+
+ public void setTaskDivideStrategy(TaskDivideStrategy taskDivideStrategy) {
+ this.taskDivideStrategy = taskDivideStrategy;
+ }
+
+ public String getDbUrl() {
+ return dbUrl;
+ }
+
+ public void setDbUrl(String dbUrl) {
+ this.dbUrl = dbUrl;
+ }
+
+ public String getDbPort() {
+ return dbPort;
+ }
+
+ public void setDbPort(String dbPort) {
+ this.dbPort = dbPort;
+ }
+
+ public String getDbUserName() {
+ return dbUserName;
+ }
+
+ public void setDbUserName(String dbUserName) {
+ this.dbUserName = dbUserName;
+ }
+
+ public String getDbPassword() {
+ return dbPassword;
+ }
+
+ public void setDbPassword(String dbPassword) {
+ this.dbPassword = dbPassword;
+ }
+
+ public String getLocalDataCenter() {
+ return localDataCenter;
+ }
+
+ public void setLocalDataCenter(String localDataCenter) {
+ this.localDataCenter = localDataCenter;
+ }
+
+ public String getConverter() {
+ return converter;
+ }
+
+ public void setConverter(String converter) {
+ this.converter = converter;
+ }
+
+ public int getTaskParallelism() {
+ return taskParallelism;
+ }
+
+ public void setTaskParallelism(int taskParallelism) {
+ this.taskParallelism = taskParallelism;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
new file mode 100644
index 000000000..31450331d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
@@ -0,0 +1,112 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideTaskByTopic;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class SinkDbConnectorConfig extends DbConnectorConfig {
+ private Set whiteList;
+ private String srcNamesrvs;
+ private String srcCluster;
+ private long refreshInterval;
+ private Map> topicRouteMap;
+
+ public SinkDbConnectorConfig(){
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
+
+ int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
+
+ this.taskDivideStrategy = new DivideTaskByTopic();
+
+ buildWhiteList(config);
+
+ this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+ this.dbUrl = config.getString(Config.CONN_DB_IP);
+ this.dbPort = config.getString(Config.CONN_DB_PORT);
+ this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+ this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+ this.localDataCenter = config.getString(Config.CONN_DB_DATACENTER);
+ this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
+ this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
+ this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
+ this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+ }
+
+ private void buildWhiteList(KeyValue config) {
+ this.whiteList = new HashSet<>();
+ String whiteListStr = config.getString(Config.CONN_TOPIC_NAMES, "");
+ String[] wl = whiteListStr.trim().split(",");
+ if (wl.length <= 0)
+ throw new IllegalArgumentException("White list must be not empty.");
+ else {
+ this.whiteList.clear();
+ for (String t : wl) {
+ this.whiteList.add(t.trim());
+ }
+ }
+ }
+
+
+ public Set getWhiteList() {
+ return whiteList;
+ }
+
+ public void setWhiteList(Set whiteList) {
+ this.whiteList = whiteList;
+ }
+
+ public String getSrcNamesrvs() {
+ return this.srcNamesrvs;
+ }
+
+ public String getSrcCluster() {
+ return this.srcCluster;
+ }
+
+ public long getRefreshInterval() {
+ return this.refreshInterval;
+ }
+
+ public Map> getTopicRouteMap() {
+ return topicRouteMap;
+ }
+
+ public void setTopicRouteMap(Map> topicRouteMap) {
+ this.topicRouteMap = topicRouteMap;
+ }
+
+ @Override
+ public Set getWhiteTopics() {
+ return getWhiteList();
+ }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
new file mode 100644
index 000000000..6a3f68518
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideTaskByTopic;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SourceDbConnectorConfig extends DbConnectorConfig{
+
+ private Map whiteMap;
+
+ public SourceDbConnectorConfig(){
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
+
+ int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
+
+ this.taskDivideStrategy = new DivideTaskByTopic();
+
+ buildWhiteMap(config);
+
+ this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+ this.dbUrl = config.getString(Config.CONN_DB_IP);
+ this.dbPort = config.getString(Config.CONN_DB_PORT);
+ this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+ this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+ this.localDataCenter = config.getString(Config.CONN_DB_DATACENTER);
+ this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+ }
+
+ private void buildWhiteMap(KeyValue config) {
+ this.whiteMap = new HashMap<>(16);
+ String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+ JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteListStr);
+ if(whiteDataBaseObject.keySet().size() <= 0){
+ throw new IllegalArgumentException("white data base must be not empty.");
+ }else {
+ this.whiteMap.clear();
+ for (String dbName : whiteDataBaseObject.keySet()){
+ JSONObject whiteTableObject = (JSONObject) whiteDataBaseObject.get(dbName);
+ for (String tableName : whiteTableObject.keySet()){
+ String dbTableKey = dbName + "-" + tableName;
+ this.whiteMap.put(dbTableKey, whiteTableObject.getString(tableName));
+ }
+ }
+ }
+ }
+
+
+ public Map getWhiteMap() {
+ return whiteMap;
+ }
+
+ public void setWhiteMap(Map whiteMap) {
+ this.whiteMap = whiteMap;
+ }
+
+ @Override
+ public Map getWhiteTopics() {
+ return getWhiteMap();
+ }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
new file mode 100644
index 000000000..7c43137cb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.config;
+
+public class TaskDivideConfig {
+
+ private String dbUrl;
+
+ private String dbPort;
+
+ private String dbUserName;
+
+ private String dbPassword;
+
+ private String localDataCenter;
+
+ private String srcRecordConverter;
+
+ private int dataType;
+
+ private int taskParallelism;
+
+ private String mode;
+
+ public TaskDivideConfig(String dbUrl, String dbPort, String dbUserName, String dbPassword, String localDataCenter,
+ String srcRecordConverter, int dataType, int taskParallelism, String mode) {
+ this.dbUrl = dbUrl;
+ this.dbPort = dbPort;
+ this.dbUserName = dbUserName;
+ this.dbPassword = dbPassword;
+ this.localDataCenter = localDataCenter;
+ this.srcRecordConverter = srcRecordConverter;
+ this.dataType = dataType;
+ this.taskParallelism = taskParallelism;
+ this.mode = mode;
+ }
+
+ public String getDbUrl() {
+ return dbUrl;
+ }
+
+ public void setDbUrl(String dbUrl) {
+ this.dbUrl = dbUrl;
+ }
+
+ public String getDbPort() {
+ return dbPort;
+ }
+
+ public void setDbPort(String dbPort) {
+ this.dbPort = dbPort;
+ }
+
+ public String getDbUserName() {
+ return dbUserName;
+ }
+
+ public void setDbUserName(String dbUserName) {
+ this.dbUserName = dbUserName;
+ }
+
+ public String getDbPassword() {
+ return dbPassword;
+ }
+
+ public void setDbPassword(String dbPassword) {
+ this.dbPassword = dbPassword;
+ }
+
+ public String getLocalDataCenter() {
+ return localDataCenter;
+ }
+
+ public void setLocalDataCenter(String localDataCenter) {
+ this.localDataCenter = localDataCenter;
+ }
+
+ public String getSrcRecordConverter() {
+ return srcRecordConverter;
+ }
+
+ public void setSrcRecordConverter(String srcRecordConverter) {
+ this.srcRecordConverter = srcRecordConverter;
+ }
+
+ public int getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(int dataType) {
+ this.dataType = dataType;
+ }
+
+ public int getTaskParallelism() {
+ return taskParallelism;
+ }
+
+ public void setTaskParallelism(int taskParallelism) {
+ this.taskParallelism = taskParallelism;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
new file mode 100644
index 000000000..074faabca
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
@@ -0,0 +1,40 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class TaskTopicInfo extends MessageQueue {
+
+ private String targetTopic;
+
+ public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) {
+ super(sourceTopic, brokerName, queueId);
+ this.targetTopic = targetTopic;
+ }
+
+ public String getTargetTopic() {
+ return this.targetTopic;
+ }
+
+ public void setTargetTopic(String targetTopic) {
+ this.targetTopic = targetTopic;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
new file mode 100644
index 000000000..6ce23f64d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
@@ -0,0 +1,240 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.sink.SinkConnector;
+import java.util.concurrent.ScheduledFuture;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.cassandra.common.CloneUtils;
+import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
+import org.apache.rocketmq.connect.cassandra.common.DataType;
+import org.apache.rocketmq.connect.cassandra.common.Utils;
+import org.apache.rocketmq.connect.cassandra.config.*;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class CassandraSinkConnector extends SinkConnector{
+ private static final Logger log = LoggerFactory.getLogger(CassandraSinkConnector.class);
+ private DbConnectorConfig dbConnectorConfig;
+ private volatile boolean configValid = false;
+ private ScheduledExecutorService executor;
+ private HashMap> topicRouteMap;
+
+ private DefaultMQAdminExt srcMQAdminExt;
+
+ private volatile boolean adminStarted;
+
+ private ScheduledFuture> listenerHandle;
+
+ public CassandraSinkConnector() {
+ topicRouteMap = new HashMap<>();
+ dbConnectorConfig = new SinkDbConnectorConfig();
+ executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("CassandraSinkConnector-SinkWatcher-%d").daemon(true).build());
+ }
+
+ private synchronized void startMQAdminTools() {
+ if (!configValid || adminStarted) {
+ return;
+ }
+ RPCHook rpcHook = null;
+ this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.srcMQAdminExt.setNamesrvAddr(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs());
+ this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.CASSANDRA_CONNECTOR_ADMIN_PREFIX));
+ this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs()));
+
+ try {
+ log.info("Trying to start srcMQAdminExt");
+ this.srcMQAdminExt.start();
+ log.info("RocketMQ srcMQAdminExt started");
+
+ } catch (MQClientException e) {
+ log.error("Cassandra Sink Task start failed for `srcMQAdminExt` exception.", e);
+ }
+
+ adminStarted = true;
+ }
+
+ @Override
+ public String verifyAndSetConfig(KeyValue config) {
+ for (String requestKey : Config.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
+ return "Request config key: " + requestKey;
+ }
+ }
+ try {
+ this.dbConnectorConfig.validate(config);
+ } catch (IllegalArgumentException e) {
+ return e.getMessage();
+ }
+ this.configValid = true;
+
+ return "";
+ }
+
+ @Override
+ public void start() {
+ startMQAdminTools();
+ startListener();
+ }
+
+ public void startListener() {
+ listenerHandle = executor.scheduleAtFixedRate(new Runnable() {
+ boolean first = true;
+ HashMap> origin = null;
+
+ @Override
+ public void run() {
+ buildRoute();
+ if (first) {
+ origin = CloneUtils.clone(topicRouteMap);
+ first = false;
+ }
+ if (!compare(origin, topicRouteMap)) {
+ context.requestTaskReconfiguration();
+ origin = CloneUtils.clone(topicRouteMap);
+ }
+ }
+ }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS);
+ }
+
+ public boolean compare(Map> origin, Map> updated) {
+ if (origin.size() != updated.size()) {
+ return false;
+ }
+ for (Map.Entry> entry : origin.entrySet()) {
+ if (!updated.containsKey(entry.getKey())) {
+ return false;
+ }
+ Set originTasks = entry.getValue();
+ Set updateTasks = updated.get(entry.getKey());
+ if (originTasks.size() != updateTasks.size()) {
+ return false;
+ }
+
+ if (!originTasks.containsAll(updateTasks)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public void buildRoute() {
+ String srcCluster = ((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcCluster();
+ try {
+ for (String topic : ((SinkDbConnectorConfig) this.dbConnectorConfig).getWhiteList()) {
+
+ // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+ // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+ // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+ List brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+ Set brokerNameSet = new HashSet();
+ for (BrokerData b : brokerList) {
+ brokerNameSet.add(b.getBrokerName());
+ }
+
+ TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+ if (!topicRouteMap.containsKey(topic)) {
+ topicRouteMap.put(topic, new HashSet<>(16));
+ }
+ for (QueueData qd : topicRouteData.getQueueDatas()) {
+ if (brokerNameSet.contains(qd.getBrokerName())) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, null);
+ topicRouteMap.get(topic).add(taskTopicInfo);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Fetch topic list error.", e);
+ } finally {
+ // srcMQAdminExt.shutdown();
+ }
+ }
+
+
+ /**
+ * We need to reason why we don't call srcMQAdminExt.shutdown() here, and why
+ * it can be applied to srcMQAdminExt
+ */
+ @Override
+ public void stop() {
+ listenerHandle.cancel(true);
+ // srcMQAdminExt.shutdown();
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public Class extends Task> taskClass() {
+ return CassandraSinkTask.class;
+ }
+
+ @Override
+ public List taskConfigs() {
+ log.info("List.start");
+ if (!configValid) {
+ return new ArrayList();
+ }
+
+ startMQAdminTools();
+
+ buildRoute();
+
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ this.dbConnectorConfig.getDbUrl(),
+ this.dbConnectorConfig.getDbPort(),
+ this.dbConnectorConfig.getDbUserName(),
+ this.dbConnectorConfig.getDbPassword(),
+ this.dbConnectorConfig.getLocalDataCenter(),
+ this.dbConnectorConfig.getConverter(),
+ DataType.COMMON_MESSAGE.ordinal(),
+ this.dbConnectorConfig.getTaskParallelism(),
+ this.dbConnectorConfig.getMode()
+ );
+
+ ((SinkDbConnectorConfig) this.dbConnectorConfig).setTopicRouteMap(topicRouteMap);
+
+ return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
new file mode 100644
index 000000000..a8e9b0a8e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
@@ -0,0 +1,161 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import com.alibaba.fastjson.JSONObject;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.common.QueueMetaData;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import io.openmessaging.connector.api.sink.SinkTask;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.common.DBUtils;
+import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
+import org.apache.rocketmq.connect.cassandra.sink.Updater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * In the naming, we are using database for "keyspaces" and table for "columnFamily"
+ * This is because we kind of want the abstract data source to be aligned with SQL databases
+ */
+public class CassandraSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(CassandraSinkTask.class);
+
+ private Config config;
+
+ private CqlSession cqlSession;
+ private Updater updater;
+ private BlockingQueue tableQueue = new LinkedBlockingQueue();
+
+ public CassandraSinkTask() {
+ this.config = new Config();
+ }
+
+ @Override
+ public void put(Collection sinkDataEntries) {
+ try {
+ if (tableQueue.size() > 1) {
+ updater = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+ } else {
+ updater = tableQueue.peek();
+ }
+ log.info("Cassandra Sink Task trying to put()");
+ for (SinkDataEntry record : sinkDataEntries) {
+ Map fieldMap = new HashMap<>();
+ Object[] payloads = record.getPayload();
+ Schema schema = record.getSchema();
+ EntryType entryType = record.getEntryType();
+ String cfName = schema.getName();
+ String keyspaceName = schema.getDataSource();
+ List fields = schema.getFields();
+ Boolean parseError = false;
+ if (!fields.isEmpty()) {
+ for (Field field : fields) {
+ Object fieldValue = payloads[field.getIndex()];
+ Object[] value = JSONObject.parseArray((String)fieldValue).toArray();
+ if (value.length == 2) {
+ fieldMap.put(field, value);
+ } else {
+ log.error("parseArray error, fieldValue:{}", fieldValue);
+ parseError = true;
+ }
+ }
+ }
+ if (!parseError) {
+ log.info("Cassandra Sink Task trying to call updater.push()");
+ Boolean isSuccess = updater.push(keyspaceName, cfName, fieldMap, entryType);
+ if (!isSuccess) {
+ log.error("push data error, keyspaceName:{}, cfName:{}, entryType:{}, fieldMap:{}", keyspaceName, cfName, fieldMap, entryType);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("put sinkDataEntries error, {}", e);
+ }
+ }
+
+ @Override
+ public void commit(Map map) {
+
+ }
+
+ /**
+ * Remember always close the CqlSession according to
+ * https://docs.datastax.com/en/developer/java-driver/4.5/manual/core/
+ * @param props
+ */
+ @Override
+ public void start(KeyValue props) {
+ try {
+ ConfigUtil.load(props, this.config);
+ cqlSession = DBUtils.initCqlSession(config);
+ log.info("init data source success");
+ } catch (Exception e) {
+ log.error("Cannot start Cassandra Sink Task because of configuration error{}", e);
+ }
+ String mode = config.getMode();
+ if (mode.equals("bulk")) {
+ Updater updater = new Updater(config, cqlSession);
+ try {
+ updater.start();
+ tableQueue.add(updater);
+ } catch (Exception e) {
+ log.error("fail to start updater{}", e);
+ }
+ }
+
+ }
+
+ @Override
+ public void stop() {
+ try {
+ if (cqlSession != null){
+ cqlSession.close();
+ log.info("cassandra sink task connection is closed.");
+ }
+ } catch (Throwable e) {
+ log.warn("sink task stop error while closing connection to {}", "cassandra", e);
+ }
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
new file mode 100644
index 000000000..a8adc7455
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.cassandra.common.DataType;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.SourceDbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskDivideConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSourceConnector extends SourceConnector {
+ private static final Logger log = LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector.class);
+ private DbConnectorConfig dbConnectorConfig;
+ private volatile boolean configValid = false;
+
+ public CassandraSourceConnector() {
+ dbConnectorConfig = new SourceDbConnectorConfig();
+ }
+
+ @Override
+ public String verifyAndSetConfig(KeyValue config) {
+
+ log.info("CassandraSourceConnector verifyAndSetConfig enter");
+ for (String requestKey : Config.REQUEST_CONFIG) {
+
+ if (!config.containsKey(requestKey)) {
+ return "Request config key: " + requestKey;
+ }
+ }
+ try {
+ this.dbConnectorConfig.validate(config);
+ } catch (IllegalArgumentException e) {
+ return e.getMessage();
+ }
+ this.configValid = true;
+
+ return "";
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public Class extends Task> taskClass() {
+ return CassandraSourceTask.class;
+ }
+
+ @Override
+ public List taskConfigs() {
+ log.info("List.start");
+ if (!configValid) {
+ return new ArrayList();
+ }
+
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ this.dbConnectorConfig.getDbUrl(),
+ this.dbConnectorConfig.getDbPort(),
+ this.dbConnectorConfig.getDbUserName(),
+ this.dbConnectorConfig.getDbPassword(),
+ this.dbConnectorConfig.getLocalDataCenter(),
+ this.dbConnectorConfig.getConverter(),
+ DataType.COMMON_MESSAGE.ordinal(),
+ this.dbConnectorConfig.getTaskParallelism(),
+ this.dbConnectorConfig.getMode()
+ );
+ return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
+ }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
new file mode 100644
index 000000000..cac44ed79
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
@@ -0,0 +1,168 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.datastax.oss.driver.api.core.CqlSession;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+
+import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
+import org.apache.rocketmq.connect.cassandra.common.DBUtils;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
+import org.apache.rocketmq.connect.cassandra.schema.Table;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.apache.rocketmq.connect.cassandra.source.Querier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSourceTask extends SourceTask {
+
+ private static final Logger log = LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceTask.class);
+
+ private Config config;
+
+ private DataSource dataSource;
+
+ private CqlSession cqlSession;
+
+ BlockingQueue tableQueue = new LinkedBlockingQueue();
+ static final String INCREMENTING_FIELD = "incrementing";
+ static final String TIMESTAMP_FIELD = "timestamp";
+ private Querier querier;
+
+ public CassandraSourceTask() {
+ this.config = new Config();
+ }
+
+ @Override
+ public Collection poll() {
+ List res = new ArrayList<>();
+ try {
+ if (tableQueue.size() > 1)
+ querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+ else
+ querier = tableQueue.peek();
+ Timer timer = new Timer();
+ try {
+ Thread.currentThread();
+ Thread.sleep(1000);//毫秒
+ } catch (Exception e) {
+ throw e;
+ }
+ querier.poll();
+ for (Table dataRow : querier.getList()) {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("nextQuery", "database");
+ jsonObject.put("nextPosition", "table");
+ Schema schema = new Schema();
+ schema.setDataSource(dataRow.getDatabase());
+ schema.setName(dataRow.getName());
+ schema.setFields(new ArrayList<>());
+ for (int i = 0; i < dataRow.getColList().size(); i++) {
+ String columnName = dataRow.getColList().get(i);
+ String rawDataType = dataRow.getRawDataTypeList().get(i);
+ Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
+ schema.getFields().add(field);
+ }
+ DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+ dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
+ .entryType(EntryType.UPDATE);
+ for (int i = 0; i < dataRow.getColList().size(); i++) {
+ Object[] value = new Object[2];
+ value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
+ dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value));
+ }
+
+ SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+ ByteBuffer.wrap((ConstDefine.PREFIX + config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
+ res.add(sourceDataEntry);
+ log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
+ }
+ } catch (Exception e) {
+ log.error("Cassandra task poll error, current config:" + JSON.toJSONString(config), e);
+ }
+ log.debug("dataEntry poll successfully,{}", JSONObject.toJSONString(res));
+ return res;
+ }
+
+ @Override
+ public void start(KeyValue props) {
+ try {
+ ConfigUtil.load(props, this.config);
+ cqlSession = DBUtils.initCqlSession(config);
+ log.info("init data source success");
+ } catch (Exception e) {
+ log.error("Cannot start Cassandra Source Task because of configuration error{}", e);
+ }
+ Map