From 32ae65ed55150131a6889e6661de5684b133b5da Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Wed, 21 Jun 2023 08:30:35 -0400 Subject: [PATCH] fix(grpc): Allow gRPC connections via Unix socket (#1833) * fix(grpc): Allow gRPC connections via Unix socket This commit addresses issue #1832. The way `NET_PEER_IP` and `NET_PEER_PORT` are retrieved raises a `ValueError` when gRPC connections are handled via Unix sockets. ```py ip, port = ( context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1) ) ``` When using an address like `unix:///tmp/grpc.sock` the value of `context.peer()` is `"unix:"`. Substituting that in the function above... ```py ip, port = "unix:".split(",")[0].split(":", 1)[1].rsplit(":", 1) ip, port = ["unix:"][0].split(":", 1)[1].rsplit(":", 1) ip, port = "unix:".split(":", 1)[1].rsplit(":", 1) ip, port = ["unix", ""][1].rsplit(":", 1) ip, port = "".rsplit(":", 1) ip, port = [""] # ValueError ``` I "addressed" the issue by guarding the retrieval of `net.peer.*` values under an `if` statement that checks if we are using a Unix socket. I extended the `server_interceptor` tests to run against TCP and Unix socket configurations. --- **Open Questions** - [ ] The socket tests will fail on Windows. Is there a way to annotate that? - [ ] Are there other span values we should be setting for the unix socket? * Update CHANGELOG * Add placeholder attributes for linter * fix lint --------- Co-authored-by: Matt Oberle Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com> --- CHANGELOG.md | 2 + .../instrumentation/grpc/_server.py | 38 ++-- .../tests/test_server_interceptor.py | 188 ++++++++---------- 3 files changed, 107 insertions(+), 121 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c83443dc02..ad6f969aa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1679)) - `opentelemetry-instrumentation-asgi` Add `http.server.response.size` metric ([#1789](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1789)) +- `opentelemetry-instrumentation-grpc` Allow gRPC connections via Unix socket + ([#1833](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1833)) ## Version 1.18.0/0.39b0 (2023-05-10) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index a34cac0b3c..dcee959b4d 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -250,24 +250,30 @@ def _start_span( # * ipv4:127.0.0.1:57284 # * ipv4:10.2.1.1:57284,127.0.0.1:57284 # - try: - ip, port = ( - context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1) - ) - ip = unquote(ip) - attributes.update( - { - SpanAttributes.NET_PEER_IP: ip, - SpanAttributes.NET_PEER_PORT: port, - } - ) + if context.peer() != "unix:": + try: + ip, port = ( + context.peer() + .split(",")[0] + .split(":", 1)[1] + .rsplit(":", 1) + ) + ip = unquote(ip) + attributes.update( + { + SpanAttributes.NET_PEER_IP: ip, + SpanAttributes.NET_PEER_PORT: port, + } + ) - # other telemetry sources add this, so we will too - if ip in ("[::1]", "127.0.0.1"): - attributes[SpanAttributes.NET_PEER_NAME] = "localhost" + # other telemetry sources add this, so we will too + if ip in ("[::1]", "127.0.0.1"): + attributes[SpanAttributes.NET_PEER_NAME] = "localhost" - except IndexError: - logger.warning("Failed to parse peer address '%s'", context.peer()) + except IndexError: + logger.warning( + "Failed to parse peer address '%s'", context.peer() + ) return self._tracer.start_as_current_span( name=handler_call_details.method, diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index b48d887f5a..57f27c89d6 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -15,6 +15,8 @@ # pylint:disable=unused-argument # pylint:disable=no-self-use +import contextlib +import tempfile import threading from concurrent import futures @@ -78,23 +80,32 @@ def ServerStreamingMethod(self, request, context): class TestOpenTelemetryServerInterceptor(TestBase): - def test_instrumentor(self): - def handler(request, context): - return b"" - - grpc_server_instrumentor = GrpcInstrumentorServer() - grpc_server_instrumentor.instrument() - with futures.ThreadPoolExecutor(max_workers=1) as executor: + net_peer_span_attributes = { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + } + + @contextlib.contextmanager + def server(self, max_workers=1, interceptors=None): + with futures.ThreadPoolExecutor(max_workers=max_workers) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), + interceptors=interceptors or [], ) - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") + yield server, channel + + def test_instrumentor(self): + def handler(request, context): + return b"" + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + with self.server(max_workers=1) as (server, channel): + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) rpc_call = "TestServicer/handler" try: server.start() @@ -117,8 +128,7 @@ def handler(request, context): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "handler", SpanAttributes.RPC_SERVICE: "TestServicer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -137,17 +147,8 @@ def handler(request, context): grpc_server_instrumentor = GrpcInstrumentorServer() grpc_server_instrumentor.instrument() grpc_server_instrumentor.uninstrument() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - ) - + with self.server(max_workers=1) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - rpc_call = "TestServicer/test" try: server.start() @@ -164,15 +165,11 @@ def test_create_span(self): # Intercept gRPC calls... interceptor = server_interceptor() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): add_GRPCTestServerServicer_to_server(Servicer(), server) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" request = Request(client_id=1, request_data="test") @@ -199,8 +196,7 @@ def test_create_span(self): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -231,15 +227,11 @@ def SimpleMethod(self, request, context): interceptor = server_interceptor() # setup the server - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/SimpleMethod" @@ -268,8 +260,7 @@ def SimpleMethod(self, request, context): self.assertSpanHasAttributes( parent_span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -292,15 +283,11 @@ def test_create_span_streaming(self): # Intercept gRPC calls... interceptor = server_interceptor() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): add_GRPCTestServerServicer_to_server(Servicer(), server) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" @@ -328,8 +315,7 @@ def test_create_span_streaming(self): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -360,15 +346,11 @@ def ServerStreamingMethod(self, request, context): # Intercept gRPC calls... interceptor = server_interceptor() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" @@ -397,8 +379,7 @@ def ServerStreamingMethod(self, request, context): self.assertSpanHasAttributes( parent_span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -427,17 +408,12 @@ def handler(request, context): active_span_in_handler = trace.get_current_span() return b"" - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - active_span_before_call = trace.get_current_span() try: server.start() @@ -463,17 +439,12 @@ def handler(request, context): active_spans_in_handler.append(trace.get_current_span()) return b"" - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - try: server.start() channel.unary_unary("TestServicer/handler")(b"") @@ -496,8 +467,7 @@ def handler(request, context): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "handler", SpanAttributes.RPC_SERVICE: "TestServicer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -527,17 +497,12 @@ def handler(request, context): active_spans_in_handler.append(trace.get_current_span()) return b"" - with futures.ThreadPoolExecutor(max_workers=2) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=2, + interceptors=[interceptor], + ) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - try: server.start() # Interleave calls so spans are active on each thread at the same @@ -568,8 +533,7 @@ def handler(request, context): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "handler", SpanAttributes.RPC_SERVICE: "TestServicer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -592,18 +556,11 @@ def test_abort(self): def handler(request, context): context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message) - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) - + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - rpc_call = "TestServicer/handler" server.start() @@ -635,8 +592,7 @@ def handler(request, context): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "handler", SpanAttributes.RPC_SERVICE: "TestServicer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -647,6 +603,28 @@ def handler(request, context): ) +class TestOpenTelemetryServerInterceptorUnix( + TestOpenTelemetryServerInterceptor, +): + net_peer_span_attributes = {} + + @contextlib.contextmanager + def server(self, max_workers=1, interceptors=None): + with futures.ThreadPoolExecutor( + max_workers=max_workers + ) as executor, tempfile.TemporaryDirectory() as tmp: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=interceptors or [], + ) + + sock = f"unix://{tmp}/grpc.sock" + server.add_insecure_port(sock) + channel = grpc.insecure_channel(sock) + yield server, channel + + def get_latch(num): """Get a countdown latch function for use in n threads.""" cv = threading.Condition()