From f6eae8fd809ede807add253961cfb8dc700a6e3c Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Thu, 1 Jun 2023 12:46:41 -0400 Subject: [PATCH 1/4] fix(grpc): Allow gRPC connections via Unix socket This commit addresses issue #1832. The way `NET_PEER_IP` and `NET_PEER_PORT` are retrieved raises a `ValueError` when gRPC connections are handled via Unix sockets. ```py ip, port = ( context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1) ) ``` When using an address like `unix:///tmp/grpc.sock` the value of `context.peer()` is `"unix:"`. Substituting that in the function above... ```py ip, port = "unix:".split(",")[0].split(":", 1)[1].rsplit(":", 1) ip, port = ["unix:"][0].split(":", 1)[1].rsplit(":", 1) ip, port = "unix:".split(":", 1)[1].rsplit(":", 1) ip, port = ["unix", ""][1].rsplit(":", 1) ip, port = "".rsplit(":", 1) ip, port = [""] # ValueError ``` I "addressed" the issue by guarding the retrieval of `net.peer.*` values under an `if` statement that checks if we are using a Unix socket. I extended the `server_interceptor` tests to run against TCP and Unix socket configurations. --- **Open Questions** - [ ] The socket tests will fail on Windows. Is there a way to annotate that? - [ ] Are there other span values we should be setting for the unix socket? --- .../instrumentation/grpc/_server.py | 38 ++-- .../tests/test_server_interceptor.py | 198 ++++++++---------- 2 files changed, 113 insertions(+), 123 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index a34cac0b3c..dcee959b4d 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -250,24 +250,30 @@ def _start_span( # * ipv4:127.0.0.1:57284 # * ipv4:10.2.1.1:57284,127.0.0.1:57284 # - try: - ip, port = ( - context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1) - ) - ip = unquote(ip) - attributes.update( - { - SpanAttributes.NET_PEER_IP: ip, - SpanAttributes.NET_PEER_PORT: port, - } - ) + if context.peer() != "unix:": + try: + ip, port = ( + context.peer() + .split(",")[0] + .split(":", 1)[1] + .rsplit(":", 1) + ) + ip = unquote(ip) + attributes.update( + { + SpanAttributes.NET_PEER_IP: ip, + SpanAttributes.NET_PEER_PORT: port, + } + ) - # other telemetry sources add this, so we will too - if ip in ("[::1]", "127.0.0.1"): - attributes[SpanAttributes.NET_PEER_NAME] = "localhost" + # other telemetry sources add this, so we will too + if ip in ("[::1]", "127.0.0.1"): + attributes[SpanAttributes.NET_PEER_NAME] = "localhost" - except IndexError: - logger.warning("Failed to parse peer address '%s'", context.peer()) + except IndexError: + logger.warning( + "Failed to parse peer address '%s'", context.peer() + ) return self._tracer.start_as_current_span( name=handler_call_details.method, diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index b48d887f5a..7192e462a7 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -15,6 +15,8 @@ # pylint:disable=unused-argument # pylint:disable=no-self-use +import contextlib +import tempfile import threading from concurrent import futures @@ -77,24 +79,15 @@ def ServerStreamingMethod(self, request, context): ) -class TestOpenTelemetryServerInterceptor(TestBase): +class OpenTelemetryServerInterceptorBase: def test_instrumentor(self): def handler(request, context): return b"" grpc_server_instrumentor = GrpcInstrumentorServer() grpc_server_instrumentor.instrument() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - ) - + with self.server(max_workers=1) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - rpc_call = "TestServicer/handler" try: server.start() @@ -117,8 +110,7 @@ def handler(request, context): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "handler", SpanAttributes.RPC_SERVICE: "TestServicer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -137,17 +129,8 @@ def handler(request, context): grpc_server_instrumentor = GrpcInstrumentorServer() grpc_server_instrumentor.instrument() grpc_server_instrumentor.uninstrument() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - ) - + with self.server(max_workers=1) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - rpc_call = "TestServicer/test" try: server.start() @@ -164,15 +147,11 @@ def test_create_span(self): # Intercept gRPC calls... interceptor = server_interceptor() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): add_GRPCTestServerServicer_to_server(Servicer(), server) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" request = Request(client_id=1, request_data="test") @@ -199,8 +178,7 @@ def test_create_span(self): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -231,15 +209,11 @@ def SimpleMethod(self, request, context): interceptor = server_interceptor() # setup the server - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/SimpleMethod" @@ -268,8 +242,7 @@ def SimpleMethod(self, request, context): self.assertSpanHasAttributes( parent_span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -292,15 +265,11 @@ def test_create_span_streaming(self): # Intercept gRPC calls... interceptor = server_interceptor() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): add_GRPCTestServerServicer_to_server(Servicer(), server) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" @@ -328,8 +297,7 @@ def test_create_span_streaming(self): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -360,15 +328,11 @@ def ServerStreamingMethod(self, request, context): # Intercept gRPC calls... interceptor = server_interceptor() - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" @@ -397,8 +361,7 @@ def ServerStreamingMethod(self, request, context): self.assertSpanHasAttributes( parent_span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -427,17 +390,12 @@ def handler(request, context): active_span_in_handler = trace.get_current_span() return b"" - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - active_span_before_call = trace.get_current_span() try: server.start() @@ -463,17 +421,12 @@ def handler(request, context): active_spans_in_handler.append(trace.get_current_span()) return b"" - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - try: server.start() channel.unary_unary("TestServicer/handler")(b"") @@ -496,8 +449,7 @@ def handler(request, context): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "handler", SpanAttributes.RPC_SERVICE: "TestServicer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -527,17 +479,12 @@ def handler(request, context): active_spans_in_handler.append(trace.get_current_span()) return b"" - with futures.ThreadPoolExecutor(max_workers=2) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) + with self.server( + max_workers=2, + interceptors=[interceptor], + ) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - try: server.start() # Interleave calls so spans are active on each thread at the same @@ -568,8 +515,7 @@ def handler(request, context): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "handler", SpanAttributes.RPC_SERVICE: "TestServicer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -592,18 +538,11 @@ def test_abort(self): def handler(request, context): context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message) - with futures.ThreadPoolExecutor(max_workers=1) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=[interceptor], - ) - + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - rpc_call = "TestServicer/handler" server.start() @@ -635,8 +574,7 @@ def handler(request, context): self.assertSpanHasAttributes( span, { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", + **self.net_peer_span_attributes, SpanAttributes.RPC_METHOD: "handler", SpanAttributes.RPC_SERVICE: "TestServicer", SpanAttributes.RPC_SYSTEM: "grpc", @@ -647,6 +585,52 @@ def handler(request, context): ) +class TestOpenTelemetryServerInterceptorTcp( + TestBase, + OpenTelemetryServerInterceptorBase, +): + net_peer_span_attributes = { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + } + + @contextlib.contextmanager + def server(self, max_workers=1, interceptors=None): + with futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=interceptors or [], + ) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + yield server, channel + + +class TestOpenTelemetryServerInterceptorUnix( + TestBase, + OpenTelemetryServerInterceptorBase, +): + net_peer_span_attributes = {} + + @contextlib.contextmanager + def server(self, max_workers=1, interceptors=None): + with futures.ThreadPoolExecutor( + max_workers=max_workers + ) as executor, tempfile.TemporaryDirectory() as tmp: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=interceptors or [], + ) + + sock = f"unix://{tmp}/grpc.sock" + server.add_insecure_port(sock) + channel = grpc.insecure_channel(sock) + yield server, channel + + def get_latch(num): """Get a countdown latch function for use in n threads.""" cv = threading.Condition() From ec25dd571d415d99211d7b9be82458fcd09ed8f5 Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Thu, 1 Jun 2023 13:21:28 -0400 Subject: [PATCH 2/4] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee92bdab7a..21fa5b4117 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1706](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1706)) - `opentelemetry-instrumentation-pymemcache` Update instrumentation to support pymemcache >4 ([#1764](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1764)) +- `opentelemetry-instrumentation-grpc` Allow gRPC connections via Unix socket + ([#1833](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1833)) ### Added From 80a518f848cb290d1144bd8700fde38b8c5ca898 Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Thu, 1 Jun 2023 14:12:01 -0400 Subject: [PATCH 3/4] Add placeholder attributes for linter --- .../tests/test_server_interceptor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index 7192e462a7..4eec56d289 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -80,6 +80,12 @@ def ServerStreamingMethod(self, request, context): class OpenTelemetryServerInterceptorBase: + net_peer_span_attributes = NotImplemented + + @contextlib.contextmanager + def server(self): + raise NotImplementedError + def test_instrumentor(self): def handler(request, context): return b"" From 67beb2520aca57aa90a1ddfa196f642d9b2b716e Mon Sep 17 00:00:00 2001 From: Matt Oberle Date: Thu, 1 Jun 2023 15:21:22 -0400 Subject: [PATCH 4/4] fix lint --- .../tests/test_server_interceptor.py | 46 +++++++------------ 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index 4eec56d289..57f27c89d6 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -79,12 +79,24 @@ def ServerStreamingMethod(self, request, context): ) -class OpenTelemetryServerInterceptorBase: - net_peer_span_attributes = NotImplemented +class TestOpenTelemetryServerInterceptor(TestBase): + net_peer_span_attributes = { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + } @contextlib.contextmanager - def server(self): - raise NotImplementedError + def server(self, max_workers=1, interceptors=None): + with futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=interceptors or [], + ) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + yield server, channel def test_instrumentor(self): def handler(request, context): @@ -591,32 +603,8 @@ def handler(request, context): ) -class TestOpenTelemetryServerInterceptorTcp( - TestBase, - OpenTelemetryServerInterceptorBase, -): - net_peer_span_attributes = { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", - } - - @contextlib.contextmanager - def server(self, max_workers=1, interceptors=None): - with futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - server = grpc.server( - executor, - options=(("grpc.so_reuseport", 0),), - interceptors=interceptors or [], - ) - - port = server.add_insecure_port("[::]:0") - channel = grpc.insecure_channel(f"localhost:{port:d}") - yield server, channel - - class TestOpenTelemetryServerInterceptorUnix( - TestBase, - OpenTelemetryServerInterceptorBase, + TestOpenTelemetryServerInterceptor, ): net_peer_span_attributes = {}