Skip to content

Commit

Permalink
feat(runtime) [ISSUE #570] ASoC connect runtime optimization: Offset …
Browse files Browse the repository at this point in the history
…synchronization (#624)

* Optimize configuration synchronization

* Slave nodes serve read-only requests, and fix format problems

* Fix format

* Fix offset synchronization have the same partitions

* Update taskId and Fix bug when task restart

* Change offset commit mode to incremental updating

* Fix get clientId problem

* Add RPC

* Fix format

* Fix gRPC generated files checkstyle

* Fix bugs and adjust the format

* Fix bug
  • Loading branch information
Dreaouth authored Sep 14, 2020
1 parent 5cbec0c commit 045af48
Show file tree
Hide file tree
Showing 49 changed files with 5,016 additions and 724 deletions.
17 changes: 16 additions & 1 deletion rocketmq-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
</license>
</licenses>
<properties>
<rocketmq.version>4.4.0</rocketmq.version>
<rocketmq.version>4.7.0</rocketmq.version>
<junit.version>4.11</junit.version>
<assertj.version>2.6.0</assertj.version>
<mockito.version>2.6.3</mockito.version>
Expand Down Expand Up @@ -124,6 +124,21 @@
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.31.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.31.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.31.0</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
21 changes: 20 additions & 1 deletion rocketmq-connect/rocketmq-connect-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,18 @@
<maven.compiler.target>1.8</maven.compiler.target>

<!-- RocketMQ Version-->
<rocketmq.version>4.5.2</rocketmq.version>
<rocketmq.version>4.7.0</rocketmq.version>

</properties>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<resources>
<resource>
<directory>src/main/resources</directory>
Expand Down Expand Up @@ -230,5 +237,17 @@
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public ConnectController(

this.connectConfig = connectConfig;
this.clusterManagementService = new ClusterManagementServiceImpl(connectConfig);
this.configManagementService = new ConfigManagementServiceImpl(connectConfig, plugin);
this.configManagementService = new ConfigManagementServiceImpl(connectConfig, clusterManagementService, plugin);
this.positionManagementService = new PositionManagementServiceImpl(connectConfig);
this.offsetManagementService = new OffsetManagementServiceImpl(connectConfig);
this.worker = new Worker(connectConfig, positionManagementService, offsetManagementService, plugin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ private static void start(ConnectController controller) {
controller.start();
String tip = "The worker [" + controller.getClusterManagementService().getCurrentWorker() + "] boot success.";
log.info(tip);
System.out.printf("%s%n", tip);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.connect.runtime.common;

public class ConfigWrapper extends ConnAndTaskConfigs {

private String leader;

private ConnAndTaskConfigs connAndTaskConfigs;

public String getLeader() {
return leader;
}

public void setLeader(String leader) {
this.leader = leader;
}

public ConnAndTaskConfigs getConnAndTaskConfigs() {
return connAndTaskConfigs;
}

public void setConnAndTaskConfigs(ConnAndTaskConfigs connAndTaskConfigs) {
this.connAndTaskConfigs = connAndTaskConfigs;
}

@Override
public String toString() {
return "ConnAndTaskConfigs{" +
"leader={" + this.getLeader() + "}" +
"connectorConfigs=" + this.connAndTaskConfigs.getConnectorConfigs() +
", taskConfigs=" + this.connAndTaskConfigs.getTaskConfigs() +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.connect.runtime.common;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Objects;

public class PositionValue implements Serializable {

private ByteBuffer partition;

private ByteBuffer position;

public PositionValue(ByteBuffer partition, ByteBuffer position) {
this.partition = partition;
this.position = position;
}

public ByteBuffer getPartition() {
return partition;
}

public void setPartition(ByteBuffer partition) {
this.partition = partition;
}

public ByteBuffer getPosition() {
return position;
}

public void setPosition(ByteBuffer position) {
this.position = position;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PositionValue)) return false;
PositionValue that = (PositionValue) o;
return Objects.equals(getPartition(), that.getPartition()) &&
Objects.equals(getPosition(), that.getPosition());
}

@Override
public int hashCode() {
return Objects.hash(getPartition(), getPosition());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ public class ConnectConfig {

private String namesrvAddr;

private int workerRole;

private String leaderID;

private String workerID;

private String workerAddr;

private String rmqProducerGroup = "connector-producer-group";

private int maxMessageSize;
Expand Down Expand Up @@ -111,6 +119,38 @@ public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}

public int getWorkerRole() {
return workerRole;
}

public void setWorkerRole(int workerRole) {
this.workerRole = workerRole;
}

public String getLeaderID() {
return leaderID;
}

public void setLeaderID(String leaderID) {
this.leaderID = leaderID;
}

public String getWorkerID() {
return workerID;
}

public void setWorkerID(String workerID) {
this.workerID = workerID;
}

public String getWorkerAddr() {
return workerAddr;
}

public void setWorkerAddr(String workerAddr) {
this.workerAddr = workerAddr;
}

public String getRmqProducerGroup() {
return rmqProducerGroup;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.connect.runtime.config;

public class RPCConfigDefine {
public static final int PORT = 50051;
}
Loading

0 comments on commit 045af48

Please sign in to comment.