Skip to content

Commit c5b4b28

Browse files
committed
Support concurrent clients
1 parent 3f7076b commit c5b4b28

File tree

2 files changed

+65
-15
lines changed

2 files changed

+65
-15
lines changed

protorpc/echo/EchoClient.java

+60-15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
package echo;
22

33
import java.net.InetSocketAddress;
4+
import java.util.concurrent.CountDownLatch;
5+
import java.util.concurrent.Executors;
6+
7+
import org.jboss.netty.channel.ChannelFactory;
8+
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
9+
10+
import com.google.protobuf.ServiceException;
411

512
import muduo.rpc.RpcChannel;
613
import muduo.rpc.RpcClient;
@@ -10,27 +17,65 @@
1017
import echo.EchoProto.EchoService.BlockingInterface;
1118

1219
public class EchoClient {
20+
static final int kRequests = 20000;
21+
22+
public static class Client implements Runnable {
23+
private ChannelFactory channelFactory;
24+
private InetSocketAddress serverAddr;
25+
private CountDownLatch latch;
26+
27+
public Client(ChannelFactory channelFactory, InetSocketAddress server, CountDownLatch latch) {
28+
this.channelFactory = channelFactory;
29+
this.serverAddr = server;
30+
this.latch = latch;
31+
}
32+
33+
@Override
34+
public void run() {
35+
System.out.println(Thread.currentThread());
36+
RpcClient client = new RpcClient(channelFactory);
37+
RpcChannel channel = client.blockingConnect(serverAddr);
38+
BlockingInterface remoteService = EchoService.newBlockingStub(channel);
39+
String payload = new String(new byte[100]);
40+
payload = "Hello";
41+
EchoRequest request = EchoRequest.newBuilder().setPayload(payload).build();
42+
43+
for (int i = 0; i < kRequests; ++i) {
44+
EchoResponse response;
45+
try {
46+
response = remoteService.echo(null, request);
47+
assert response.getPayload().equals(payload);
48+
} catch (ServiceException e) {
49+
// TODO Auto-generated catch block
50+
e.printStackTrace();
51+
}
52+
// System.out.println(response);
53+
}
54+
latch.countDown();
55+
System.out.println(Thread.currentThread());
56+
// System.out.println(response);
57+
channel.disconnect();
58+
// client.stop();
59+
}
60+
}
1361

1462
public static void main(String[] args) throws Exception {
15-
RpcClient client = new RpcClient();
16-
RpcChannel channel = client.blockingConnect(new InetSocketAddress(args[0], 8888));
17-
BlockingInterface remoteService = EchoService.newBlockingStub(channel);
18-
String payload = new String(new byte[100]);
19-
payload = "Hello";
20-
EchoRequest request = EchoRequest.newBuilder().setPayload(payload).build();
63+
ChannelFactory channelFactory = new NioClientSocketChannelFactory(
64+
Executors.newCachedThreadPool(),
65+
Executors.newCachedThreadPool());
66+
InetSocketAddress server = new InetSocketAddress(args[0], 8888);
67+
int N = 4;
68+
CountDownLatch latch = new CountDownLatch(N);
2169
long start = System.currentTimeMillis();
22-
int N = 20000;
70+
Thread[] threads = new Thread[N];
2371
for (int i = 0; i < N; ++i) {
24-
EchoResponse response = remoteService.echo(null, request);
25-
assert response.getPayload().equals(payload);
26-
//System.out.println(response);
72+
threads[i] = new Thread(new Client(channelFactory, server, latch));
73+
threads[i].start();
2774
}
75+
latch.await();
2876
long end = System.currentTimeMillis();
29-
System.err.println(end-start);
30-
System.err.println(N*1000L/(end-start));
31-
// System.out.println(response);
32-
channel.disconnect();
33-
client.stop();
77+
System.err.println(end - start);
78+
System.err.println(N * kRequests * 1000L / (end - start));
3479
}
3580

3681
}

protorpc/muduo/rpc/RpcClient.java

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ public RpcClient() {
2626
bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this));
2727
}
2828

29+
public RpcClient(ChannelFactory channelFactory) {
30+
bootstrap = new ClientBootstrap(channelFactory);
31+
bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this));
32+
}
33+
2934
public RpcChannel blockingConnect(SocketAddress addr) {
3035
final CountDownLatch latch = new CountDownLatch(1);
3136
startConnect(addr, new NewChannelCallback() {

0 commit comments

Comments
 (0)