Skip to content

Commit

Permalink
[ISSUE #570] ASoC runtime optimization: Cassandra connectors (#587)
Browse files Browse the repository at this point in the history
* feat(connect-cassanra) merge affe's cassandra connector implementation

* Merge master into current branch : deleted .iml in connect-jms

* style(connect-cassandra): resolve all TODOs and meaningless comments
  • Loading branch information
imaffe authored Jul 28, 2020
0 parents commit e2cc843
Show file tree
Hide file tree
Showing 39 changed files with 3,819 additions and 0 deletions.
97 changes: 97 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# rocketmq-connect-cassandra

## rocketmq-connect-cassandra 打包
```
mvn clean install -DskipTest -U
```

## 目前安装会遇到的问题

目前的rocketmq-connect-cassandra 使用的是datastax-java-driver:4.5.0版本的cassandra-driver,由于在打包过程中还有没有解决的问题,该cassandra driver无法读取
位于driver包中的默认配置文件,因此我们需要手动下载cassandra driver的配置文件[reference.conf](https://github.com/datastax/java-driver/blob/4.5.0/core/src/main/resources/reference.conf) 并将其放置于classpath中。

该问题还仍然在解决的过程中。

## rocketmq-connect-cassandra 启动

* **cassandra-source-connector** 启动

```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-source-connector-name}
?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSourceConnector",“dbUrl”:"${source-db-ip}",dbPort”:"${source-db-port}",dbUsername”:"${source-db-username}",dbPassword”:"${source-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","whiteDataBase":{"${source-db-name}":{"${source-table-name}":{"${source-column-name}":"${source-column-value}"}}},"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
```

* **cassandra-sink-connector** 启动

```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-sink-connector-name}
?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSinkConnector",“dbUrl”:"${sink-db-ip}",dbPort”:"${sink-db-port}",dbUsername”:"${sink-db-username}",dbPassword”:"${sink-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","topicNames":"${sink-topic-name}","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
```
>**注:** `rocketmq-cassandra-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
## rocketmq-connect-cassandra 停止

```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-connector-name}/stop
```

## rocketmq-connect-cassandra 参数说明
* **cassandra-source-connector 参数说明**

参数 | 类型 | 是否必须 | 描述 | 样例
|---|---|---|---|---|
|dbUrl | String || source端 DB ip | 192.168.1.2|
|dbPort | String || source端 DB port | 3306 |
|dbUsername | String || source端 DB 用户名 | root |
|dbPassword | String || source端 DB 密码 | 123456 |
|rocketmqTopic | String || 待废弃的参数,需和topicNames相同 | jdbc_cassandra |
|topicNames | String || rocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同 | jdbc_cassandra |
|whiteDataBase | String || source端同步数据白名单,嵌套配置,为{DB名:{表名:{字段名:字段值}}},若无指定字段数据同步,字段名可设为NO-FILTER,值为任意 | {"DATABASE_TEST":{"TEST_DATA":{"name":"test"}}} |
|mode | String || source-connector 模式,目前仅支持bulk | bulk |
|localDataCenter | String || 待废弃 | cassandra 集群的datacenter名称,为必填项 |
|task-divide-strategy | Integer || task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
|task-parallelism | Integer || task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
|source-cluster | String || sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
|source-rocketmq | String || sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
|source-record-converter | String || source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |


示例配置如下
```js
{
"connector-class":"org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector",
"rocketmqTopic":"jdbc_cassandra",
"topicNames": "jdbc_cassandra",
"dbUrl":"127.0.0.1",
"dbPort":"9042",
"dbUsername":"cassandra",
"dbPassword":"cassandra",
"localDataCenter":"datacenter1",
"whiteDataBase": {
"jdbc":{
"jdbc_cassandra": {"NO-FILTER": "10"}
}
},
"mode": "bulk",
"task-parallelism": 1,
"source-cluster": "172.17.0.1:10911",
"source-rocketmq": "127.0.0.1:9876",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
```
* **cassandra-sink-connector 参数说明**

参数 | 类型 | 是否必须 | 描述 | 样例
|---|---|---|---|---|
|dbUrl | String || sink端 DB ip | 192.168.1.2|
|dbPort | String || sink端 DB port | 3306 |
|dbUsername | String || sink端 DB 用户名 | root |
|dbPassword | String || sink端 DB 密码 | 123456 |
|topicNames | String || sink端同步数据的topic名字 | topic-1,topic-2 |
|mode | String || source-connector 模式,目前仅支持bulk | bulk |
|~~rocketmqTopic~~ | String || 待废弃 | cassandraTopic |
|task-divide-strategy | Integer || task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
|task-parallelism | Integer || task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
|source-rocketmq | String || sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
|source-cluster | String || sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
|source-record-converter | String || source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
276 changes: 276 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-cassandra</artifactId>
<version>0.0.1-SNAPSHOT</version>

<name>connect-cassandra</name>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<issueManagement>
<system>jira</system>
<url>https://issues.apache.org/jira/browse/RocketMQ</url>
</issueManagement>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.5.2</rocketmq.version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.3</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>clirr-maven-plugin</artifactId>
<version>2.7</version>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<excludeTransitive>false</excludeTransitive>
<stripVersion>true</stripVersion>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<argLine>-Xms512m -Xmx1024m</argLine>
<forkMode>always</forkMode>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.6</version>
<configuration>
<locales>en_US</locales>
<outputEncoding>UTF-8</outputEncoding>
<inputEncoding>UTF-8</inputEncoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.12</version>
<configuration>
<excludes>
<exclude>README.md</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<charset>UTF-8</charset>
<locale>en_US</locale>
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
</configuration>
<executions>
<execution>
<id>aggregate</id>
<goals>
<goal>aggregate</goal>
</goals>
<phase>site</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<!-- The Main Class Here doesn't make a lot sense since it was dynamically loaded-->
<manifest>
<mainClass>org.apache.rocketmq.connect.cassandra.connector.CassandraSinkConnector</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.6.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>0.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
<version>0.3.1-alpha</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.31</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core-shaded</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.5.1</version>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit e2cc843

Please sign in to comment.