Skip to content

Commit

Permalink
update: add unit test and error process
Browse files Browse the repository at this point in the history
  • Loading branch information
fengye404 committed Sep 23, 2024
1 parent f3050ab commit 7aa63aa
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 48 deletions.
10 changes: 8 additions & 2 deletions arthas-grpc-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>4.1.111.Final</version>
<version>4.1.72.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.27.2</version>
<version>3.19.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
Expand All @@ -70,6 +70,12 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.taobao.arthas.grpc.server.handler;

import com.taobao.arthas.grpc.server.protobuf.annotation.ProtobufClass;

/**
* @author: FengYe
* @date: 2024/9/23 23:58
* @description: ErrorRes
*/
@ProtobufClass
public class ErrorRes {
private String errorMsg;

public String getErrorMsg() {
return errorMsg;
}

public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public GrpcResponse execute(GrpcRequest request) throws Throwable {
*/
public Class<?> getRequestClass(String serviceName, String methodName) {
//protobuf 规范只能有单入参
return grpcMethodInvokeMap.get(generateGrpcMethodKey(serviceName, methodName)).type().parameterArray()[0];
return Optional.ofNullable(grpcMethodInvokeMap.get(generateGrpcMethodKey(serviceName, methodName))).orElseThrow(() -> new RuntimeException("The specified grpc method does not exist")).type().parameterArray()[0];
}

public void checkGrpcStream(GrpcRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ public ByteBuf getResponseData() {
return byteData;
}

public void writeResponseData(Object response) throws IOException {
public void writeResponseData(Object response) {
ProtobufCodec codec = ProtobufProxy.getCodecCacheSide(clazz);
byte[] encode = codec.encode(response);

byte[] encode = null;
try {
encode = codec.encode(response);
} catch (IOException e) {
throw new RuntimeException("ProtobufCodec encode error");
}
this.byteData = ByteUtil.newByteBuf();
this.byteData.writeBoolean(false);
this.byteData.writeInt(encode.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.taobao.arthas.grpc.server.protobuf.ProtobufCodec;
import com.taobao.arthas.grpc.server.protobuf.ProtobufProxy;
import com.taobao.arthas.grpc.server.utils.ByteUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.*;
Expand Down Expand Up @@ -43,7 +45,6 @@ protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws
}
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
Expand All @@ -65,7 +66,6 @@ private void handleGrpcData(Http2DataFrame dataFrame, ChannelHandlerContext ctx)
GrpcRequest grpcRequest = dataBuffer.get(dataFrame.stream().id());
grpcRequest.writeData(dataFrame.content());

System.out.println(dataFrame.stream().id());
if (grpcRequest.isStream()) {
// 流式调用,即刻响应
try {
Expand All @@ -92,7 +92,7 @@ private void handleGrpcData(Http2DataFrame dataFrame, ChannelHandlerContext ctx)
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(dataFrame.stream()));
}
} catch (Throwable e) {
processError(ctx, e);
processError(ctx, e, dataFrame.stream());
}
} else {
// 非流式调用,等到 endStream 再响应
Expand All @@ -103,14 +103,20 @@ private void handleGrpcData(Http2DataFrame dataFrame, ChannelHandlerContext ctx)
ctx.writeAndFlush(new DefaultHttp2DataFrame(response.getResponseData()).stream(dataFrame.stream()));
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(dataFrame.stream()));
} catch (Throwable e) {
processError(ctx, e);
processError(ctx, e, dataFrame.stream());
}
}
}
}

private void processError(ChannelHandlerContext ctx, Throwable e) {
//todo
ctx.writeAndFlush(e.getMessage());
private void processError(ChannelHandlerContext ctx, Throwable e, Http2FrameStream stream) {
GrpcResponse response = new GrpcResponse();
ErrorRes errorRes = new ErrorRes();
errorRes.setErrorMsg(e.getMessage());
response.setClazz(ErrorRes.class);
response.writeResponseData(errorRes);
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndHeader()).stream(stream));
ctx.writeAndFlush(new DefaultHttp2DataFrame(response.getResponseData()).stream(stream));
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(stream));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,22 @@
* @date: 2024/6/30 下午11:43
* @description: ArthasSampleServiceImpl
*/
@GrpcService("arthas.sample.ArthasTempService")
@GrpcService("arthas.unittest.ArthasUnittestService")
public class ArthasSampleServiceImpl implements ArthasSampleService {

@Override
@GrpcMethod("trace")
public ArthasUnittestResponse trace(ArthasUnittestRequest command) {
ArthasUnittestResponse arthasUnittestResponse = new ArthasUnittestResponse();
arthasUnittestResponse.setMessage("trace");
arthasUnittestResponse.setMessage(command.getMessage());
return arthasUnittestResponse;
}

@Override
@GrpcMethod(value = "watch", stream = true)
public ArthasUnittestResponse watch(ArthasUnittestRequest command) {
String message = command.getMessage();
ArthasUnittestResponse arthasUnittestResponse = new ArthasUnittestResponse();
arthasUnittestResponse.setMessage(message);
arthasUnittestResponse.setMessage(command.getMessage());
return arthasUnittestResponse;
}
}
29 changes: 0 additions & 29 deletions arthas-grpc-server/src/main/proto/arthasSample.proto

This file was deleted.

102 changes: 100 additions & 2 deletions arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,108 @@
package unittest.grpc;

import arthas.unittest.ArthasUnittest;
import arthas.unittest.ArthasUnittestServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.junit.Test;
import org.junit.jupiter.api.Disabled;

import java.util.concurrent.CountDownLatch;

/**
* @author: FengYe
* @date: 2024/9/19 22:20
* @description: grpc.GrpcTest
* @date: 2024/9/24 00:17
* @description: GrpcUnaryTest
*/
public class GrpcTest {
private static final String target = "localhost:9090";
private ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub blockingStub = null;
private ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = null;

@Disabled("跳过启动测试")
@Test
public void testUnary() {
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
.usePlaintext()
.build();

blockingStub = ArthasUnittestServiceGrpc.newBlockingStub(channel);

try {
trace("trace");
} finally {
channel.shutdownNow();
}
}

@Disabled("跳过启动测试")
@Test
public void testStream() {
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
.usePlaintext()
.build();

stub = ArthasUnittestServiceGrpc.newStub(channel);

try {
watch("watch1", "watch2", "watch3");
} finally {
channel.shutdownNow();
}
}

private void trace(String name) {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage(name).build();
try {
ArthasUnittest.ArthasUnittestResponse res = blockingStub.trace(request);
System.out.println(res.getMessage());
} catch (StatusRuntimeException e) {
e.printStackTrace();
System.out.println("RPC failed: " + e.getStatus());
}
}

private void watch(String... names) {
// 使用 CountDownLatch 来等待所有响应
CountDownLatch finishLatch = new CountDownLatch(1);

StreamObserver<ArthasUnittest.ArthasUnittestRequest> watch = stub.watch(new StreamObserver<ArthasUnittest.ArthasUnittestResponse>() {
@Override
public void onNext(ArthasUnittest.ArthasUnittestResponse value) {
System.out.println("watch: " + value.getMessage());
}

@Override
public void onError(Throwable t) {

}

@Override
public void onCompleted() {
System.out.println("Finished sending watch.");
}
});


try {
for (String name : names) {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage(name).build();
Thread.sleep(1000L);
watch.onNext(request);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
watch.onCompleted();
}

// 等待服务器的响应
try {
finishLatch.await(); // 等待完成
} catch (InterruptedException e) {
System.out.println("Client interrupted: " + e.getMessage());
}
}
}

0 comments on commit 7aa63aa

Please sign in to comment.