diff --git a/arthas-grpc-server/pom.xml b/arthas-grpc-server/pom.xml index 352b7e8c12..6f3e8575d5 100644 --- a/arthas-grpc-server/pom.xml +++ b/arthas-grpc-server/pom.xml @@ -39,13 +39,13 @@ io.netty netty-codec-http2 - 4.1.111.Final + 4.1.72.Final com.google.protobuf protobuf-java - 4.27.2 + 3.19.2 @@ -70,6 +70,12 @@ io.grpc grpc-netty provided + + + io.netty + netty-codec-http2 + + io.grpc diff --git a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/ErrorRes.java b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/ErrorRes.java new file mode 100644 index 0000000000..c7cbe8d7ea --- /dev/null +++ b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/ErrorRes.java @@ -0,0 +1,21 @@ +package com.taobao.arthas.grpc.server.handler; + +import com.taobao.arthas.grpc.server.protobuf.annotation.ProtobufClass; + +/** + * @author: FengYe + * @date: 2024/9/23 23:58 + * @description: ErrorRes + */ +@ProtobufClass +public class ErrorRes { + private String errorMsg; + + public String getErrorMsg() { + return errorMsg; + } + + public void setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + } +} diff --git a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcDispatcher.java b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcDispatcher.java index 10f40b14cb..8f3daed1be 100644 --- a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcDispatcher.java +++ b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcDispatcher.java @@ -89,7 +89,7 @@ public GrpcResponse execute(GrpcRequest request) throws Throwable { */ public Class getRequestClass(String serviceName, String methodName) { //protobuf 规范只能有单入参 - return grpcMethodInvokeMap.get(generateGrpcMethodKey(serviceName, methodName)).type().parameterArray()[0]; + return Optional.ofNullable(grpcMethodInvokeMap.get(generateGrpcMethodKey(serviceName, methodName))).orElseThrow(() -> new RuntimeException("The specified grpc method does not exist")).type().parameterArray()[0]; } public void checkGrpcStream(GrpcRequest request) { diff --git a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcResponse.java b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcResponse.java index 1c6c2ccec3..bbf678fcd7 100644 --- a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcResponse.java +++ b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/GrpcResponse.java @@ -51,10 +51,14 @@ public ByteBuf getResponseData() { return byteData; } - public void writeResponseData(Object response) throws IOException { + public void writeResponseData(Object response) { ProtobufCodec codec = ProtobufProxy.getCodecCacheSide(clazz); - byte[] encode = codec.encode(response); - + byte[] encode = null; + try { + encode = codec.encode(response); + } catch (IOException e) { + throw new RuntimeException("ProtobufCodec encode error"); + } this.byteData = ByteUtil.newByteBuf(); this.byteData.writeBoolean(false); this.byteData.writeInt(encode.length); diff --git a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2Handler.java b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2Handler.java index fdf634f4eb..ff4c01a8b6 100644 --- a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2Handler.java +++ b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/handler/Http2Handler.java @@ -2,6 +2,8 @@ import com.taobao.arthas.grpc.server.protobuf.ProtobufCodec; import com.taobao.arthas.grpc.server.protobuf.ProtobufProxy; +import com.taobao.arthas.grpc.server.utils.ByteUtil; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http2.*; @@ -43,7 +45,6 @@ protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws } } - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); @@ -65,7 +66,6 @@ private void handleGrpcData(Http2DataFrame dataFrame, ChannelHandlerContext ctx) GrpcRequest grpcRequest = dataBuffer.get(dataFrame.stream().id()); grpcRequest.writeData(dataFrame.content()); - System.out.println(dataFrame.stream().id()); if (grpcRequest.isStream()) { // 流式调用,即刻响应 try { @@ -92,7 +92,7 @@ private void handleGrpcData(Http2DataFrame dataFrame, ChannelHandlerContext ctx) ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(dataFrame.stream())); } } catch (Throwable e) { - processError(ctx, e); + processError(ctx, e, dataFrame.stream()); } } else { // 非流式调用,等到 endStream 再响应 @@ -103,14 +103,20 @@ private void handleGrpcData(Http2DataFrame dataFrame, ChannelHandlerContext ctx) ctx.writeAndFlush(new DefaultHttp2DataFrame(response.getResponseData()).stream(dataFrame.stream())); ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(dataFrame.stream())); } catch (Throwable e) { - processError(ctx, e); + processError(ctx, e, dataFrame.stream()); } } } } - private void processError(ChannelHandlerContext ctx, Throwable e) { - //todo - ctx.writeAndFlush(e.getMessage()); + private void processError(ChannelHandlerContext ctx, Throwable e, Http2FrameStream stream) { + GrpcResponse response = new GrpcResponse(); + ErrorRes errorRes = new ErrorRes(); + errorRes.setErrorMsg(e.getMessage()); + response.setClazz(ErrorRes.class); + response.writeResponseData(errorRes); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndHeader()).stream(stream)); + ctx.writeAndFlush(new DefaultHttp2DataFrame(response.getResponseData()).stream(stream)); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(stream)); } } \ No newline at end of file diff --git a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/impl/ArthasSampleServiceImpl.java b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/impl/ArthasSampleServiceImpl.java index 5e5cb415e3..b01787c36c 100644 --- a/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/impl/ArthasSampleServiceImpl.java +++ b/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/service/impl/ArthasSampleServiceImpl.java @@ -11,23 +11,22 @@ * @date: 2024/6/30 下午11:43 * @description: ArthasSampleServiceImpl */ -@GrpcService("arthas.sample.ArthasTempService") +@GrpcService("arthas.unittest.ArthasUnittestService") public class ArthasSampleServiceImpl implements ArthasSampleService { @Override @GrpcMethod("trace") public ArthasUnittestResponse trace(ArthasUnittestRequest command) { ArthasUnittestResponse arthasUnittestResponse = new ArthasUnittestResponse(); - arthasUnittestResponse.setMessage("trace"); + arthasUnittestResponse.setMessage(command.getMessage()); return arthasUnittestResponse; } @Override @GrpcMethod(value = "watch", stream = true) public ArthasUnittestResponse watch(ArthasUnittestRequest command) { - String message = command.getMessage(); ArthasUnittestResponse arthasUnittestResponse = new ArthasUnittestResponse(); - arthasUnittestResponse.setMessage(message); + arthasUnittestResponse.setMessage(command.getMessage()); return arthasUnittestResponse; } } \ No newline at end of file diff --git a/arthas-grpc-server/src/main/proto/arthasSample.proto b/arthas-grpc-server/src/main/proto/arthasSample.proto deleted file mode 100644 index 33df5d8a80..0000000000 --- a/arthas-grpc-server/src/main/proto/arthasSample.proto +++ /dev/null @@ -1,29 +0,0 @@ -syntax = "proto3"; - -package arthas.sample; - - -enum StatusEnum { - START = 0; - STOP = 1; -} - -service ArthasSampleService { - rpc testUnary(ArthasSampleRequest) returns (ArthasSampleResponse); - rpc testBiStream(stream ArthasSampleRequest) returns (stream ArthasSampleResponse); -} - -message ArthasSampleRequest { - string name = 1; double age = 2; - int64 price = 3; - StatusEnum status = 4; - repeated TestClass testList = 5; -} - -message TestClass{ - string name = 1; -} - -message ArthasSampleResponse { - string message = 1; -} diff --git a/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java b/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java index 504282a14c..d6d57b12e0 100644 --- a/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java +++ b/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java @@ -1,10 +1,108 @@ package unittest.grpc; +import arthas.unittest.ArthasUnittest; +import arthas.unittest.ArthasUnittestServiceGrpc; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.junit.Test; +import org.junit.jupiter.api.Disabled; + +import java.util.concurrent.CountDownLatch; + /** * @author: FengYe - * @date: 2024/9/19 22:20 - * @description: grpc.GrpcTest + * @date: 2024/9/24 00:17 + * @description: GrpcUnaryTest */ public class GrpcTest { + private static final String target = "localhost:9090"; + private ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub blockingStub = null; + private ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = null; + + @Disabled("跳过启动测试") + @Test + public void testUnary() { + ManagedChannel channel = ManagedChannelBuilder.forTarget(target) + .usePlaintext() + .build(); + + blockingStub = ArthasUnittestServiceGrpc.newBlockingStub(channel); + + try { + trace("trace"); + } finally { + channel.shutdownNow(); + } + } + + @Disabled("跳过启动测试") + @Test + public void testStream() { + ManagedChannel channel = ManagedChannelBuilder.forTarget(target) + .usePlaintext() + .build(); + + stub = ArthasUnittestServiceGrpc.newStub(channel); + + try { + watch("watch1", "watch2", "watch3"); + } finally { + channel.shutdownNow(); + } + } + + private void trace(String name) { + ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage(name).build(); + try { + ArthasUnittest.ArthasUnittestResponse res = blockingStub.trace(request); + System.out.println(res.getMessage()); + } catch (StatusRuntimeException e) { + e.printStackTrace(); + System.out.println("RPC failed: " + e.getStatus()); + } + } + + private void watch(String... names) { + // 使用 CountDownLatch 来等待所有响应 + CountDownLatch finishLatch = new CountDownLatch(1); + + StreamObserver watch = stub.watch(new StreamObserver() { + @Override + public void onNext(ArthasUnittest.ArthasUnittestResponse value) { + System.out.println("watch: " + value.getMessage()); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onCompleted() { + System.out.println("Finished sending watch."); + } + }); + + + try { + for (String name : names) { + ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage(name).build(); + Thread.sleep(1000L); + watch.onNext(request); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + watch.onCompleted(); + } + // 等待服务器的响应 + try { + finishLatch.await(); // 等待完成 + } catch (InterruptedException e) { + System.out.println("Client interrupted: " + e.getMessage()); + } + } }