From 3745f08752d49a418c712e9f800c1b3946b437a7 Mon Sep 17 00:00:00 2001 From: x956 <942995218@qq.com> Date: Fri, 22 Sep 2023 14:48:11 +0800 Subject: [PATCH] add arthas-grpc-web-proxy module (#2668) --- arthas-grpc-web-proxy/README.md | 40 ++++ arthas-grpc-web-proxy/pom.xml | 147 +++++++++++++ .../arthas/grpcweb/proxy/CorsUtils.java | 27 +++ .../proxy/GrpcServiceConnectionManager.java | 44 ++++ .../proxy/GrpcWebClientInterceptor.java | 75 +++++++ .../grpcweb/proxy/GrpcWebRequestHandler.java | 183 +++++++++++++++++ .../arthas/grpcweb/proxy/MessageDeframer.java | 134 ++++++++++++ .../arthas/grpcweb/proxy/MessageFramer.java | 43 ++++ .../arthas/grpcweb/proxy/MessageUtils.java | 82 ++++++++ .../arthas/grpcweb/proxy/MetadataUtil.java | 80 ++++++++ .../grpcweb/proxy/SendGrpcWebResponse.java | 186 +++++++++++++++++ .../grpcweb/proxy/SingleHttpChunkedInput.java | 100 +++++++++ .../proxy/server/GrpcWebProxyHandler.java | 54 +++++ .../proxy/server/GrpcWebProxyServer.java | 73 +++++++ .../server/GrpcWebProxyServerInitializer.java | 26 +++ .../grpcweb/proxy/server/CorsUtilsTest.java | 16 ++ .../proxy/server/GrpcWebProxyServerTest.java | 194 ++++++++++++++++++ .../proxy/server/MessageDeframerTest.java | 33 +++ .../proxy/server/MessageUtilsTest.java | 33 +++ .../grpcweb/proxy/server/StartGrpcTest.java | 32 +++ .../proxy/server/StartGrpcWebProxyTest.java | 19 ++ .../proxy/server/grpcService/EchoImpl.java | 81 ++++++++ .../server/grpcService/GreeterService.java | 34 +++ .../proxy/server/grpcService/HelloImpl.java | 40 ++++ .../src/test/proto/echo.proto | 100 +++++++++ .../src/test/proto/greeter.proto | 45 ++++ .../src/test/proto/helloworld.proto | 37 ++++ pom.xml | 1 + 28 files changed, 1959 insertions(+) create mode 100644 arthas-grpc-web-proxy/README.md create mode 100644 arthas-grpc-web-proxy/pom.xml create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/CorsUtils.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcServiceConnectionManager.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcWebClientInterceptor.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcWebRequestHandler.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageDeframer.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageFramer.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageUtils.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MetadataUtil.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/SendGrpcWebResponse.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/SingleHttpChunkedInput.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyHandler.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServer.java create mode 100644 arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerInitializer.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/CorsUtilsTest.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerTest.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/MessageDeframerTest.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/MessageUtilsTest.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/StartGrpcTest.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/StartGrpcWebProxyTest.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/EchoImpl.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/GreeterService.java create mode 100644 arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/HelloImpl.java create mode 100644 arthas-grpc-web-proxy/src/test/proto/echo.proto create mode 100644 arthas-grpc-web-proxy/src/test/proto/greeter.proto create mode 100644 arthas-grpc-web-proxy/src/test/proto/helloworld.proto diff --git a/arthas-grpc-web-proxy/README.md b/arthas-grpc-web-proxy/README.md new file mode 100644 index 0000000000..a7393b846c --- /dev/null +++ b/arthas-grpc-web-proxy/README.md @@ -0,0 +1,40 @@ +## netty grpc web proxy + +from: https://github.com/grpc/grpc-web/tree/1.4.2/src/connector + +原项目已废弃删除,本项目改用 netty 来做转发。 + +## 缺点 + +原项目需要 `.proto` 文件编译的 `.class`才能运行,比如`GreeterGrpc`,本项目同样有这个问题。 + + +## 测试 + +工程导入IDE之后,进入test目录 + +在 com.taobao.arthas.grpcweb.proxy.server.GrpcWebProxyServerTest 启动测试 + +也可以用原项目的相关工程来测试 + +* https://github.com/grpc/grpc-web/ + +## 开发验证 + +可以用其它的 grpc web proxy来抓包辅助验证。 + +### 用 envoy + +下载envoy 后,可以用本项目里的`envoy.yaml` + +* `envoy --config-path ./envoy.yaml` + +### 使用 grpcwebproxy + +* https://github.com/improbable-eng/grpc-web/blob/master/go/grpcwebproxy/README.md + +下载后,启动: + +* `grpcwebproxy --backend_addr 127.0.0.1:9090 --run_tls_server=false --allow_all_origins` + + diff --git a/arthas-grpc-web-proxy/pom.xml b/arthas-grpc-web-proxy/pom.xml new file mode 100644 index 0000000000..a3f44278ca --- /dev/null +++ b/arthas-grpc-web-proxy/pom.xml @@ -0,0 +1,147 @@ + + + + arthas-all + com.taobao.arthas + ${revision} + ../pom.xml + + 4.0.0 + arthas-grpc-web-proxy + arthas-grpc-web-proxy + https://github.com/alibaba/arthas + + 1.8 + 1.46.0 + + + + + io.grpc + grpc-bom + ${grpc.version} + pom + import + + + + + + + + io.netty + netty-codec-http + + + com.alibaba.arthas + arthas-repackage-logger + + + io.grpc + grpc-netty + provided + + + io.grpc + grpc-services + provided + + + javax.annotation + javax.annotation-api + 1.3.2 + provided + true + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + test + + + ch.qos.logback + logback-core + test + + + org.junit.vintage + junit-vintage-engine + test + + + org.junit.jupiter + junit-jupiter + test + + + org.apache.httpcomponents + httpmime + 4.5.2 + test + + + com.taobao.arthas + arthas-common + ${project.version} + test + + + com.taobao.arthas + arthas-common + ${project.version} + + + + + + mac + + + mac + + + + osx-x86_64 + + + + + ${project.artifactId} + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + ${basedir}/src/main/proto + com.google.protobuf:protoc:3.11.0:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.28.0:exe:${os.detected.classifier} + + + + + test-compile + test-compile-custom + + + + + + + + + \ No newline at end of file diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/CorsUtils.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/CorsUtils.java new file mode 100644 index 0000000000..8968c7f631 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/CorsUtils.java @@ -0,0 +1,27 @@ +package com.taobao.arthas.grpcweb.proxy; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; + +/** + * TODO 支持让用户配置更精细的 cors header + * @author hengyunabc 2023-09-07 + * + */ +public class CorsUtils { + + public static void updateCorsHeader(HttpHeaders headers) { +// headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, +// StringUtils.joinWith(",", "user-agent", "cache-control", "content-type", "content-transfer-encoding", +// "grpc-timeout", "keep-alive")); + headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*"); + + headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*"); + headers.set(HttpHeaderNames.ACCESS_CONTROL_REQUEST_HEADERS, "content-type,x-grpc-web,x-user-agent"); + headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS,GET,POST,HEAD"); + +// headers.set(HttpHeaderNames.ACCESS_CONTROL_EXPOSE_HEADERS, +// StringUtils.joinWith(",", "grpc-status", "grpc-message")); + headers.set(HttpHeaderNames.ACCESS_CONTROL_EXPOSE_HEADERS, "*"); + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcServiceConnectionManager.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcServiceConnectionManager.java new file mode 100644 index 0000000000..3be3b570da --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcServiceConnectionManager.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.taobao.arthas.grpcweb.proxy; + +import io.grpc.Channel; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +/** + * TODO: Manage the connection pool to talk to the grpc-service + */ +public class GrpcServiceConnectionManager { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + private final ManagedChannel channel; + + public GrpcServiceConnectionManager(int grpcPortNum) { + // TODO: Manage a connection pool. + channel = ManagedChannelBuilder.forAddress("localhost", grpcPortNum).usePlaintext().build(); + logger.info("**** connection channel initiated"); + } + + Channel getChannelWithClientInterceptor(GrpcWebClientInterceptor interceptor) { + return ClientInterceptors.intercept(channel, interceptor); + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcWebClientInterceptor.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcWebClientInterceptor.java new file mode 100644 index 0000000000..40534f77f4 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcWebClientInterceptor.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.taobao.arthas.grpcweb.proxy; + +import io.grpc.*; +import io.grpc.ClientCall.Listener; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; + +import java.util.concurrent.CountDownLatch; + +class GrpcWebClientInterceptor implements ClientInterceptor { + + private final CountDownLatch latch; + private final SendGrpcWebResponse sendResponse; + + GrpcWebClientInterceptor(CountDownLatch latch, SendGrpcWebResponse send) { + this.latch = latch; + sendResponse = send; + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, + CallOptions callOptions, Channel channel) { + return new SimpleForwardingClientCall(channel.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + super.start(new MetadataResponseListener(responseListener), headers); + } + }; + } + + class MetadataResponseListener extends SimpleForwardingClientCallListener { + private boolean headersSent = false; + + MetadataResponseListener(Listener responseListener) { + super(responseListener); + } + + @Override + public void onHeaders(Metadata h) { + sendResponse.writeHeaders(h); + headersSent = true; + } + + @Override + public void onClose(Status s, Metadata t) { + // TODO 这个函数会在 onCompleted 之前回调,这里有点奇怪 + if (!headersSent) { + // seems, sometimes onHeaders() is not called before this method is called! + // so far, they are the error cases. let onError() method in ClientListener + // handle this call. Could ignore this. + // TODO is this correct? what if onError() never gets called? + } else { + sendResponse.writeTrailer(s, t); + latch.countDown(); + } + super.onClose(s, t); + } + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcWebRequestHandler.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcWebRequestHandler.java new file mode 100644 index 0000000000..62de829feb --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/GrpcWebRequestHandler.java @@ -0,0 +1,183 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.taobao.arthas.grpcweb.proxy; + +import com.taobao.arthas.common.Pair; +import io.grpc.Channel; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class GrpcWebRequestHandler { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + private final GrpcServiceConnectionManager grpcServiceConnectionManager; + + public GrpcWebRequestHandler(GrpcServiceConnectionManager g) { + grpcServiceConnectionManager = g; + } + + public void handle(ChannelHandlerContext ctx, FullHttpRequest req) { + // 处理 CORS OPTIONS 请求 + if (req.method().equals(HttpMethod.OPTIONS)) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + CorsUtils.updateCorsHeader(response.headers()); + ctx.writeAndFlush(response); + return; + } + + String contentTypeStr = req.headers().get(HttpHeaderNames.CONTENT_TYPE); + + MessageUtils.ContentType contentType = MessageUtils.validateContentType(contentTypeStr); + SendGrpcWebResponse sendResponse = new SendGrpcWebResponse(ctx, req); + + try { + // From the request, get the rpc-method name and class name and then get their + // corresponding + // concrete objects. + QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri()); + String pathInfo = queryStringDecoder.path(); + + Pair classAndMethodNames = getClassAndMethod(pathInfo); + String className = classAndMethodNames.getFirst(); + String methodName = classAndMethodNames.getSecond(); + Class cls = getClassObject(className); + if (cls == null) { + logger.error("cannot find service impl in the request, className: " + className); + // incorrect classname specified in the request. + sendResponse.returnUnimplementedStatusCode(className); + return; + } + + // Create a ClientInterceptor object + CountDownLatch latch = new CountDownLatch(1); + GrpcWebClientInterceptor interceptor = new GrpcWebClientInterceptor(latch, sendResponse); + Channel channel = grpcServiceConnectionManager.getChannelWithClientInterceptor(interceptor); + + // get the stub for the rpc call and the method to be called within the stub + io.grpc.stub.AbstractStub asyncStub = getRpcStub(channel, cls, "newStub"); + Metadata headers = MetadataUtil.getHtpHeaders(req.headers()); + if (!headers.keys().isEmpty()) { + asyncStub = MetadataUtils.attachHeaders(asyncStub, headers); + } + Method asyncStubCall = getRpcMethod(asyncStub, methodName); + // Get the input object bytes + ByteBuf content = req.content(); + InputStream in = new ByteBufInputStream(content); + MessageDeframer deframer = new MessageDeframer(); + Object inObj = null; + if (deframer.processInput(in, contentType)) { + inObj = MessageUtils.getInputProtobufObj(asyncStubCall, deframer.getMessageBytes()); + } + + // Invoke the rpc call + asyncStubCall.invoke(asyncStub, inObj, new GrpcCallResponseReceiver(sendResponse, latch)); + if (!latch.await(500 * 1000, TimeUnit.MILLISECONDS)) { + logger.warn("grpc call took too long!"); + } + } catch (Exception e) { + logger.error("try to invoke grpc serivce error, uri: {}", req.uri(), e); + sendResponse.writeError(Status.UNAVAILABLE.withCause(e)); + } + } + + private Pair getClassAndMethod(String pathInfo) throws IllegalArgumentException { + // pathInfo starts with "/". ignore that first char. + String[] rpcClassAndMethodTokens = pathInfo.substring(1).split("/"); + if (rpcClassAndMethodTokens.length != 2) { + throw new IllegalArgumentException("incorrect pathinfo: " + pathInfo); + } + + String rpcClassName = rpcClassAndMethodTokens[0]; + String rpcMethodNameRecvd = rpcClassAndMethodTokens[1]; + String rpcMethodName = rpcMethodNameRecvd.substring(0, 1).toLowerCase() + rpcMethodNameRecvd.substring(1); + return new Pair<>(rpcClassName, rpcMethodName); + } + + private Class getClassObject(String className) { + Class rpcClass = null; + try { + rpcClass = Class.forName(className + "Grpc"); + } catch (ClassNotFoundException e) { + logger.info("no such class " + className); + } + return rpcClass; + } + + private io.grpc.stub.AbstractStub getRpcStub(Channel ch, Class cls, String stubName) { + try { + Method m = cls.getDeclaredMethod(stubName, io.grpc.Channel.class); + return (io.grpc.stub.AbstractStub) m.invoke(null, ch); + } catch (Exception e) { + logger.warn("Error when fetching " + stubName + " for: " + cls.getName()); + throw new IllegalArgumentException(e); + } + } + + /** + * Find the matching method in the stub class. + */ + private Method getRpcMethod(Object stub, String rpcMethodName) { + for (Method m : stub.getClass().getMethods()) { + if (m.getName().equals(rpcMethodName)) { + return m; + } + } + throw new IllegalArgumentException("Couldn't find rpcmethod: " + rpcMethodName); + } + + private static class GrpcCallResponseReceiver implements StreamObserver { + private final SendGrpcWebResponse sendResponse; + private final CountDownLatch latch; + + GrpcCallResponseReceiver(SendGrpcWebResponse s, CountDownLatch c) { + sendResponse = s; + latch = c; + } + + @Override + public void onNext(java.lang.Object resp) { + // TODO verify that the resp object is of Class instance returnedCls. + byte[] outB = ((com.google.protobuf.GeneratedMessageV3) resp).toByteArray(); + sendResponse.writeResponse(outB); + } + + @Override + public void onError(Throwable t) { + Status s = Status.fromThrowable(t); + sendResponse.writeError(s); + latch.countDown(); + } + + @Override + public void onCompleted() { + sendResponse.writeTrailer(Status.OK, null); + latch.countDown(); + } + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageDeframer.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageDeframer.java new file mode 100644 index 0000000000..f916013840 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageDeframer.java @@ -0,0 +1,134 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.taobao.arthas.grpcweb.proxy; + +import com.taobao.arthas.grpcweb.proxy.MessageUtils.ContentType; +import com.taobao.arthas.common.IOUtils; +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; + +/** + * Reads frames from the input bytes and returns a single message. + */ +public class MessageDeframer { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + static final byte DATA_BYTE = (byte) 0x00; + + // TODO: fix this code to be able to handle upto 4GB input size. + private int mLength = 0; + private int mReadSoFar = 0; + + private ArrayList mFrames = new ArrayList<>(); + private byte[] mMsg = null; + private int mNumFrames; + + byte[] getMessageBytes() { + return mMsg; + } + + int getLength() { + return mLength; + } + + int getNumberOfFrames() { + return mNumFrames; + } + + /** + * Reads the bytes from the given InputStream and populates bytes in + * {@link #mMsg} + */ + public boolean processInput(InputStream in, MessageUtils.ContentType contentType) { + byte[] inBytes; + try { + InputStream inStream = (contentType == ContentType.GRPC_WEB_TEXT) ? Base64.getDecoder().wrap(in) : in; + inBytes = IOUtils.getBytes(inStream); + } catch (IOException e) { + e.printStackTrace(); + logger.warn("invalid input"); + return false; + } + if (inBytes.length < 5) { + logger.debug("invalid input. Expected minimum of 5 bytes"); + return false; + } + + while (getNextFrameBytes(inBytes)) { + } + mNumFrames = mFrames.size(); + + // common case is only one frame. + if (mNumFrames == 1) { + mMsg = mFrames.get(0); + } else { + // concatenate all frames into one byte array + // TODO: this is inefficient. + mMsg = new byte[mLength]; + int offset = 0; + for (byte[] f : mFrames) { + System.arraycopy(f, 0, mMsg, offset, f.length); + offset += f.length; + } + mFrames = null; + } + return true; + } + + /** returns true if the next frame is a DATA frame */ + private boolean getNextFrameBytes(byte[] inBytes) { + // Firstbyte should be 0x00 (for this to be a DATA frame) + int firstByteValue = inBytes[mReadSoFar] | DATA_BYTE; + if (firstByteValue != 0) { + logger.debug("done with DATA bytes"); + return false; + } + + // Next 4 bytes = length of the bytes array starting after the 4 bytes. + int offset = mReadSoFar + 1; + int len = ByteBuffer.wrap(inBytes, offset, 4).getInt(); + + // Empty message is special case. + // TODO: Can this is special handling be removed? + if (len == 0) { + mFrames.add(new byte[0]); + return false; + } + + // Make sure we have enough bytes in the inputstream + int expectedNumBytes = len + 5 + mReadSoFar; + if (inBytes.length < expectedNumBytes) { + logger.warn(String.format("input doesn't have enough bytes. expected: %d, found %d", expectedNumBytes, + inBytes.length)); + return false; + } + + // Read "len" bytes into message + mLength += len; + offset += 4; + byte[] inputBytes = Arrays.copyOfRange(inBytes, offset, len + offset); + mFrames.add(inputBytes); + mReadSoFar += (len + 5); + // we have more frames to process, if there are bytes unprocessed + return inBytes.length > mReadSoFar; + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageFramer.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageFramer.java new file mode 100644 index 0000000000..b1345f364f --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageFramer.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.taobao.arthas.grpcweb.proxy; + +/** + * Creates frames from the input bytes. + */ +public class MessageFramer { + public enum Type { + DATA ((byte) 0x00), + TRAILER ((byte) 0x80); + + public final byte value; + Type(byte b) { + value = b; + } + } + + // TODO: handle more than single frame; i.e., input byte array size > (2GB - 1) + public byte[] getPrefix(byte[] in, Type type) { + int len = in.length; + return new byte[] { + type.value, + (byte) ((len >> 24) & 0xff), + (byte) ((len >> 16) & 0xff), + (byte) ((len >> 8) & 0xff), + (byte) ((len >> 0) & 0xff), + }; + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageUtils.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageUtils.java new file mode 100644 index 0000000000..c861a2f197 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MessageUtils.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.taobao.arthas.grpcweb.proxy; + +import com.google.common.annotations.VisibleForTesting; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +public class MessageUtils { + @VisibleForTesting + public + enum ContentType { + GRPC_WEB_BINARY, GRPC_WEB_TEXT; + } + + private static Map GRPC_GCP_CONTENT_TYPES = new HashMap() { + { + put("application/grpc-web", ContentType.GRPC_WEB_BINARY); + put("application/grpc-web+proto", ContentType.GRPC_WEB_BINARY); + put("application/grpc-web-text", ContentType.GRPC_WEB_TEXT); + put("application/grpc-web-text+proto", ContentType.GRPC_WEB_TEXT); + } + }; + + /** + * Validate the content-type + */ + public static ContentType validateContentType(String contentType) throws IllegalArgumentException { + if (contentType == null || !GRPC_GCP_CONTENT_TYPES.containsKey(contentType)) { + throw new IllegalArgumentException("This content type is not used for grpc-web: " + contentType); + } + return getContentType(contentType); + } + + static ContentType getContentType(String type) { + return GRPC_GCP_CONTENT_TYPES.get(type); + } + + /** + * Find the input arg protobuf class for the given rpc-method. Convert the given + * bytes to the input protobuf. return that. + */ + static Object getInputProtobufObj(Method rpcMethod, byte[] in) { + Class[] inputArgs = rpcMethod.getParameterTypes(); + Class inputArgClass = inputArgs[0]; + + // use the inputArg classtype to create a protobuf object + Method parseFromObj; + try { + parseFromObj = inputArgClass.getMethod("parseFrom", byte[].class); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("Couldn't find method in 'parseFrom' in " + inputArgClass.getName()); + } + + Object inputObj; + try { + inputObj = parseFromObj.invoke(null, in); + } catch (InvocationTargetException | IllegalAccessException e) { + throw new IllegalArgumentException(e); + } + + if (inputObj == null || !inputArgClass.isInstance(inputObj)) { + throw new IllegalArgumentException("Input obj is **not** instance of the correct input class type"); + } + return inputObj; + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MetadataUtil.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MetadataUtil.java new file mode 100644 index 0000000000..90c4d574a2 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/MetadataUtil.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.taobao.arthas.grpcweb.proxy; + +import io.grpc.Metadata; +import io.netty.handler.codec.http.HttpHeaders; +import java.util.*; + +class MetadataUtil { + private static final String BINARY_HEADER_SUFFIX = "-bin"; + private static final String GRPC_HEADER_PREFIX = "x-grpc-"; + private static final List EXCLUDED = Arrays.asList("x-grpc-web", "content-type", "grpc-accept-encoding", + "grpc-encoding"); + + static Metadata getHtpHeaders(HttpHeaders headers) { + Metadata httpHeaders = new Metadata(); + + Set headerNames = headers.names(); + if (headerNames == null) { + return httpHeaders; + } + // copy all headers "x-grpc-*" into Metadata + // TODO: do we need to copy all "x-*" headers instead? + for (String headerName : headerNames) { + if (EXCLUDED.contains(headerName.toLowerCase())) { + continue; + } + if (headerName.toLowerCase().startsWith(GRPC_HEADER_PREFIX)) { + // Get all the values of this header. + + List values = headers.getAll(headerName); + if (values != null) { + // Java enumerations have klunky API. lets convert to a list. + // this will be a short list usually. + for (String s : values) { + if (headerName.toLowerCase().endsWith(BINARY_HEADER_SUFFIX)) { + // Binary header + httpHeaders.put(Metadata.Key.of(headerName, Metadata.BINARY_BYTE_MARSHALLER), s.getBytes()); + } else { + // String header + httpHeaders.put(Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER), s); + } + } + } + } + } + return httpHeaders; + } + + static Map getHttpHeadersFromMetadata(Metadata trailer) { + Map map = new HashMap<>(); + for (String key : trailer.keys()) { + if (EXCLUDED.contains(key.toLowerCase())) { + continue; + } + if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + // TODO allow any object type here + byte[] value = trailer.get(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER)); + map.put(key, new String(value)); + } else { + String value = trailer.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + map.put(key, value); + } + } + return map; + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/SendGrpcWebResponse.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/SendGrpcWebResponse.java new file mode 100644 index 0000000000..0ca26d744c --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/SendGrpcWebResponse.java @@ -0,0 +1,186 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.taobao.arthas.grpcweb.proxy; + +import com.taobao.arthas.grpcweb.proxy.MessageUtils.ContentType; +import io.grpc.Metadata; +import io.grpc.Status; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; +import io.netty.handler.stream.ChunkedStream; +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.util.Base64; +import java.util.Map; + +/** + *
+ * * https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
+ * * https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+ * 
+ * 据协议和抓包分析,grpc-web 回应需要以 HTTP chunk数据包,包装 grpc 本身的数据。
+ * 
+ * grpc-web 的 http1.1 Response 由三部分组成:
+ * 1. headers , 返回 status 总是 200
+ * 2. data chunk ,可能多个
+ * 3. trailer chunk , grpc的 grpc-status, grpc-message 在这里
+ * 
+ * 
+ * + * @author hengyunabc 2023-09-06 + * + */ +class SendGrpcWebResponse { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + + private final String contentType; + + /** + * 回应的 http1.1 header 是否已发送 + */ + private boolean isHeaderSent = false; + + /** + * 所有的 grpc message 都会转换为一个 HTTP Chunk,所有的 Chunk 发送完之后,需要发送一个空的 Chunk 结束 + */ + private boolean isEndChunkSent = false; + + /** + * 在 grpc 协议里,在发送完 DATA 后,最后可能发送一个 trailer,它也需要转换为 HTTP Chunk + */ + private boolean isTrailerSent = false; + + private ChannelHandlerContext ctx; + + SendGrpcWebResponse(ChannelHandlerContext ctx, FullHttpRequest req) { + HttpHeaders headers = req.headers(); + contentType = headers.get(HttpHeaderNames.CONTENT_TYPE); + this.ctx = ctx; + } + + synchronized void writeHeaders(Metadata headers) { + if (isHeaderSent) { + return; + } + // 发送 http1.1 开头部分的内容 + DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType).set(HttpHeaderNames.TRANSFER_ENCODING, + "chunked"); + + CorsUtils.updateCorsHeader(response.headers()); + + if (headers != null) { + Map ht = MetadataUtil.getHttpHeadersFromMetadata(headers); + for (String key : ht.keySet()) { + response.headers().set(key, ht.get(key)); + } + } + + logger.debug("write headers: {}", response); + + ctx.writeAndFlush(response); + + isHeaderSent = true; + } + + synchronized void returnUnimplementedStatusCode(String className) { + writeHeaders(null); + writeTrailer( + Status.UNIMPLEMENTED.withDescription("Can not find service impl, check dep, service: " + className), + null); + } + + // 发送最后的 http chunked 空块 + private void writeEndChunk() { + if (isEndChunkSent) { + return; + } + LastHttpContent end = new DefaultLastHttpContent(); + ctx.writeAndFlush(end); + isEndChunkSent = true; + } + + synchronized void writeError(Status s) { + writeHeaders(null); + writeTrailer(s, null); + } + + synchronized void writeTrailer(Status status, Metadata trailer) { + if (isTrailerSent) { + return; + } + StringBuffer sb = new StringBuffer(); + if (trailer != null) { + Map ht = MetadataUtil.getHttpHeadersFromMetadata(trailer); + for (String key : ht.keySet()) { + sb.append(String.format("%s:%s\r\n", key, ht.get(key))); + } + } + sb.append(String.format("grpc-status:%d\r\n", status.getCode().value())); + if (status.getDescription() != null && !status.getDescription().isEmpty()) { + sb.append(String.format("grpc-message:%s\r\n", status.getDescription())); + } + + writeResponse(sb.toString().getBytes(), MessageFramer.Type.TRAILER); + + isTrailerSent = true; + + writeEndChunk(); + } + + synchronized void writeResponse(byte[] out) { + writeResponse(out, MessageFramer.Type.DATA); + } + + private void writeResponse(byte[] out, MessageFramer.Type type) { + if (isTrailerSent) { + logger.error("grpcweb trailer sented, writeResponse can not be called, framer type: {}", type); + return; + } + + try { + // PUNT multiple frames not handled + byte[] prefix = new MessageFramer().getPrefix(out, type); + ByteArrayOutputStream oStream = new ByteArrayOutputStream(); + // binary encode if it is "text" content type + if (MessageUtils.getContentType(contentType) == ContentType.GRPC_WEB_TEXT) { + byte[] concated = new byte[out.length + 5]; + System.arraycopy(prefix, 0, concated, 0, 5); + System.arraycopy(out, 0, concated, 5, out.length); + oStream.write(Base64.getEncoder().encode(concated)); + } else { + oStream.write(prefix); + oStream.write(out); + } + + byte[] byteArray = oStream.toByteArray(); + + InputStream dataStream = new ByteArrayInputStream(byteArray); + ChunkedStream chunkedStream = new ChunkedStream(dataStream); + SingleHttpChunkedInput httpChunkedInput = new SingleHttpChunkedInput(chunkedStream); + ctx.writeAndFlush(httpChunkedInput); + + } catch (IOException e) { + logger.error("write grpcweb response error, framer type: {}", type, e); + } + } + +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/SingleHttpChunkedInput.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/SingleHttpChunkedInput.java new file mode 100644 index 0000000000..25ea947c51 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/SingleHttpChunkedInput.java @@ -0,0 +1,100 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.taobao.arthas.grpcweb.proxy; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.stream.ChunkedInput; + +/** + * 和 LastHttpContent 对比,少了 LastHttpContent.EMPTY_LAST_CONTENT + * + * @see LastHttpContent + * @see LastHttpContent#EMPTY_LAST_CONTENT + */ +public class SingleHttpChunkedInput implements ChunkedInput { + + private final ChunkedInput input; + + /** + * Creates a new instance using the specified input. + * @param input {@link ChunkedInput} containing data to write + */ + public SingleHttpChunkedInput(ChunkedInput input) { + this.input = input; +// lastHttpContent = LastHttpContent.EMPTY_LAST_CONTENT; + } + + /** + * Creates a new instance using the specified input. {@code lastHttpContent} will be written as the terminating + * chunk. + * @param input {@link ChunkedInput} containing data to write + * @param lastHttpContent {@link LastHttpContent} that will be written as the terminating chunk. Use this for + * training headers. + */ + public SingleHttpChunkedInput(ChunkedInput input, LastHttpContent lastHttpContent) { + this.input = input; +// this.lastHttpContent = lastHttpContent; + } + + @Override + public boolean isEndOfInput() throws Exception { + if (input.isEndOfInput()) { + // Only end of input after last HTTP chunk has been sent + return true; + } else { + return false; + } + } + + @Override + public void close() throws Exception { + input.close(); + } + + @Deprecated + @Override + public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public HttpContent readChunk(ByteBufAllocator allocator) throws Exception { + if (input.isEndOfInput()) { + return null; + } else { + ByteBuf buf = input.readChunk(allocator); + if (buf == null) { + return null; + } + return new DefaultHttpContent(buf); + } + } + + @Override + public long length() { + return input.length(); + } + + @Override + public long progress() { + return input.progress(); + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyHandler.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyHandler.java new file mode 100644 index 0000000000..7964cd0be4 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyHandler.java @@ -0,0 +1,54 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +import com.taobao.arthas.grpcweb.proxy.GrpcServiceConnectionManager; +import com.taobao.arthas.grpcweb.proxy.GrpcWebRequestHandler; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +public class GrpcWebProxyHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); + private GrpcWebRequestHandler requestHandler; + + private static GrpcServiceConnectionManager manager; + + public GrpcWebProxyHandler(int grpcPort) { + manager = new GrpcServiceConnectionManager(grpcPort); + requestHandler = new GrpcWebRequestHandler(manager); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { + logger.debug("http request: {} ", request); + + send100Continue(ctx); + requestHandler.handle(ctx, request); + } + + private static void send100Continue(ChannelHandlerContext ctx) { + FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, CONTINUE, Unpooled.EMPTY_BUFFER); + ctx.write(response); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("grpc web proxy handler error", cause); + ctx.close(); + } + +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServer.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServer.java new file mode 100644 index 0000000000..e240a0a1c9 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServer.java @@ -0,0 +1,73 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +public final class GrpcWebProxyServer { + + private static final Logger logger = LoggerFactory.getLogger(GrpcWebProxyServer.class); + + + private int port; + + private int grpcPort; + + private EventLoopGroup bossGroup; + + private EventLoopGroup workerGroup; + + private Channel channel; + + + public GrpcWebProxyServer(int port, int grpcPort) { + this.port = port; + this.grpcPort = grpcPort; + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + } + + public void start() { + try { + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new GrpcWebProxyServerInitializer(grpcPort)); + channel = serverBootstrap.bind(port).sync().channel(); + + logger.info("grpc web proxy server started, listening on " + port); + System.out.println("grpc web proxy server started, listening on " + port); + channel.closeFuture().sync(); + } catch (InterruptedException e) { + logger.info("fail to start grpc web proxy server!"); + throw new RuntimeException(e); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + public void close() { + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if(workerGroup != null){ + workerGroup.shutdownGracefully(); + } + logger.info("success to close grpc web proxy server!"); + } + + public int actualPort() { + int boundPort = ((InetSocketAddress) channel.localAddress()).getPort(); + return boundPort; + } +} diff --git a/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerInitializer.java b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerInitializer.java new file mode 100644 index 0000000000..e01f012f87 --- /dev/null +++ b/arthas-grpc-web-proxy/src/main/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerInitializer.java @@ -0,0 +1,26 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.stream.ChunkedWriteHandler; + +public class GrpcWebProxyServerInitializer extends ChannelInitializer { + + private int grpcPort; + + public GrpcWebProxyServerInitializer(int grpcPort) { + this.grpcPort = grpcPort; + } + + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new HttpServerCodec()); + pipeline.addLast(new HttpObjectAggregator(65536)); + pipeline.addLast(new ChunkedWriteHandler()); + pipeline.addLast(new GrpcWebProxyHandler(grpcPort)); + } +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/CorsUtilsTest.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/CorsUtilsTest.java new file mode 100644 index 0000000000..2358ab247c --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/CorsUtilsTest.java @@ -0,0 +1,16 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +import com.taobao.arthas.grpcweb.proxy.CorsUtils; +import io.netty.handler.codec.http.*; +import org.junit.Test; + + +public class CorsUtilsTest { + + @Test + public void test(){ + DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + CorsUtils.updateCorsHeader(response.headers()); + System.out.println(response.headers()); + } +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerTest.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerTest.java new file mode 100644 index 0000000000..b7d217a118 --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerTest.java @@ -0,0 +1,194 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +import grpc.gateway.testing.Echo; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicHeader; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import com.taobao.arthas.common.SocketUtils; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Base64; + + +public class GrpcWebProxyServerTest { + + private int GRPC_WEB_PROXY_PORT; + private int GRPC_PORT; + private String hostName; + private CloseableHttpClient httpClient; + @Before + public void startServer(){ + GRPC_WEB_PROXY_PORT = SocketUtils.findAvailableTcpPort(); + GRPC_PORT = SocketUtils.findAvailableTcpPort(); + // 启动grpc服务 + Thread grpcStart = new Thread(() -> { + StartGrpcTest startGrpcTest = new StartGrpcTest(GRPC_PORT); + startGrpcTest.startGrpcService(); + }); + grpcStart.start(); + // 启动grpc-web-proxy服务 + Thread grpcWebProxyStart = new Thread(() -> { + StartGrpcWebProxyTest startGrpcWebProxyTest = new StartGrpcWebProxyTest(GRPC_WEB_PROXY_PORT,GRPC_PORT); + startGrpcWebProxyTest.startGrpcWebProxy(); + }); + grpcWebProxyStart.start(); + try { + // waiting for the server to start + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + hostName = "http://127.0.0.1:" + GRPC_WEB_PROXY_PORT; + httpClient = HttpClients.createDefault(); + } + + @Test + public void simpleReqTest() { + // 单个response + String url = hostName +"/grpc.gateway.testing.EchoService/Echo"; + + String requestStr = "hello world!!!"; + Echo.EchoRequest request = Echo.EchoRequest.newBuilder().setMessage(requestStr).build(); + System.out.println("request message--->" + requestStr); + byte[] requestData = request.toByteArray(); + requestData = ByteArrayWithLengthExample(requestData); + // 编码请求载荷为gRPC-Web格式 + String encodedPayload = Base64.getEncoder().encodeToString(requestData); + try { + String result = ""; + String encoding = "utf-8"; + HttpPost httpPost = getPost(url, encodedPayload, encoding); + //发送请求,并拿到结果(同步阻塞) + CloseableHttpResponse response = httpClient.execute(httpPost); + //获取返回结果 + HttpEntity entity = response.getEntity(); + if (entity != null) { + //按指定编码转换结果实体为String类型 + result = EntityUtils.toString(entity, encoding); + } + EntityUtils.consume(entity); + //释放Http请求链接 + response.close(); + + System.out.println("result-->" + result); + System.out.println("after decode..."); + // gAAAAA9ncnBjLXN0YXR1czowDQo= 是结尾字符 + int endStartIndex = result.indexOf("gAAAAA"); + String data = result.substring(0,endStartIndex); + String end = result.substring(endStartIndex,result.length()); + byte[] decodedData = Base64.getDecoder().decode(data); + byte[] decodedEnd = Base64.getDecoder().decode(end); + // 去掉前5个byte + decodedData = RemoveBytesExample(decodedData); + decodedEnd = RemoveBytesExample(decodedEnd); + Echo.EchoResponse echoResponse = Echo.EchoResponse.parseFrom(decodedData); + System.out.println("response message--->" + echoResponse.getMessage()); + String endStr = new String(decodedEnd); + System.out.println(endStr); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + + @Test + public void streamReqTest() { + // stream response + String url = hostName + "/grpc.gateway.testing.EchoService/ServerStreamingEcho"; + String requestStr = "hello world!!!"; + Echo.ServerStreamingEchoRequest request = Echo.ServerStreamingEchoRequest.newBuilder().setMessage(requestStr) + .setMessageCount(5) + .build(); + byte[] requestData = request.toByteArray(); + requestData = ByteArrayWithLengthExample(requestData); + // 编码请求载荷为gRPC-Web格式 + String encodedPayload = Base64.getEncoder().encodeToString(requestData); + try { + String encoding = "utf-8"; + HttpPost httpPost = getPost(url, encodedPayload, encoding); + //发送请求 + CloseableHttpResponse response = httpClient.execute(httpPost); + //获取返回结果 + HttpEntity entity = response.getEntity(); + if (entity != null) { + try (InputStream inputStream = entity.getContent()) { + // 在这里使用 inputStream 流式处理响应内容 + // 例如,逐行读取响应内容 + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + // 处理读取的数据 + String result = new String(buffer, 0, bytesRead); + System.out.println("result-->" + result); + System.out.println("after decode..."); + // gAAAAA9ncnBjLXN0YXR1czowDQo= 是结尾字符 + + byte[] decodedData = Base64.getDecoder().decode(result); + // 去掉前5个byte + decodedData = RemoveBytesExample(decodedData); + if(result.startsWith("gAAAAA")){ + String end = new String(decodedData); + System.out.println(end); + }else { + Echo.ServerStreamingEchoResponse echoResponse = Echo.ServerStreamingEchoResponse.parseFrom(decodedData); + System.out.println("response message--->" + echoResponse.getMessage()); + } + } + } + } + EntityUtils.consume(entity); + //释放Http请求链接 + response.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + public HttpPost getPost(String url, String param, String encoding) throws IOException { + System.out.println("request param(encode)--->" + param); + //创建post方式请求对象 + HttpPost httpPost = new HttpPost (url); + //设置请求参数实体 + StringEntity reqParam = new StringEntity(param,encoding); + reqParam.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, "application/grpc-web-text")); +// 将请求参数放到请求对象中 + httpPost.setEntity(reqParam); + //设置请求报文头信息 + httpPost.setHeader("Connection","keep-alive"); + httpPost.setHeader("Accept", "application/grpc-web-text"); + httpPost.setHeader("Content-type", "application/grpc-web-text");//设置发送表单请求 + httpPost.setHeader("X-Grpc-Web","1"); + httpPost.setHeader("X-User-Agent", "grpc-web-javascript/0.1"); + httpPost.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"); + return httpPost; + } + + public byte[] ByteArrayWithLengthExample(byte[] data){ + // 添加长度信息,用于编码过程 + int length = data.length; + byte[] newData = {0,0,0,0,(byte) length}; + byte[] combineArray = new byte[newData.length + data.length]; + System.arraycopy(newData, 0, combineArray, 0, newData.length); + System.arraycopy(data, 0, combineArray, newData.length, data.length); + return combineArray; + } + + public byte[] RemoveBytesExample(byte[] data){ + // 去掉长度信息,用于解码过程 + byte[] result = Arrays.copyOfRange(data, 5, data.length); + return result; + } + +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/MessageDeframerTest.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/MessageDeframerTest.java new file mode 100644 index 0000000000..6388b3fd24 --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/MessageDeframerTest.java @@ -0,0 +1,33 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +import com.taobao.arthas.grpcweb.proxy.MessageDeframer; +import com.taobao.arthas.grpcweb.proxy.MessageUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; +import org.apache.http.entity.StringEntity; +import org.apache.http.message.BasicHeader; +import org.apache.http.protocol.HTTP; +import org.junit.Assert; +import org.junit.Test; + +import java.io.InputStream; +import java.util.Arrays; + +public class MessageDeframerTest { + + @Test + public void testProcessInput(){ + String str = "AAAAAAcKBWhlbGxv"; + ByteBuf content = Unpooled.copiedBuffer(str, CharsetUtil.UTF_8); + InputStream in = new ByteBufInputStream(content); + String contentTypeStr = "application/grpc-web-text"; + MessageUtils.ContentType contentType = MessageUtils.validateContentType(contentTypeStr); + MessageDeframer deframer = new MessageDeframer(); + + boolean result = deframer.processInput(in, contentType); + + Assert.assertTrue(result); + } +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/MessageUtilsTest.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/MessageUtilsTest.java new file mode 100644 index 0000000000..d7268fd938 --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/MessageUtilsTest.java @@ -0,0 +1,33 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +import com.taobao.arthas.grpcweb.proxy.MessageUtils; +import org.junit.Assert; +import org.junit.Test; + +public class MessageUtilsTest { + + @Test + public void testValidateContentType(){ + String contentType1 = "application/grpc-web"; + MessageUtils.ContentType result1 = MessageUtils.validateContentType(contentType1); + String contentType2 = "application/grpc-web+proto"; + MessageUtils.ContentType result2 = MessageUtils.validateContentType(contentType2); + String contentType3 = "application/grpc-web-text"; + MessageUtils.ContentType result3 = MessageUtils.validateContentType(contentType3); + String contentType4 = "application/grpc-web-text+proto"; + MessageUtils.ContentType result4 = MessageUtils.validateContentType(contentType4); + MessageUtils.ContentType result5 = MessageUtils.ContentType.GRPC_WEB_BINARY; + try { + String contentType5 = null; + result5 = MessageUtils.validateContentType(contentType5); + }catch (IllegalArgumentException e){ + result5 = null; + } + + Assert.assertEquals(result1,MessageUtils.ContentType.GRPC_WEB_BINARY); + Assert.assertEquals(result2,MessageUtils.ContentType.GRPC_WEB_BINARY); + Assert.assertEquals(result3,MessageUtils.ContentType.GRPC_WEB_TEXT); + Assert.assertEquals(result4,MessageUtils.ContentType.GRPC_WEB_TEXT); + Assert.assertNull(result5); + } +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/StartGrpcTest.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/StartGrpcTest.java new file mode 100644 index 0000000000..1589a8928d --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/StartGrpcTest.java @@ -0,0 +1,32 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +import com.taobao.arthas.grpcweb.proxy.server.grpcService.EchoImpl; +import com.taobao.arthas.grpcweb.proxy.server.grpcService.GreeterService; +import com.taobao.arthas.grpcweb.proxy.server.grpcService.HelloImpl; +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.ServerBuilder; + +import java.io.IOException; + +public class StartGrpcTest { + + private int GRPC_PORT; + + public StartGrpcTest(int grpcPort){ + this.GRPC_PORT = grpcPort; + } + + public void startGrpcService(){ + try { + Server grpcServer = ServerBuilder.forPort(GRPC_PORT).addService((BindableService) new GreeterService()) + .addService((BindableService) new HelloImpl()).addService(new EchoImpl()).build(); + grpcServer.start(); + System.out.println("started gRPC server on port # " + GRPC_PORT); + System.in.read(); + } catch (IOException e) { + System.out.println("fail to start gRPC server"); + throw new RuntimeException(e); + } + } +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/StartGrpcWebProxyTest.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/StartGrpcWebProxyTest.java new file mode 100644 index 0000000000..5f15996404 --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/StartGrpcWebProxyTest.java @@ -0,0 +1,19 @@ +package com.taobao.arthas.grpcweb.proxy.server; + +public class StartGrpcWebProxyTest { + + private int GRPC_WEB_PROXY_PORT; + + private int GRPC_PORT; + + + public StartGrpcWebProxyTest(int grpcWebPort, int grpcPort){ + this.GRPC_WEB_PROXY_PORT = grpcWebPort; + this.GRPC_PORT = grpcPort; + } + + public void startGrpcWebProxy(){ + GrpcWebProxyServer grpcWebProxyServer = new GrpcWebProxyServer(GRPC_WEB_PROXY_PORT, GRPC_PORT); + grpcWebProxyServer.start(); + } +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/EchoImpl.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/EchoImpl.java new file mode 100644 index 0000000000..04202939fc --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/EchoImpl.java @@ -0,0 +1,81 @@ +package com.taobao.arthas.grpcweb.proxy.server.grpcService; + +import grpc.gateway.testing.Echo.*; +import grpc.gateway.testing.EchoServiceGrpc.EchoServiceImplBase; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; + +public class EchoImpl extends EchoServiceImplBase { + + @Override + public void echo(EchoRequest request, StreamObserver responseObserver) { + String message = request.getMessage(); + responseObserver.onNext(EchoResponse.newBuilder().setMessage(message).setMessageCount(1).build()); + responseObserver.onCompleted(); + } + + @Override + public void echoAbort(EchoRequest request, StreamObserver responseObserver) { + // TODO Auto-generated method stub + + responseObserver.onNext(EchoResponse.newBuilder().setMessage(request.getMessage()).build()); + Metadata trailers = new Metadata(); + Key customKey = Key.of("custom-key", Metadata.ASCII_STRING_MARSHALLER); + // 添加自定义元数据 + trailers.put(customKey, "custom-value"); + responseObserver.onError(Status.ABORTED.withDescription("error desc").asException(trailers)); + } + + @Override + public void noOp(Empty request, StreamObserver responseObserver) { + // TODO Auto-generated method stub + super.noOp(request, responseObserver); + } + + @Override + public void serverStreamingEcho(ServerStreamingEchoRequest request, + StreamObserver responseObserver) { + + String message = request.getMessage(); + + int messageCount = request.getMessageCount(); + + System.err.println(message); + + for (int i = 0; i < messageCount; ++i) { + responseObserver.onNext(ServerStreamingEchoResponse.newBuilder().setMessage(message).build()); + } + + responseObserver.onCompleted(); + + } + + @Override + public void serverStreamingEchoAbort(ServerStreamingEchoRequest request, + StreamObserver responseObserver) { + // TODO Auto-generated method stub + super.serverStreamingEchoAbort(request, responseObserver); + } + + @Override + public StreamObserver clientStreamingEcho( + StreamObserver responseObserver) { + // TODO Auto-generated method stub + return super.clientStreamingEcho(responseObserver); + } + + @Override + public StreamObserver fullDuplexEcho(StreamObserver responseObserver) { + // TODO Auto-generated method stub + return super.fullDuplexEcho(responseObserver); + } + + @Override + public StreamObserver halfDuplexEcho(StreamObserver responseObserver) { + // TODO Auto-generated method stub + return super.halfDuplexEcho(responseObserver); + } + +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/GreeterService.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/GreeterService.java new file mode 100644 index 0000000000..461adaf7c1 --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/GreeterService.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.taobao.arthas.grpcweb.proxy.server.grpcService; + +import grpcweb.examples.greeter.GreeterGrpc; +import grpcweb.examples.greeter.GreeterOuterClass.HelloReply; +import grpcweb.examples.greeter.GreeterOuterClass.HelloRequest; +import io.grpc.stub.StreamObserver; + +public class GreeterService extends GreeterGrpc.GreeterImplBase { + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + System.out.println("Greeter Service responding in sayhello() method"); + +// throw new RuntimeException("xxxxxx"); + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } +} diff --git a/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/HelloImpl.java b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/HelloImpl.java new file mode 100644 index 0000000000..c7bc26a915 --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/grpcService/HelloImpl.java @@ -0,0 +1,40 @@ +package com.taobao.arthas.grpcweb.proxy.server.grpcService; + +import helloworld.GreeterGrpc.GreeterImplBase; +import helloworld.Helloworld.HelloReply; +import helloworld.Helloworld.HelloRequest; +import helloworld.Helloworld.RepeatHelloRequest; +import io.grpc.stub.StreamObserver; + +public class HelloImpl extends GreeterImplBase{ + + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + // TODO Auto-generated method stub +// super.sayHello(request, responseObserver); + + System.err.println("sayHello"); + +// throw new RuntimeException("eeee"); + + responseObserver.onNext(HelloReply.newBuilder().setMessage("xxxx").build()); + + responseObserver.onCompleted(); + } + + @Override + public void sayRepeatHello(RepeatHelloRequest request, StreamObserver responseObserver) { + // TODO Auto-generated method stub +// super.sayRepeatHello(request, responseObserver); + + System.err.println("sayRepeatHello eeee "); + +// throw new RuntimeException("eeee"); + + responseObserver.onNext(HelloReply.newBuilder().setMessage("xxxx").build()); + + responseObserver.onCompleted(); + } + + +} diff --git a/arthas-grpc-web-proxy/src/test/proto/echo.proto b/arthas-grpc-web-proxy/src/test/proto/echo.proto new file mode 100644 index 0000000000..60171d0f4b --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/proto/echo.proto @@ -0,0 +1,100 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.gateway.testing; + +message Empty {} + +message EchoRequest { + string message = 1; +} + +message EchoResponse { + string message = 1; + int32 message_count = 2; +} + +// Request type for server side streaming echo. +message ServerStreamingEchoRequest { + // Message string for server streaming request. + string message = 1; + + // The total number of messages to be generated before the server + // closes the stream; default is 10. + int32 message_count = 2; + + // The interval (ms) between two server messages. The server implementation + // may enforce some minimum interval (e.g. 100ms) to avoid message overflow. + int32 message_interval = 3; +} + +// Response type for server streaming response. +message ServerStreamingEchoResponse { + // Response message. + string message = 1; +} + +// Request type for client side streaming echo. +message ClientStreamingEchoRequest { + // A special value "" indicates that there's no further messages. + string message = 1; +} + +// Response type for client side streaming echo. +message ClientStreamingEchoResponse { + // Total number of client messages that have been received. + int32 message_count = 1; +} + +// A simple echo service. +service EchoService { + // One request followed by one response + // The server returns the client message as-is. + rpc Echo(EchoRequest) returns (EchoResponse); + + // Sends back abort status. + rpc EchoAbort(EchoRequest) returns (EchoResponse) {} + + // One empty request, ZERO processing, followed by one empty response + // (minimum effort to do message serialization). + rpc NoOp(Empty) returns (Empty); + + // One request followed by a sequence of responses (streamed download). + // The server will return the same client message repeatedly. + rpc ServerStreamingEcho(ServerStreamingEchoRequest) + returns (stream ServerStreamingEchoResponse); + + // One request followed by a sequence of responses (streamed download). + // The server abort directly. + rpc ServerStreamingEchoAbort(ServerStreamingEchoRequest) + returns (stream ServerStreamingEchoResponse) {} + + // A sequence of requests followed by one response (streamed upload). + // The server returns the total number of messages as the result. + rpc ClientStreamingEcho(stream ClientStreamingEchoRequest) + returns (ClientStreamingEchoResponse); + + // A sequence of requests with each message echoed by the server immediately. + // The server returns the same client messages in order. + // E.g. this is how the speech API works. + rpc FullDuplexEcho(stream EchoRequest) returns (stream EchoResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client messages and then returns the same + // client messages one by one after the client half-closes the stream. + // This is how an image recognition API may work. + rpc HalfDuplexEcho(stream EchoRequest) returns (stream EchoResponse); +} diff --git a/arthas-grpc-web-proxy/src/test/proto/greeter.proto b/arthas-grpc-web-proxy/src/test/proto/greeter.proto new file mode 100644 index 0000000000..b0e9cbcb7b --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/proto/greeter.proto @@ -0,0 +1,45 @@ +// Copyright 2020 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// ======================================= +// +// DO NOT EDIT +// this is copy of +// https://github.com/grpc/grpc-web/blob/master/net/grpc/gateway/ +// examples/helloworld/helloworld.proto +// +// TODO: can the original be directly used without making copy here +// ======================================= + +syntax = "proto3"; + +option java_package = "grpcweb.examples.greeter"; + +package grpcweb.examples.greeter; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/arthas-grpc-web-proxy/src/test/proto/helloworld.proto b/arthas-grpc-web-proxy/src/test/proto/helloworld.proto new file mode 100644 index 0000000000..3dc2a0435e --- /dev/null +++ b/arthas-grpc-web-proxy/src/test/proto/helloworld.proto @@ -0,0 +1,37 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package helloworld; + +service Greeter { + // unary call + rpc SayHello(HelloRequest) returns (HelloReply); + // server streaming call + rpc SayRepeatHello(RepeatHelloRequest) returns (stream HelloReply); +} + +message HelloRequest { + string name = 1; +} + +message RepeatHelloRequest { + string name = 1; + int32 count = 2; +} + +message HelloReply { + string message = 1; +} diff --git a/pom.xml b/pom.xml index 15a6aa87a6..7a4ed0b1ab 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ testcase site packaging + arthas-grpc-web-proxy