Skip to content

Commit 32be3da

Browse files
authored
Expose nettyRemotingServer (#153)
* add constructor, expose nettyRemotingServer * code review
1 parent bc9a820 commit 32be3da

File tree

2 files changed

+42
-6
lines changed

2 files changed

+42
-6
lines changed

src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.Executors;
4444
import java.util.concurrent.ThreadFactory;
4545
import java.util.concurrent.atomic.AtomicInteger;
46+
import org.apache.rocketmq.remoting.ChannelEventListener;
4647
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
4748
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
4849
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
@@ -96,6 +97,14 @@ public Thread newThread(Runnable r) {
9697
});
9798

9899
public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
100+
this(dLedgerServer, null, null, null);
101+
}
102+
103+
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
104+
this(dLedgerServer, nettyServerConfig, nettyClientConfig, null);
105+
}
106+
107+
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
99108
this.dLedgerServer = dLedgerServer;
100109
this.memberState = dLedgerServer.getMemberState();
101110
NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() {
@@ -110,9 +119,11 @@ public boolean rejectRequest() {
110119
}
111120
};
112121
//start the remoting server
113-
NettyServerConfig nettyServerConfig = new NettyServerConfig();
114-
nettyServerConfig.setListenPort(Integer.valueOf(memberState.getSelfAddr().split(":")[1]));
115-
this.remotingServer = new NettyRemotingServer(nettyServerConfig, null);
122+
if (nettyServerConfig == null) {
123+
nettyServerConfig = new NettyServerConfig();
124+
}
125+
nettyServerConfig.setListenPort(Integer.parseInt(memberState.getSelfAddr().split(":")[1]));
126+
this.remotingServer = new NettyRemotingServer(nettyServerConfig, channelEventListener);
116127
this.remotingServer.registerProcessor(DLedgerRequestCode.METADATA.getCode(), protocolProcessor, null);
117128
this.remotingServer.registerProcessor(DLedgerRequestCode.APPEND.getCode(), protocolProcessor, null);
118129
this.remotingServer.registerProcessor(DLedgerRequestCode.GET.getCode(), protocolProcessor, null);
@@ -123,8 +134,10 @@ public boolean rejectRequest() {
123134
this.remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null);
124135

125136
//start the remoting client
126-
this.remotingClient = new NettyRemotingClient(new NettyClientConfig(), null);
127-
137+
if (nettyClientConfig == null) {
138+
nettyClientConfig = new NettyClientConfig();
139+
}
140+
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
128141
}
129142

130143
private String getPeerAddr(RequestOrResponse request) {
@@ -476,4 +489,8 @@ public DLedgerServer getdLedgerServer() {
476489
public void setdLedgerServer(DLedgerServer dLedgerServer) {
477490
this.dLedgerServer = dLedgerServer;
478491
}
492+
493+
public NettyRemotingServer getRemotingServer() {
494+
return remotingServer;
495+
}
479496
}

src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@
5757
import java.util.concurrent.TimeUnit;
5858
import java.util.concurrent.CompletableFuture;
5959

60+
import org.apache.rocketmq.remoting.ChannelEventListener;
61+
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
62+
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
63+
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
6064
import org.slf4j.Logger;
6165
import org.slf4j.LoggerFactory;
6266

@@ -76,10 +80,18 @@ public class DLedgerServer implements DLedgerProtocolHandler {
7680
private Optional<StateMachineCaller> fsmCaller;
7781

7882
public DLedgerServer(DLedgerConfig dLedgerConfig) {
83+
this(dLedgerConfig, null, null, null);
84+
}
85+
86+
public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
87+
this(dLedgerConfig, nettyServerConfig, nettyClientConfig, null);
88+
}
89+
90+
public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
7991
this.dLedgerConfig = dLedgerConfig;
8092
this.memberState = new MemberState(dLedgerConfig);
8193
this.dLedgerStore = createDLedgerStore(dLedgerConfig.getStoreType(), this.dLedgerConfig, this.memberState);
82-
dLedgerRpcService = new DLedgerRpcNettyService(this);
94+
dLedgerRpcService = new DLedgerRpcNettyService(this, nettyServerConfig, nettyClientConfig, channelEventListener);
8395
dLedgerEntryPusher = new DLedgerEntryPusher(dLedgerConfig, memberState, dLedgerStore, dLedgerRpcService);
8496
dLedgerLeaderElector = new DLedgerLeaderElector(dLedgerConfig, memberState, dLedgerRpcService);
8597
executorService = Executors.newSingleThreadScheduledExecutor(r -> {
@@ -433,4 +445,11 @@ public DLedgerLeaderElector getdLedgerLeaderElector() {
433445
public DLedgerConfig getdLedgerConfig() {
434446
return dLedgerConfig;
435447
}
448+
449+
public NettyRemotingServer getRemotingServer() {
450+
if (this.dLedgerRpcService instanceof DLedgerRpcNettyService) {
451+
return ((DLedgerRpcNettyService)this.dLedgerRpcService).getRemotingServer();
452+
}
453+
return null;
454+
}
436455
}

0 commit comments

Comments
 (0)